Sockets/Threading

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Antworten
MrDietmar
User
Beiträge: 5
Registriert: Dienstag 24. November 2015, 11:04

Hallo,

ich kämpf grade mit Sockets und stecke dabei ein wenig fest. Ich habe über 2 Stunden ein Fehler gesucht und nicht gefunden. Beim zusammenkürzen des Codes ist es mir aber grade bewusst gewurden. Ich fang erstmal von vorne an. Ich will mehrer Clients fernsteuern. Dazu habe ich einen Server (der grade das Problem ist) auf den sich die Clients verbinden, auf Befehle warten und ggf. eine Antwort zurück schicken. Meine Hoffnung war das es dafür schon sinnvolle Bibliotheken gibt... naja. Dass was ich gefunden habe, brachte Abhängigkeiten mit die ich nicht erfüllen will/kann, oder der Code sah schon müllig aus. Deswegen habe ich mich entscheiden das mit Socktes zusammen zu bauen/frickeln. Jetzt habe ich mit meiner Lösung ein Problem. Schicke ich ein Befehl an den Client und hole die Antwort direkt ab, fehlen von der Antwort die ersten acht Byte (proxy_server.test() / auskommentierte Zeilen) und es gibt ein Fehler weil die Sturktur der Nachricht fehlerhaft ist. Hole ich mir nicht direkt die Antwort, bekommen ich sie automatisch und vollständig von proxy_node.handle() ausgegeben. Ich habe 2 Stunden nicht verstanden warum. Erst beim zusammenkürzen des Codes ist mir klar gewurden. Das ist ein Threading Problem. proxy_node.handle() und proxy_server.test() laufen in unterschiedlichen Threads. Der eine Thread wartet immer auf neuen Nachrichten des Sockets, wenn noch ein zweiter Thread das selbe macht zerhackt es die Nachrichten und es knallt.
Wie umgehen ich das Problem am sinnvollsten? Ich hab leider mit Threading sogut wie keine Erfahrung, was übringens auch für Python gilt. Ich hab zwei Ideen und die sind unbrauchbar. Einmal die proxy_node.handle() Funktion nicht mehr auf den Socket lesen zu lassen, sondern nur noch gezielt den socket auslesen. Das funktioniert aber nicht, weil auch der Client die Komunikation initiallisieren muss. Die andere Idee war die letzte Nachricht über proxy_node.handle() in proxy_node zu speichern und dort raus die Nachrichten zu holen. Das ist aber völliger Humbug, weil ich auf gut Glück warten müsste um eine Nachricht auszulesen. Ich bin eigentlich offen für alles. Von komplett anders, bis "ist zwar Pfusch, aber so und so könnte das gehen". Das ist nur ein "Spassprojekt".

Code aufs wesentliche gekürzt:

Code: Alles auswählen

import socketserver
import threading
import time
import random
import builtins
import netcore


class proxy_node(socketserver.BaseRequestHandler):
    def handle(self):
        while True:
            try:
                data = netcore.reciveJson(self.request)
                if data == False:
                    return False
            except ConnectionResetError:
                return False

            print(data['cmd'])
            print(data)

class proxy_server(socketserver.ThreadingMixIn, socketserver.TCPServer):
    proxy_nodes = []

    def test(self):
        proxy_node = self.getAvailableProxyNode()

        netcore.sendJson(proxy_node.request, {'cmd': 'test'})

        #response = netcore.reciveJson(proxy_node.request)
        #print(response)


if __name__ == "__main__":
    HOST, PORT = "", 9999

    server = proxy_server((HOST, PORT), proxy_node)

    server_thread = threading.Thread(target=server.serve_forever)
    server_thread.daemon = True
    server_thread.start()

    while 1:
        command = input('CMD > ')
        if command == 't':
            server.test()


Netzwerkcode um aus dem Stream Nachrichten zu machen:

Code: Alles auswählen

import json
import struct


