Benötige Hilfe bei Threading und socket recv

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Antworten
opccorsa
User
Beiträge: 34
Registriert: Samstag 20. Juli 2013, 07:43

Hallo Leute, ich komme mal wieder nicht weiter mit meinem Programm. Es geht um das Empfangen über Netzwerk. Ich habe eine Hauptschleife die zyklisch durchläuft und unterschiedliche Funktionen ausführt. Nebenbei muss ich noch auf einen geöffneten Port hören und wenn dort Daten ankommen diese weiterverarbeiten. Ich habe mit einem ganz einfachen Beispiel angefangen aber ich bekomme die Recv Methode nicht so hin. Erst habe ich es mit Thread's versucht.

Code: Alles auswählen

 
import time
from threading import Thread
import socket, pickle

BIND_IP = '0.0.0.0'
BIND_PORT = 50000

def init(port): # Init den Socket
    global conn, addr, data# Variablen Global für NetService.py
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.bind(('', port)) # Öffne Port
    print("Wait of Client ...") # Debugging
    s.listen(1)
    client, addr = s.accept()
    return client, addr

def network_recv(conn):
    try:
        data = conn.recv(1024)
        if not data: return None
        print("Komplett: ", data)
    except (socket.timeout) as error:
        print("Fehler net recv ...", error) # Debugging
        return None 

client, addr = init(BIND_PORT)
while True: 
    t = Thread(target=network_recv, args=(client,))
    t.start()
    print("Schleife")
    time.sleep(1)
    


Damit läuft er irgendwann über und bringt mir die Fehlermeldung "can't start new thread". Wenn ich es ohne mache bleibt er bei recv stehen und wartet auf die "1024". Gibt es da nicht eine Lösung außer mit "select" und "fork" oder so?

Lieben Dank erstmal schon für Eure Bemühungen.
Grüße :D
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Mit dem Code erzeugst du vollkommen unnötig neue Threads. Wenn reicht einer. Und konzeptionell ist da noch etwas aufzuarbeiten: ein Thread kann nichts returnen(*). Wo sollten denn die Rückgabe landen? Um Daten aus einem Thread zu bekommen betraf es einer geeigneten Daten Struktur, zb eine Queue. Die du periodisch in deiner Schleife abfragst.

Allerdings ist das von dir ausgeschlossene select durchaus ein gangbarer Weg, ob wirklich hängt von dem ab, was das Programm sonst macht.
opccorsa
User
Beiträge: 34
Registriert: Samstag 20. Juli 2013, 07:43

Ok. Danke erstmal für deine schnelle Antwort. Dann würde ich mich mal in beide Lösungen einarbeiten. Mal eine Seite suchen die einfache Beispiele hat. Hatte bis jetzt nur sehr komplexe Beispiele gefunden. Danke. LG
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Vergiss gleich wieder, dass es überhaupt `global` gibt. Da werden auch Variablen benutzt, die gar nicht definiert werden.
In `network_recv` wird recv wahrscheinlich falsch benutzt.
In Deiner "Hauptschleife" werden gar nicht unterschiedliche Funktionen aufgerufen, was das Beispiel untauglich macht. Man kann nur sagen, so funktioniert es nicht. Aber wie man es statt dessen macht, kann man nur sagen, wenn Du zeigst, was Du wirklich machen willst.
opccorsa
User
Beiträge: 34
Registriert: Samstag 20. Juli 2013, 07:43

So Danke für die Hilfe. Ich habe es erstmal lauffähig bekommen wie ich es wollte. Denke mal da komme ich erstmal weiter falls es hier keine Verbesserungsvorschläge geben sollte. Die ich auch gerne annehmen würde.

Code: Alles auswählen

import time
from threading import Thread
import socket
import queue
import sys

BIND_IP = '0.0.0.0'
BIND_PORT = 50000
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
task = ""

def init(port): # Init den Socket
    s.bind(('', port)) # Öffne Port
    print("Wait of Client ...") # Debugging
    s.listen(1)
    client, addr = s.accept()
    return client, addr

def network_recv(conn):
    while True:
        try:
            data = conn.recv(1024)
            q.put((data))
            if not data: break
        except (socket.timeout) as error:
            print("Fehler net recv ...", error) # Debugging
            return None 

if __name__ == '__main__':
    q = queue.Queue()
    client, addr = init(BIND_PORT)
    t = Thread(target=network_recv, args=(client,))
    t.start()
    
    try:
        while True: 
            print("Schleife")
            try:
                task=q.get(False)   #Opt 1: Handle task here and call q.task_done()
                print("Recv in Schleife: ", task)
            except queue.Empty : #Handle empty queue here
                pass
            if task == b'':
                raise RuntimeError("socket connection broken")
            time.sleep(1)
    except RuntimeError:
        s.close()
        sys.exit()
Das komplette Projekt würde ich euch auch gerne hier zeigen aber ich denke mal das wäre zu Umfangreich.

