socket server - thread - win32com

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Antworten
Pf@nne
User
Beiträge: 43
Registriert: Donnerstag 18. April 2013, 16:50

Moin,

ich habe einen TCP-Server laufen.
Damit <TCPsever.accept()> und <client_socket.recv(1024)> nicht zum blockieren des Codes führen laufen diese einem thread.
Das funktioniert soweit auch sehr gut.

Code: Alles auswählen

class Server(threading.Thread):
    def __init__(self, GA_callback, iFrame_callback, ip, port,):
        self.GA_callback = GA_callback
        self.iFrame_callback = iFrame_callback
        #TCP-Server
        self.IP = ip
        self.port = port
        
        self.TCPsever = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.TCPsever.bind((self.IP, self.port))
        self.TCPsever.listen(5)  # max backlog of connections
        
        threading.Thread.__init__(self)
        self.running = True
        self.start()
    def run(self):  #threat running continuously
        while self.running:
            self.client_socket, address = self.TCPsever.accept()   #waiting for client
            self.handle_client_connection(self.client_socket)
    
    #--- handle client Rx-Data ------------------------------------------------
    def handle_client_connection(self, client_socket):
        try: 
            while True: 
                try:
                    request = client_socket.recv(1024)
                except Exception as inst:
                    client_socket.close()
                    break                
                if not request:
                    client_socket.close()
                    break                    
                else:
                    #I-Frame        
                    if request[2] & 0b00000001 == 0b0:     #.0 I-Frame                                 #_0 I-Frame
                        self.RxCounter += 1
                        self.handle_iFrame(request, client_socket, cmEngine_id)
                    
        finally: 
            client_socket.close()
Um die Serveraktivitäten soweit wie möglich im Hintergrund zu haben werden nach dem Empfang zwei Callbacks zurück zu den Hauptroutinen gesetzt.
Auch die Callbacks funktionieren einwandfrei.

Mein Problem ist jetzt ein win32com-object.
Im Hauptprogramm wird ein win32com Objekt instanziiert.
Im Hauptprogramm gibt es Funktionen für den Zugriff auf das win32com-Objekt.
Die Funktion für den Zugriff auf das win32com-Objekt soll nur durch die Callbackfunktionen aufgerufen werden.

Code: Alles auswählen

import win32com.client # get e.g. via "pip install pywin32"
cmEngine = win32com.client.Dispatch("OMICRON.CMEngAL")

def on_IEC60870_5_104_I_Frame_received_callback(APDU):       
    if APDU.ASDU.InfoObject.address.DEZ == 1:
        doFromThread()
 
def doFromThread():
    global cmEngine 
    print(cmEngine.DevScanForNew(False))      <---- Fehler
    print(cmEngine.DevGetList(0)) 

Bei Aufruf des win32com-Objektes kommt es zu einem Fehler, den ich nicht zu 100% eingrenzen konnte.
Nach meinen Recherchen scheint es mit dem Aufruf des win32com-Objektes aus dem Server-thread zu liegen.