def reciveMessage(socket):
    data = socket.recv(8)
    message = b''

    if not data:
        return False

    if data[0:2] != b'LE' and data[6:8] != b'LE':
        print('unkown format')
        print(data)

        return False

    len = struct.unpack('>I', data[2:6])[0]

    while True:
        data = socket.recv(len)
        if not data:
            return False

        message += data

        if message.__len__() >= len:
            break;

    return message


def sendMessage(socket, message):
    pack1 = struct.pack('>I', len(message))

    message = b'LE'+pack1+b'LE'+message

    socket.sendall(message)
    
    
def reciveJson(socket):
    message = reciveMessage(socket)

    if message == False:
        return False

    message = message.decode('utf8')

    return json.loads(message)

def sendJson(socket, obj):
    message = json.dumps(obj)
    sendMessage(socket, bytes(message, 'utf8'))
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

@MrDietmar: Dein Socket-Code ist fehlerhaft. Am besten socket.makefile aufrufen, dann kann man ihn wie ein Fileobjekt benutzen.
Damit zwei Threads nicht Resourcen nicht gleichzeitig benutzen, gibt es Locks.
Zur Fehlerbehandlung gibt es Exceptions. False zurückzuliefern ist für mich ein schwerer Programmierfehler. Die Magic-Functions mit __ sollten nicht direkt aufgerufen werden: es gibt die len-Funktion!
MrDietmar
User
Beiträge: 5
Registriert: Dienstag 24. November 2015, 11:04

Danke für deine Antwort. Was ist den an dem Socket-Code falsch und was würde mir der Socket als file handler bringen?
Das mit den Threads ist mir bewusst, aber ich weiß nicht wie ichs anwenden soll. Der Thread der handle() ausführt steckt permanent in der recv() Funktion des Sockets fest. Wenn ich im Main Thread eine Aktion ausführen lassen will kann ich nix mehr locken und warten kommt nicht in Frage, weil der Thread eher Stunden in der recv() Funktion hängt. :K

Das mit dem Error Handling ist mir bewusst, dass mit den Magic-Funktionen nicht. Ich hab mich gestern gewundert was das mit den __ soll. Nachdem ich weiß nach was ich schauen muss, macht das schon mehr sinn. Ich hab mir aber scheinbar genau den use case rausgesucht wo es keine Sinn macht. Eine Listen-Datentyp der diverse Methoden zum manipulieren und auslesen mitbringt (append, remove, count, sort usw...), aber die Anzahl der Elemente muss mit einer exteren Funktion bestimmt werden. :?:
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

@MrDietmar: Deine Fragen sind unverständlich: Von welchem MainThread redest Du? Was machst Du da genau? Was macht handle? Deine proxy-Klassen machen nichts sinnvolles, so dass da auch keine Probleme geben kann. Von Was für ein use case? Was für ein Listen-Datentyp?

socket.recv(8) liefert bis zu 8 Bytes, daher kann es zu unknown format kommen, obwohl alles stimmt. Wenn Dir das mit dem Error-Handling bewußt ist, warum machst Du's dann?
MrDietmar
User
Beiträge: 5
Registriert: Dienstag 24. November 2015, 11:04

Von welchem MainThread redest Du?[...]Was macht handle? Deine proxy-Klassen machen nichts sinnvolles, so dass da auch keine Probleme geben kann.
Vom Hauptprozess. Ich verwende den Socketserver von Python mit Threading. Jede Client Verbindung bekommt ihren eigenen Thread. Der/die Threads des/der Clients wartet in der Methode proxy_node.handle() auf Daten des Sockets. In dem geposteten Code gibt diese Methode nur die empfangenen Daten aus und wartet auf neue Daten. Im eigentlich Code werden die Daten Protokolliert und eventuell Befehle an den Client gesendet.
Genau bis hierhin funktionierts wie ich es mir vorstellen. Das Problem bekomm ich wenn ich vom Server aus ein Befehl an den Client senden will und die Antwort direkt haben möchte.
Zum Testen habe ich eine Endlosschleife die Konsoleneingaben wartet.