Liebe Grüße
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

`s` ist eine globale Variable, die nicht existieren sollte. `s` ist auch ein extrem schlechter Name für den server_socket.
`init` initialisiert nichts, sondern wartet auf eine Verbindung. Der Funktionsname ist verwirrend.
`q` ist wieder eine globale Variable und kommt in network_recv aus dem Nichts. Die Funktion wird in einem Thread aufgerufen, ein Rückgabewert macht da keinen Sinn.
`t` ist auch ein schlechter Name.
task wird ein einen String gebunden und dann mit Bytes verglichen? Sieht falsch aus.
Das sys.exit ist überflüssig und kann (muß) weg.
Alles unter if __name__ sollte in eine Funktion mit Namen main wandern.
Du bekommst gar nicht mit, ob die Verbindung zum Client wirklich abgebrochen wurde, oder einfach nur langsam ist. Das 1 Sekunde Warten sieht auch nicht sehr robust aus. Entweder kommen die Daten zu schnell, dann läuft die Queue voll, oder zu langsam, dann gibt es einen Fehler.
Mit `task` wird nichts sinnvolles gemacht. Das kann, wie oben schon geschrieben irgendetwas zwischen 1 und 1024 Bytes sein. Robuste Socket-Programmierung sieht anders aus.
Was schickt denn der Client und was soll damit passieren?

Code: Alles auswählen

import time
from threading import Thread
import socket
import queue

BIND_IP = '0.0.0.0'
BIND_PORT = 50000
CONNECTION_CLOSED = "connection_closed"

def wait_for_client(port):
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server_socket.bind(('', port)) # Öffne Port
    print("Wait of Client ...") # Debugging
    server_socket.listen(1)
    client, addr = server_socket.accept()
    server_socket.close()
    return client, addr

def network_recv(queue, conn):
    try:
        while True:
            # TODO: read one message and not only one byte.
            # Die 1 ist Absicht, um klar zu machen, dass hier noch was fehlt.
            data = conn.recv(1)
            queue.put((data))
            if not data:
                break
    except socket.timeout as error:
        # TODO: better error handling
        print("Fehler net recv ...", error)
    queue.put(CONNECTION_CLOSED)

def main():
    task_queue = queue.Queue()
    client, addr = wait_for_client(BIND_PORT)
    Thread(target=network_recv, args=(task_queue, client), daemon=True).start()
    
    while True: 
        print("Schleife")
        try:
            task = task_queue.get(False)
            print("Recv in Schleife: ", task)
        except queue.Empty:
            # TODO: was soll hier passieren
            time.sleep(1)
        else:
            if task == CONNECTION_CLOSED:
                break

if __name__ == '__main__':
    main()
opccorsa
User
Beiträge: 34
Registriert: Samstag 20. Juli 2013, 07:43

Also ich habe ein Programm was sich ausschließlich um 36 CAN-Bus Komponenten kümmert. Das bedeutet Aktoren schalten und Sensoren auswerten über CAN-Bus. "Nebenbei" möchte ich einen Port bereithalten als Server und wenn sich ein Client angemeldet hat, die Verbindung halten, prüfen und zyklisch auf ankommende Befehle reagieren. Die Zeit der Auswertung der ankommenden Befehle über TCP sind unkritisch, da das Programm nur die Befehle die er als JSON String bekommt ausführt. Da aber auch zyklische CAN Sync gesendet werden müssen und die GUI per PyQT bedient und abgefragt werden muss, darf das Programm nicht am socket.recv warten bzw. muss mir die Daten bei Anfrage per Queue liefern. Diese Anfrage geschieht dann in der main in einer While Schleife die natürlich später dann ohne sleep ausgeführt wird. Zum Projekt kommen dann noch verschiedenen Mikroarchitektur Module die zum Beispiel von "außen" per JSON File konfiguriert und Initialisiert werden. Da ich nur Elektronikentwickler bin und ansonsten nur mit Mikrocontrollerprogrammierung zu tun habe ist meine erlerntes Python sicher nicht das schönste. :-) Ich werde dein Beispiel mal versuchen und wieder was dazu lernen. Danke.

Dein Programm funktioniert. Sieht natürlich so auch besser aus. Da ich nicht genau weis wie lang der JSON String wird habe ich nur die max Byte hoch genommen. Ich denke mal das ist OK?
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Wenn Du PyQt benutzt, ist einiges anders. Dort wird statt Threads QThreads verwendet, und statt einer Queue kann man dort den Signal-Slot-Mechanismus verwenden. Ähnlich wird wahrscheinlich auch der CAN-Bus angebunden werden sein.
Wie schon geschrieben, liefert recv eine beliebige Zahl an Bytes, da gibt es keine max Bytes. Du brauchst ein Protokoll, das garantiert, dass ein kompletter JSON-String gelesen wurde.
Antworten