Internet Speedtest

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
BlackJack

Donnerstag 5. Januar 2017, 11:34

@Nr8: `subprocess`, `sys`, und `os` werden importiert, aber überhaupt nicht verwendet. Sowie der fast alles was mit dem *-Import aus `gps` importiert wird. Sternchen-Importe sind Böse™. Im besten Fall machen sie Programme schlechter nachvollziebar, im schlechteren Fall hat man Namenskollisionen.

`lat`, `lon`, und `speed` am Ende der Schleife an 0 zu binden hat keinen Effekt, weil diese Werte nie benutzt werden.

Ungetestet:

Code: Alles auswählen

#!/usr/bin/env python
# coding: utf8
from __future__ import absolute_import, division, print_function
import time
import urllib
from datetime import datetime as DateTime
from gps import gps, WATCH_ENABLE, WATCH_NEWSTYLE

SPEED_TEST_URL = 'http://www.speedtestx.de/testfiles/data_500mb.test'


def get_latest_position(gps_session, previous_position):
    position = previous_position
    while gps_session.waiting():
        data = next(gps_session)
        if 'lat' in data:
            position = (data['lat'], data['lon'], data['speed'])
    return position
 
   
def measure(url=SPEED_TEST_URL, intervall=2, size=10):
    response = urllib.urlopen(url)
    for _ in range(1):
        time_start = time_end = time.time()
        amount = 0
 
        while time_end - time_start < intervall:
            bytes_read = len(response.read(size))
            if not bytes_read:
                return
            time_end = time.time()
            amount += bytes_read
        yield amount, time_end - time_start
  
 
def main():
    session = gps()
    session.stream(WATCH_ENABLE | WATCH_NEWSTYLE)
    try:
        for amount, time_delta in measure(size=1000, intervall=2):
            # 
            # FIXME Sehr wahrscheinlich soll hier nicht immer (0, 0, 0)
            #   übergeben werden.
            # 
            lat_lon_speed = get_latest_position(session, (0, 0, 0))
            lat, lon, speed = lat_lon_speed
            print(
                ';'.join(
                    [
                        format(DateTime.now(), '%H:%M:%S'),
                        format(amount / time_delta / 1024, '.3f'),
                        str(lat),
                        str(lon),
                        str(speed),
                    ]
                )
            )
            time.sleep(2)
    except (IndexError, IOError):
        return


if __name__ == '__main__':
    main()
 
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Dienstag 10. Januar 2017, 07:28

Vielen Dank!!!
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Dienstag 10. Januar 2017, 14:39

Wenn ich das jetzt aus einem bash starte und in einer Dauerschleife laufen lasse bekomme ich nach einiger Zeit keine GPS-Daten mehr und sie GPS-Maus ist über lsusb nicht mehr sichtbar und der gesamte USB-Port ist bis zum reboot abgeschaltet.

Das Skript wird in der Dauerschleife ständig neu gestartet. Ist das der Fehler?
Benutzeravatar
noisefloor
User
Beiträge: 2387
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: Görgeshausen
Kontaktdaten:

Dienstag 10. Januar 2017, 15:44

Hallo,
Das Skript wird in der Dauerschleife ständig neu gestartet. Ist das der Fehler?
Wahrscheinlich / vermutlich: ja. Programmiertechnisch ist das aber so wie so nicht weiter sinnvoll - warum legst du die Schleife nicht in den Skript?

Gruß, noisefloor
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Mittwoch 11. Januar 2017, 07:23

Natürlich, danke für die Hilfe.
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Freitag 13. Januar 2017, 07:52

Kann ich mir die Datenraten auch irgendwie in der def measure() berechnen lassen und in der main nur den Rückgabewert der Funktion erhalten?

Ich möchte wenn ich keinen Empfang habe nur die Positionsdaten und 0kbit/s ausgeben.
Habe es so probiert klappt irgendwie nicht.

Code: Alles auswählen

def internet_on():
	try:
                url = socket.gethostbyname("www.speedtestx.de")
                s = socket.create_connection((url, 80), 2)
                return True
        except:
                return False
                
                
def measure(url= 'http://www.speedtestx.de/testfiles/data_500mb.test', intervall=2, size=10):
        while True:      
                response = urllib.urlopen(url)
                internet = internet_on()
                for _ in range(2):
                    time_start = time_end = time.time()
                    amount = 0
                    if internet == True:
                            while time_end - time_start < intervall:
                                    bytes_read = len(response.read(size))
                                    if not bytes_read:
                                            return
                                    time_end = time.time()
                                    amount += bytes_read
                            yield amount, time_end - time_start
                    elif internet == False:
                            amount = 0
                            time_end = 2
                            time_start = 1
                            time.sleep(1)
                            yield amount, time_end -time_start
	
Zuletzt geändert von Anonymous am Freitag 13. Januar 2017, 12:24, insgesamt 1-mal geändert.
Grund: Quelltext in Python-Codebox-Tags gesetzt.
Sirius3
User
Beiträge: 7779
Registriert: Sonntag 21. Oktober 2012, 17:20

Samstag 14. Januar 2017, 09:37

