Internet Speedtest

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
BlackJack

@Nr8: `subprocess`, `sys`, und `os` werden importiert, aber überhaupt nicht verwendet. Sowie der fast alles was mit dem *-Import aus `gps` importiert wird. Sternchen-Importe sind Böse™. Im besten Fall machen sie Programme schlechter nachvollziebar, im schlechteren Fall hat man Namenskollisionen.

`lat`, `lon`, und `speed` am Ende der Schleife an 0 zu binden hat keinen Effekt, weil diese Werte nie benutzt werden.

Ungetestet:

Code: Alles auswählen

#!/usr/bin/env python
# coding: utf8
from __future__ import absolute_import, division, print_function
import time
import urllib
from datetime import datetime as DateTime
from gps import gps, WATCH_ENABLE, WATCH_NEWSTYLE

SPEED_TEST_URL = 'http://www.speedtestx.de/testfiles/data_500mb.test'


def get_latest_position(gps_session, previous_position):
    position = previous_position
    while gps_session.waiting():
        data = next(gps_session)
        if 'lat' in data:
            position = (data['lat'], data['lon'], data['speed'])
    return position
 
   
def measure(url=SPEED_TEST_URL, intervall=2, size=10):
    response = urllib.urlopen(url)
    for _ in range(1):
        time_start = time_end = time.time()
        amount = 0
 
        while time_end - time_start < intervall:
            bytes_read = len(response.read(size))
            if not bytes_read:
                return
            time_end = time.time()
            amount += bytes_read
        yield amount, time_end - time_start
  
 
def main():
    session = gps()
    session.stream(WATCH_ENABLE | WATCH_NEWSTYLE)
    try:
        for amount, time_delta in measure(size=1000, intervall=2):
            # 
            # FIXME Sehr wahrscheinlich soll hier nicht immer (0, 0, 0)
            #   übergeben werden.
            # 
            lat_lon_speed = get_latest_position(session, (0, 0, 0))
            lat, lon, speed = lat_lon_speed
            print(
                ';'.join(
                    [
                        format(DateTime.now(), '%H:%M:%S'),
                        format(amount / time_delta / 1024, '.3f'),
                        str(lat),
                        str(lon),
                        str(speed),
                    ]
                )
            )
            time.sleep(2)
    except (IndexError, IOError):
        return


if __name__ == '__main__':
    main()
 
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Vielen Dank!!!
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Wenn ich das jetzt aus einem bash starte und in einer Dauerschleife laufen lasse bekomme ich nach einiger Zeit keine GPS-Daten mehr und sie GPS-Maus ist über lsusb nicht mehr sichtbar und der gesamte USB-Port ist bis zum reboot abgeschaltet.

Das Skript wird in der Dauerschleife ständig neu gestartet. Ist das der Fehler?
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,
Das Skript wird in der Dauerschleife ständig neu gestartet. Ist das der Fehler?
Wahrscheinlich / vermutlich: ja. Programmiertechnisch ist das aber so wie so nicht weiter sinnvoll - warum legst du die Schleife nicht in den Skript?

Gruß, noisefloor
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Natürlich, danke für die Hilfe.
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Kann ich mir die Datenraten auch irgendwie in der def measure() berechnen lassen und in der main nur den Rückgabewert der Funktion erhalten?

Ich möchte wenn ich keinen Empfang habe nur die Positionsdaten und 0kbit/s ausgeben.
Habe es so probiert klappt irgendwie nicht.

Code: Alles auswählen

def internet_on():
	try:
                url = socket.gethostbyname("www.speedtestx.de")
                s = socket.create_connection((url, 80), 2)
                return True
        except:
                return False
                
                
def measure(url= 'http://www.speedtestx.de/testfiles/data_500mb.test', intervall=2, size=10):
        while True:      
                response = urllib.urlopen(url)
                internet = internet_on()
                for _ in range(2):
                    time_start = time_end = time.time()
                    amount = 0
                    if internet == True:
                            while time_end - time_start < intervall:
                                    bytes_read = len(response.read(size))
                                    if not bytes_read:
                                            return
                                    time_end = time.time()
                                    amount += bytes_read
                            yield amount, time_end - time_start
                    elif internet == False:
                            amount = 0
                            time_end = 2
                            time_start = 1
                            time.sleep(1)
                            yield amount, time_end -time_start
	
Zuletzt geändert von Anonymous am Freitag 13. Januar 2017, 12:24, insgesamt 1-mal geändert.
Grund: Quelltext in Python-Codebox-Tags gesetzt.
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