Code: Alles auswählen

while 1:
        command = input('CMD > ')
        if command == 't':
            server.test()
Die läuft im Hauptprozess und somit wird auch server.test() in diesem Ausgeführt. Führ ich server.test() aus, wie im geposteten Code, kommt die Antwort in proxy_node.handle() raus. Ich brauch diese Antwort aber in der aufrufenden Funktion, siehe den auskommentierten Teil. Wenn ich den Code mit reinnehme, warte ich an zwei Stellen auf Daten des Sockets und es zerlegt mir die Empfangene Nachricht. (Die ersten 8 Byte kommen in proxy_node.handle() raus und die Nachfolgenden in server.test())
Ich hoffe das ist verständlich.

Ich hab jetzt auch erstmal eine Lösung. In proxy_node.handle() unterscheide ich ob es eine Antwort auf ein Befehl ist oder eine vom Client initalisierte Benachrichtigung. Wenn es eine Antwort ist wird diese in einer property gespeichert. In server.test() wird nach dem Senden des Befehles solange gepollt bis eine Antwort in der property vorhanden ist. Das funktioniert soweit. Ich bin aber tortzdem noch an anderen Ansätzen interessiert.

Wenn Dir das mit dem Error-Handling bewußt ist, warum machst Du's dann?
Meine Python Kenntnisse belaufen sich auf ~10h. Da fehlts mir grade noch an so manchen Stellen. Ich hab den Focus auf ein Prototyp gelegt, sauberer Code spielte bis jetzt nicht unbedingt die Rolle.
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

@MrDietmar: so ähnlich macht man das auch, nur dass nichts in eine "Property gelegt wird", sondern in eine Queue. Dann braucht auch niemand zu pollen, sondern man kann einfach mit get auf die Antwort warten.
MrDietmar
User
Beiträge: 5
Registriert: Dienstag 24. November 2015, 11:04

Vielen Dank, mit der Queue sieht das nicht mehr so zusammengewürfelt aus. Vielleicht kannst du mir noch ein Tipp geben. Die Senden und Empfangsfunktion find ich jetzt nicht wirklich prickelnd. Gibt es dafür vielleicht auch ein sinnvolleren Ansatz? Nachrichten über ein TCP/IP Stream zu schicken ist doch eigentlich ein Standardproblem.
BlackJack

@MrDietmar: Ein Standardproblem ja, aber Unmengen an verschiedenen Lösungen. Davon kannst Du jetzt eine nachbauen, oder vielleicht das Problem auf einer höheren Ebene angehen als Sockets mit einem selbstgebastelten Protokoll. Kannst Du nicht etwas bestehendes verwenden, wie eine der vielen Message Queue-Varianten (ZeroMQ, RabbitMQ, …) oder vielleicht noch eine Ebene drüber, so etwas wie Celery.

Edit: Oder auch HTTP, oder einen RPC-Mechanismus.
MrDietmar
User
Beiträge: 5
Registriert: Dienstag 24. November 2015, 11:04

BlackJack hat geschrieben:Kannst Du nicht etwas bestehendes verwenden, wie eine der vielen Message Queue-Varianten (ZeroMQ, RabbitMQ, …) oder vielleicht noch eine Ebene drüber, so etwas wie Celery.

Edit: Oder auch HTTP, oder einen RPC-Mechanismus.
Da habe ich natürlich auch drüber nachgedacht, aber mir ist nix passendes in den Sinn gekommen. Bei den Message Queues hätte ich sowas wie Celery nachbauen müssen (kannte ich nicht) und das hab ich vom Aufwand gleich abgeschätzt wie die Socket Variante.
Ein Request baiserender Ansatz wie z.B über HTTP wäre das einfachste gewessen. Das kam aber nicht in Frage wegen NAT, Timing und Traffic. Eine überlegung war ein VPN drunter zu legen, aber administrations Aufwand und Laien tauglichkeit sprechen dagegen.
Antworten