@Nr8: Deine Einrückung ist kaputt. Außerdem wird immer mit 4 Leerzeichen pro Ebene eingerückt. Nackte excepts sind böse, weil so auch Tippfehler bei Variablennamen überdeckt werden und Du nur sehr schwer Fehler entdecken kannst. Überleg Dir genau, welche Fehler auftreten können und wie Du mit diesen umgehen willst. Was soll diese Funktion internet_on überhaupt bewirken? Eine Zeile davor versuchst Du mit urlopen die selbe Seite zu erreichen, sollte es da kein Internet geben, fliegst Du sowieso mit einem IOError aus der Funktion raus. Das einfachste wäre wohl, den Speed-Test in einem eigenen Thread laufen zu lassen.
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Dienstag 17. Januar 2017, 08:10

Hab das jetzt so geändert:

Code: Alles auswählen

def measure(url= 'http://www.speedtestx.de/testfiles/data_500mb.test', intervall=2, size=10):
        while True:
                try:
                        response = urllib.urlopen(url)
                        for _ in range(1):
                                time_start = time_end = time.time()
                                amount = 0
                                while time_end - time_start < intervall:
                                        bytes_read = len(response.read(size))
                                        if not bytes_read:
                                                return
                                        time_end = time.time()
                                        amount += bytes_read
                          
                except:
                        subprocess.call("./Verbindungsaufbau1")
                        amount = 0
                        time_end = 2
                        time_start = 1
                        time.sleep(1)
                yield amount, time_end -time_start
BlackJack

Dienstag 17. Januar 2017, 09:36

@Nr8: Ein nacktes ``except:`` ohne konkrete Ausnahme(n) ist keine gute Idee. Man behandelt nur Ausnahmen die man auch erwartet, denn sonst weiss man ja gar nicht, ob die Behandlung überhaupt zur Ausnahme passt und hat auch eine Chance Ausnahmen mitzubekommen mit denen man nicht gerechnet hat.

Was ist der Sinn der ``for``-Schleife in dieser Funktion?
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Dienstag 17. Januar 2017, 10:42

Also so besser???

Code: Alles auswählen

    def measure(url= 'http://www.speedtestx.de/testfiles/data_500mb.test', intervall=2, size=10):
            while True:
                    try:
                            response = urllib.urlopen(url)
                            time_start = time_end = time.time()
                            amount = 0
                            while time_end - time_start < intervall:
                                        bytes_read = len(response.read(size))
                                        if not bytes_read:
                                                return
                                        time_end = time.time()
                                        amount += bytes_read
                             
                    except IOError:
                            subprocess.call("./Verbindungsaufbau1")
                            amount = 0
                            time_end = 2
                            time_start = 1
                            time.sleep(1)
                    yield amount, time_end -time_start

Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Donnerstag 19. Januar 2017, 07:11

Hallo!

Wie kann ich die Ausgabe von Print zusätzlich in eine Datei schreiben???

Code: Alles auswählen

            
            print(
                ';'.join(
                    [
                        format(DateTime.now(), '%H:%M:%S'),
                        format(amount / time_delta / 1024, '.3f'),
                        str(lat),
                        str(lon),
                        str(speed),
                    ]
                )
            )
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Donnerstag 19. Januar 2017, 07:57

Hab es schon.


:D
Danke!
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Montag 23. Januar 2017, 09:35

Hallo!

Ich hab mal wieder eine Frage.
Ich habe folgenden Code und die Ausgabe wechselt leider nur von LTE auf WCDMA aber nicht mehr auf LTE.

Kann es sein, dass der Port das blockiert?
wenn ich es mit bash über

sudo cat /dev/ttyUSB1 | grep HCSQ &

while true
do
sudo echo "AT^HCSQ?" > /dev/ttyUSB1
sleep 2
done

mache geht es.



[codebox=pycon file=Unbenannt.txt]
usb = serial.Serial()
usb.port = "/dev/ttyUSB1"
usb.baudrate = 9600
usb.bytesize = serial.EIGHTBITS
usb.parity = serial.PARITY_NONE
usb.stopbits = serial.STOPBITS_ONE
usb.timeout = 0
usb.xonxoff = False
usb.rtscts = False
usb.dsrdtr = False
usb.writeTimeout = 2



def doSignal():
i =0
usb.flushInput()
usb.flushOutput()
zeilen = 0

while True:
usb.write("AT^HCSQ?\x0D")
time.sleep(2)
HCSQ = usb.readline()
zeilen = zeilen + 1

if (zeilen >= 2):
break
a = HCSQ.translate(None, ':')
a = a.translate(None, '^')
a = a.translate(None, '"')
a = a.translate(None, 'HCSQ')
a = a.translate(None, '\r\n')
b = a.split(",")
signalst = tuple(b)

return signalst


[/code]
Sirius3
User
Beiträge: 7779
Registriert: Sonntag 21. Oktober 2012, 17:20

Montag 23. Januar 2017, 09:56

@Nr8: Deine Einrückung ist kaputt. Wenn Du etwas zwei mal machen willst, dann nimm eine for-Schleife. Bis Du Dir über das Zeile-Ende-Zeichen sicher? if braucht keine Klammern. .translate ist die falsche Methode um einen String zu parsen.
Nr8
User
Beiträge: 42
Registriert: Mittwoch 21. Dezember 2016, 08:43

Montag 23. Januar 2017, 10:09

Das mit dem Einrücken ist nur hier passiert.

Was soll ich denn Anstelle von .translate verwenden?

Meinst du die for schleife für die while True: ?
Ich möchte es halt alle paar Sekunden abrufen
Antworten