@Nr8: Deine Einrückung ist kaputt. Außerdem wird immer mit 4 Leerzeichen pro Ebene eingerückt. Nackte excepts sind böse, weil so auch Tippfehler bei Variablennamen überdeckt werden und Du nur sehr schwer Fehler entdecken kannst. Überleg Dir genau, welche Fehler auftreten können und wie Du mit diesen umgehen willst. Was soll diese Funktion internet_on überhaupt bewirken? Eine Zeile davor versuchst Du mit urlopen die selbe Seite zu erreichen, sollte es da kein Internet geben, fliegst Du sowieso mit einem IOError aus der Funktion raus. Das einfachste wäre wohl, den Speed-Test in einem eigenen Thread laufen zu lassen.
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Hab das jetzt so geändert:

Code: Alles auswählen

def measure(url= 'http://www.speedtestx.de/testfiles/data_500mb.test', intervall=2, size=10):
        while True:
                try:
                        response = urllib.urlopen(url)
                        for _ in range(1):
                                time_start = time_end = time.time()
                                amount = 0
                                while time_end - time_start < intervall:
                                        bytes_read = len(response.read(size))
                                        if not bytes_read:
                                                return
                                        time_end = time.time()
                                        amount += bytes_read
                          
                except:
                        subprocess.call("./Verbindungsaufbau1")
                        amount = 0
                        time_end = 2
                        time_start = 1
                        time.sleep(1)
                yield amount, time_end -time_start
BlackJack

@Nr8: Ein nacktes ``except:`` ohne konkrete Ausnahme(n) ist keine gute Idee. Man behandelt nur Ausnahmen die man auch erwartet, denn sonst weiss man ja gar nicht, ob die Behandlung überhaupt zur Ausnahme passt und hat auch eine Chance Ausnahmen mitzubekommen mit denen man nicht gerechnet hat.

Was ist der Sinn der ``for``-Schleife in dieser Funktion?
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Also so besser???

Code: Alles auswählen

    def measure(url= 'http://www.speedtestx.de/testfiles/data_500mb.test', intervall=2, size=10):
            while True:
                    try:
                            response = urllib.urlopen(url)
                            time_start = time_end = time.time()
                            amount = 0
                            while time_end - time_start < intervall:
                                        bytes_read = len(response.read(size))
                                        if not bytes_read:
                                                return
                                        time_end = time.time()
                                        amount += bytes_read
                             
                    except IOError:
                            subprocess.call("./Verbindungsaufbau1")
                            amount = 0
                            time_end = 2
                            time_start = 1
                            time.sleep(1)
                    yield amount, time_end -time_start

Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Hallo!

Wie kann ich die Ausgabe von Print zusätzlich in eine Datei schreiben???

Code: Alles auswählen

            
            print(
                ';'.join(
                    [
                        format(DateTime.now(), '%H:%M:%S'),
                        format(amount / time_delta / 1024, '.3f'),
                        str(lat),
                        str(lon),
                        str(speed),
                    ]
                )
            )
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Hab es schon.


:D
Danke!
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Hallo!

Ich hab mal wieder eine Frage.
Ich habe folgenden Code und die Ausgabe wechselt leider nur von LTE auf WCDMA aber nicht mehr auf LTE.

Kann es sein, dass der Port das blockiert?
wenn ich es mit bash über

sudo cat /dev/ttyUSB1 | grep HCSQ &

while true
do
sudo echo "AT^HCSQ?" > /dev/ttyUSB1
sleep 2
done

mache geht es.



[codebox=pycon file=Unbenannt.txt]
usb = serial.Serial()
usb.port = "/dev/ttyUSB1"
usb.baudrate = 9600
usb.bytesize = serial.EIGHTBITS
usb.parity = serial.PARITY_NONE
usb.stopbits = serial.STOPBITS_ONE
usb.timeout = 0
usb.xonxoff = False
usb.rtscts = False
usb.dsrdtr = False
usb.writeTimeout = 2



def doSignal():
i =0
usb.flushInput()
usb.flushOutput()
zeilen = 0

while True:
usb.write("AT^HCSQ?\x0D")
time.sleep(2)
HCSQ = usb.readline()
zeilen = zeilen + 1

if (zeilen >= 2):
break
a = HCSQ.translate(None, ':')
a = a.translate(None, '^')
a = a.translate(None, '"')
a = a.translate(None, 'HCSQ')
a = a.translate(None, '\r\n')
b = a.split(",")
signalst = tuple(b)