Ich habe schon versucht mich im die <pythoncom> Routinen einzulesen.
(cmEngine_id = pythoncom.CoMarshalInterThreadInterfaceInStream(pythoncom.IID_IDispatch, cmEngine))
(pythoncom.CoInitialize())
(cmEngine= win32com.client.Dispatch(pythoncom.CoGetInterfaceAndReleaseStream(cmEngine_id , pythoncom.IID_IDispatch))

Hier werden die Referenzen (IDs) zum win32com-Objekt mit durchgereicht.
Leider enden meine Fähigkeiten an dieser Stelle.

Daher 2 Fragen:
Wie baut man einen TCP-Server "richtig" auf.
Der Server sollte den Code nicht blockieren.
  1. Wie baut man einen TCP-Server "richtig" auf?
    Der Server sollte den Code nicht blockieren.
  2. Wie kann ich ein win32com-Objekt über einen Callback aus einem thread gültig aufrufen?
Gruß
Pf@nne
Benutzeravatar
__blackjack__
User
Beiträge: 13077
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Pf@nne: Das funktioniert so nicht. Das übliche Problem: Du rufst ``recv(1024)`` auf und gehst einfach davon aus, dass da alles geliefert wird was Du brauchst. Die minimale Garantie ist, dass *ein* Byte geliefert wird (ausser die Gegenseite hat gar nichts gesendet und die Verbindung geschlossen). Du greifst dann aber kurz danach auf das dritte Byte zu. Das muss es nicht geben. Du musst solange lesen bis Du sicher bist, dass Du tatsächlich die kompletten Daten beisammen hast. Da dort etwas von „Frame“ steht, wird es wohl irgend eine Struktur geben die das möglich macht den Datenrahmen zu lesen, also festzustellen wann der komplett ist.

Zusammen mit der ``while True``-Schleife um das ganze, die ja sagt, dass da mehrere Frames abgearbeitet werden, ist das nur eine Frage der Zeit bis Du darüber stolperst. Und falls das kein Halbduplex-Protokoll ist, hast Du dann auch nicht nur die Frage ob ein Datenrahmen komplett gelesen ist, sondern musst auch sicherstellen, dass eventuell schon gelesene Daten vom *nächsten* Frame nicht verloren gehen.

In der Regel kann man ganze einfacher gestallten wenn man sich vom Socket ein Dateiobjekt geben lässt.

Muss der Server ein Objekt sein und muss die Klasse dann auch tatsächlich von `Thread` erben? Auf jeden Fall sollte eine Thread-Klasse sich in ihrer `__init__()` nicht selbst starten. Damit bereitet es Kopfschmerzen wenn man von der Klasse dann mal ableiten möchte und es nicht mehr möglich ist *nach* dem Aufruf der Basis `__init__()` etwas zu machen was *vor* dem Threadstart passieren soll. Wie gesagt, ich würde erst einmal überhaupt das Erben von `Thread` in Frage stellen. `Thread`-Objekte haben ein `target`-Argument und Argumente für die Argumente von dem was asynchron ausgeführt werden soll.

In `run()` bindest Du das `client_socket` an das Objekt. Das wird dann aber gar nicht benutzt. Und würde so auch nur funktionieren wenn die Behandlung eines Clients weiterhin synchron bleibt.

Die ``client_socket.close()``-Aufrufe in der ``while``-Schleife sind durch das ``finally`` unnötig. Wobei man da auch ``with`` statt ``try``/``finally`` verwenden könnte und IMHO auch sollte.

Wieso ist beliebiges `Exception`-Objekt an den Namen `inst` gebunden? Was soll der Name bedeuten? Benutzt wird er dann auch nicht.

Wo kommt `RxCounter` her? Das Attribut müsste in der `__init__()` angelegt werden, da steht davon aber nichts. Und `cmEngine_id` kommt komplett aus dem Nichts.

Namen werden in Python klein_mit_unterstrichen geschrieben. Ausnahmen sind Konstanten (KOMPLETT_GROSS) und Klassen (PascalCase).

Und ``global`` ist an der Stelle nicht nur sinnfrei, sondern hat generell in einem sauberen Programm nichts zu suchen. In einem mit Threads und Problemen ist das auch etwas wo ich sagen würde das schaue ich mir gar nicht erst weiter an bis da kein globaler Zustand mehr drin ist.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Pf@nne
User
Beiträge: 43
Registriert: Donnerstag 18. April 2013, 16:50

Moin blackjack,

erstmal danke für deine Ausführungen!

Im ersten Schritt würde ich versuchen den Server mal umzubauen.
Erster Ansatz wäre hier nur das receive in einen thread zu verfrachten.
Eine Klasse muss der Server ja auch nicht zwingend sein.
Der Server soll ja nur aus dem "main" heraus gestartet werden und sich dann im Hintergrund um um den Empfang kümmern.
Ist dieser dann vollständig! soll ein Callback zurück in "main" führen.

Hört sich das für dich Schlüssig an?
__blackjack__ hat geschrieben: Mittwoch 19. Januar 2022, 00:20 In der Regel kann man ganze einfacher gestallten wenn man sich vom Socket ein Dateiobjekt geben lässt.
Was meinst du hiermit?
Hast du dafür ein Beispiel?
__blackjack__ hat geschrieben: Mittwoch 19. Januar 2022, 00:20 In einem mit Threads und Problemen ist das auch etwas wo ich sagen würde das schaue ich mir gar nicht erst weiter an bis da kein globaler Zustand mehr drin ist.
Der gesamte Code liegt auf GitHub:
https://github.com/Pfannex/CMC-104_Mapper
__blackjack__ hat geschrieben: Mittwoch 19. Januar 2022, 00:20 Namen werden in Python klein_mit_unterstrichen geschrieben. Ausnahmen sind Konstanten (KOMPLETT_GROSS) und Klassen (PascalCase).
Ändere ich...

Gruß
Pf@nne
Pf@nne
User
Beiträge: 43
Registriert: Donnerstag 18. April 2013, 16:50

Ich habe den Server nach einem Beispiel aus dem Netz umgebaut.

Code: Alles auswählen

rx_counter = 0
tx_counter = 0
testframe_ok = False
###############################################################################
#   IEC60870-5-104 Server
###############################################################################
def start_server(ga_callback, iFrame_callback, ip, port):
    tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp_server.bind((ip, port))
    
    tcp_server.listen(5)
    h.log('IEC 60870-5-104 Server listening on {}:{}'.format(ip, port))
    client_socket, address = tcp_server.accept()   #waiting for client Code Stops here
    h.log('IEC 60870-5-104 Client connected -  {}:{}'.format(address[0], address[1]))
    tcp_server.settimeout(2)
    handle_client_connection(client_socket, ga_callback, iFrame_callback)


#--- handle client Rx-Data ------------------------------------------------
def handle_client_connection(client_socket, ga_callback, iFrame_callback):
    global rx_counter, tx_counter
    while True:
        try:
            msg = client_socket.recv(1024)
        except socket.error as e:
            err = e.args[0]
            # this next if/else is a bit redundant, but illustrates how the
            # timeout exception is setup
            if err == 'timed out':
                sleep(1)
                h.log('recv timed out, retry later')
                rx_counter = 0
                tx_counter = 0
                continue
            else:
                h.log_error(e)
                client_socket.close()
                #sys.exit(1)
        except socket.error as e:
            # Something else happened, handle error, exit, etc.
            h.log_error(e)
            client_socket.close()
            #sys.exit(1)
        else:
            if len(msg) == 0:
                h.log('orderly shutdown on server end')
                client_socket.close()
                #sys.exit(0)
            else:
                if msg[0] == 0x68:  #start
                    if msg[1] == len(msg)-2:
                        #S-Frame
                        if msg[2] & 0b00000011 == 0b01:    #01 S-Frame
                            handle_sFrame(msg)
                        #U-Frame
                        if msg[2] & 0b00000011 == 0b11:    #11 U-Frame
                            handle_uFrame(msg, client_socket)
                        #I-Frame        
                        if msg[2] & 0b00000001 == 0b0:     #.0 I-Frame 
                            rx_counter += 1
                            handle_iFrame(msg, client_socket, ga_callback, iFrame_callback)
                    else:
                        h.log_error("Wrong size of incomming IEC 60870-5-104 Frame")

#--- U-Frame handle  ------------------------------------------------------
def handle_uFrame(frame, client):
    if frame[2] == 0x07:                              
        h.log("<- U (STARTDT act)")
        data = bytearray(frame)
        data[2] = 0x0B
        client.send(data)
        h.log("-> U (STARTDT con)")
    elif frame[2] == 0x13:                              
        h.log("<- U (STOPDT act)")
        data = bytearray(frame)
        data[2] = 0x23
        client.send(data)
        h.log("-> U (STOPDT con)")
    elif frame[2] == 0x43:                              
        h.log("<- U (TESTFR act)")
        data = bytearray(frame)
        data[2] = 0x83
        client.send(data)
        h.log("-> U (TESTFR con)")
    else:
        h.log('<- unknown U {}'.format(frame))
   
#--- S-Frame handle  ------------------------------------------------------
def handle_sFrame(frame):
    h.log("<- S (Rx con) Rx = " + str((frame[4] | frame[5]<<8)>>1))

#--- I-Frame handle  ------------------------------------------------------
def handle_iFrame(frame, client, ga_callback, iFrame_callback):
    global rx_counter, tx_counter
    APDU = T104.APDU(frame)
    h.log("<- I [{}-{}-{}] - {} - {}".format(APDU.ASDU.InfoObject.address._1,
                                            APDU.ASDU.InfoObject.address._2,
                                            APDU.ASDU.InfoObject.address._3,
                                            APDU.ASDU.TI.ref,
                                            APDU.ASDU.TI.des))
    APDU.pO()
        
    #confirm activation frame
    if APDU.ASDU.COT.short == "act":
        data = bytearray(frame)
        data[2] = (tx_counter & 0b0000000001111111) << 1
        data[3] = (tx_counter & 0b0111111110000000) >> 7
        data[4] = (rx_counter & 0b0000000001111111) << 1
        data[5] = (rx_counter & 0b0111111110000000) >> 7
        data[8] = APDU.ASDU.Test<<8 | APDU.ASDU.PN<<7 | 7 
        client.send(data)
        h.log("-> I ({}/{}) - COT = {}".format(tx_counter, rx_counter,
                                                d.cot[7]["long"]))
        tx_counter += 1
        
    #callback to main
    if APDU.ASDU.TI.Typ == 100:     #C_IC_NA_1 - (General-) Interrogation command 
        ga_callback(APDU)
    else:
        iFrame_callback(APDU)  #other I-Frame

#--- send I-Frame  --------------------------------------------------------
def send_iFrame(ti, value):
    list = [0x68, 0x0E, 0x02, 0x00, 0x02, 0x00,
            ti, 0x01, 0x03, 0x00, 0x0a, 0x0b, 
            0x32, 0x33, 0x3C, value]       
    data = bytearray(list)
    data[2] = (tx_counter & 0b0000000001111111) << 1
    data[3] = (tx_counter & 0b0111111110000000) >> 7
    data[4] = (rx_counter & 0b0000000001111111) << 1
    data[5] = (rx_counter & 0b0111111110000000) >> 7
        
    #APDU = splitFrame(data)
    #print_iFrame(APDU)
        
    client_socket.send(data)
    print ("-> I ({}/{})".format(tx_counter, rx_counter))
    tx_counter += 1                                   



Der läuft soweit..... was würde man hier jetzt noch anders machen?
Wenn ich den Client ausschalte merkt der Server dies nicht jedes Mal, kann das noch verbessert werden?
__deets__
User
Beiträge: 14528
Registriert: Mittwoch 14. Oktober 2015, 14:29

Das ist doch immer noch das gleiche - da gehst du davon aus, dass du immer ein komplettes Datenpaket bekommst. Was ein *Strom* dir nunmal nicht garantiert. Wenn du mit diesen Konzepten so kaempfst, wuerde ich nachdruecklich zu einer Middleware wie ZeroMQ, nanomsg, nng oder gar einem HTTP-Service via Flask oder FastAPI raten. Damit sind diese unteren Ebenen abgehandelt.

Zu deinem eigentlichen Problem: es kommt hierbei darauf an, was denn dein Main-Thread, in dem du die COM-Objekte instantiiert hast, eigentlich macht. Wenn es da die Moeglichkeit gibt, regelmaessig eine Queue mit Instruktionen abzuarbeiten, dann ist das die Loesung - dein Server bekommt Arbeitsauftraege, und packt die in die Queue.

Wenn das nicht geht, dann geht ggf. eine Teilung in einen Main-Thread, einen Server-Thread, und einen COM-Worker-Thread. Wobei sich dann die Frage stellt, was denn das Hauptprogramm so wichtiges tut, dass es nicht unterbrochen werden kann.
Pf@nne
User
Beiträge: 43
Registriert: Donnerstag 18. April 2013, 16:50

__deets__ hat geschrieben: Mittwoch 19. Januar 2022, 15:35 Das ist doch immer noch das gleiche - da gehst du davon aus, dass du immer ein komplettes Datenpaket bekommst. Was ein *Strom* dir nunmal nicht garantiert.
Ich hab doch hier zumindest eine Kontrolle auf Vollständigkeit implementiert.
In Byte 2 steht die Framegröße.

Code: Alles auswählen

                
                if msg[0] == 0x68:  #start
                    if msg[1] == len(msg)-2:
                        #S-Frame
                        if msg[2] & 0b00000011 == 0b01:    #01 S-Frame
                            handle_sFrame(msg)
                        #U-Frame
                        if msg[2] & 0b00000011 == 0b11:    #11 U-Frame
                            handle_uFrame(msg, client_socket)
                        #I-Frame        
                        if msg[2] & 0b00000001 == 0b0:     #.0 I-Frame 
                            rx_counter += 1
                            handle_iFrame(msg, client_socket, ga_callback, iFrame_callback)
                    else:
                        h.log_error("Wrong size of incomming IEC 60870-5-104 Frame")
__deets__ hat geschrieben: Mittwoch 19. Januar 2022, 15:35 Zu deinem eigentlichen Problem:
Ich habe den Server-Thread jetzt erstmal wieder entfernt, da es in diesem Fall nicht zwingend erforderlich ist.
Dadurch klappt der Aufruf aus dem Callback wieder wie gewohnt.

Was mich noch stört, ist die Tatsache, dass der Server es nicht immer merkt, wenn der Client sich verabschiedet.
Benutzeravatar
sparrow
User
Beiträge: 4187
Registriert: Freitag 17. April 2009, 10:28

Und was genau macht die andere Seite 20 Byte sendet und .recv() jetzt 20 Mal 1 Byte liefert?
Du brauchst ein Protokoll um das sicherzustellen.
Ich schließe mich der Aussage von __deets__ an: Benutz etwas Fertiges.
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

Zum Code im verlinkten Repository: das Modul `helper` ist Größtenteils überflüssig. Der Sinn der `start` und `handle` Funktionen erschließt sich mir nicht, und beim Logging lies Dir nochmal genau die Dokumentation durch. Es darf nur ein `basicConfig` geben, und zwar beim Programmstart. Jedes Modul holt sich seinen eigenen Logger per getLogger und nutzt den auch, direkt und nicht indirekt über helper-Funktionen. Statt logEx gibt es logging.exception.
Klassen schreibt man mit Großem Anfangsbuchstaben `IdleTimer`, und Funktionen und Variablennamen schreibt man komplett klein. Benutze keine kryptischen Abkürzungen `fPL`?
Vergiss gleich wieder, dass es `global` gibt, das hat in einem sauberen Programm nichts zu suchen.
Mit `send` hast Du das selbe Problem, wie mit `recv`. Da wird auch nur garantiert, dass mindestens 1 Byte gesendet wird, um sicher zu sein, dass alle Bytes gesendet werden, gibt es `sendall`.
Antworten