return signalst


[/code]
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

@Nr8: Deine Einrückung ist kaputt. Wenn Du etwas zwei mal machen willst, dann nimm eine for-Schleife. Bis Du Dir über das Zeile-Ende-Zeichen sicher? if braucht keine Klammern. .translate ist die falsche Methode um einen String zu parsen.
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Das mit dem Einrücken ist nur hier passiert.

Was soll ich denn Anstelle von .translate verwenden?

Meinst du die for schleife für die while True: ?
Ich möchte es halt alle paar Sekunden abrufen
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

@Nr8: das mit dem Einrücken passiert hier, weil Du Tabs mit Spaces gemischt hast. Das sollte nicht sein, weil es, wie Du siehst, nur Probleme macht. Jeder vernünftige Editor bietet Dir die Möglichkeit einmalig Tabs in Spaces umzuwandeln und danach nur noch mit Spaces einzurücken, damit solche Fehler nicht passieren.

Statt irgendwelche Zeichen aus einem String rauszulöschen und zu hoffen, dass dadurch nicht der wichtige Inhalt kaputt geht, solltest Du den String parsen. Da das eine Komma-separierte Liste ist, sollte das nicht all zu schwierig sein.
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Gibt es python etwas wie grep?

Wenn ich also response = ser.readline() nach etwas greppen möchte???

Vielen Dank!
BlackJack

@Nr8: Den ``in``-Operator auf Zeichenketten für einfache Tests auf statische Teilzeichenketten oder das `re`-Modul für reguläre Ausdrücke.
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Habe den Code jetzt etwas geändert.

Leider bekomme ich noch nicht das gewünschte Ergebniss.

response="" wollte ich testen ob ich alle paar sekunden neue Werte von HCSQ bekomme und das ist leider nicht so.

Sendet ser.write("AT^HCSQ?\x0D") nicht alle paar Sekunden eine Abfrage?

Code: Alles auswählen

ser = serial.Serial()
ser.port = "/dev/ttyUSB1"
ser.baudrate = 9600
ser.bytesize = serial.EIGHTBITS 
ser.parity = serial.PARITY_NONE
ser.stopbits = serial.STOPBITS_ONE
ser.timeout = 0            
ser.xonxoff = False    
ser.rtscts = False    
ser.dsrdtr = False      
ser.writeTimeout = 2    

def doSignal():
    ser.flushInput() #flush input buffer, discarding all its contents
    ser.flushOutput()#flush output buffer, aborting current output

    
    while True:
        response = ser.readline().rstrip()
        ser.write("AT^HCSQ?\x0D")
        time.sleep(1)  #give the serial port sometime to receive the data
        for HCSQ in response:  
            signalst =  response.split(',')
            verbindung = signalst[0]

            if (verbindung == '^HCSQ:"WCDMA"'):
                rssi = int(signalst[1])-120.5
                rsrp = int(signalst[2])-120.5
                sinr = int(signalst[3])
                sinr = (0.5*sinr)-32
                rsrq = 0
                verbindung = "WCDMA"
                subprocess.call("./Verbindungsaufbau1")
  
            elif (verbindung == '^HCSQ:"LTE"'):
                rssi = int(signalst[1])-120.5
                rsrp = int(signalst[2])-140.5
                sinr = int(signalst[3])
                sinr = (0.2*sinr)-20.1
                rsrq = (int(signalst[4])*0.5)-19.75
                verbindung = "LTE"
                
            elif (verbindung == '^HCSQ:"GSM"'):
                rssi = int(signalst[1])-120.5
                rsrp = 0
                sinr = 0
                sinr = 0
                rsrq = 0
                verbindung = "GSM"
                subprocess.call("./Verbindungsaufbau1")
            
            time.sleep(2)
            datei = (format(DateTime.now(), '%H:%M:%S'),str(rssi), str(rsrp), str(sinr), str(rsrq), str(verbindung))
            print (response)
            print (";".join(datei))
            fout.write(";".join(datei)+"\n")
            #print (verbindung)
            response=""

try: 
	ser.open()

except Exception, e:
	print "error open serial port: " + str(e)
	exit()

if ser.isOpen():
	while True:
		doSignal()
		time.sleep(0.1)


Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Mit dem bash Code geht es.

[codebox=bash file=Unbenannt.bsh]
#!/bin/bash

sudo cat /dev/ttyUSB1 | grep HCSQ &

while true
do
sudo echo "AT^HCSQ?" >/dev/ttyUSB1
sleep 2
done
[/code]
Antworten