Regex auf ein bytestream mit variabler Länge anwenden.

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo,

ich habe gerade einiges probiert und auch irgendwo (glaube ich) gelesen zu haben, dass es so nicht möglich seien soll.


Meine erste Idee war ein einfacher Weg über .pattern.
Habe ich dann aber schnell verworfen, da für meinen Fall es doch zu unflexibel ist.


Jetzt bin ich auf Regex übergegangen, aber...

Es geht um einen Bytestream, welche ich auf einem Socketserver empfange ...

So könnte es z.b. aussehen...
b'0011Z2start0012xxxxxxxxxxxx0011Z4start0011xxxxxxxxxxx0011Z1start0010xxxxxxxxxx'

Aufbau:
Statischer Header == 15 Byte (gesamt)
Headerlänge: 4 Byte (hier die länge in byte OHNE die Headerlänge selbst)
status: 2 Byte
Type: start 5 Byte
Datenlänge: 4 Byte

... gefolgt von Daten.

Leider gibt es kein Ende Zeichen und auch die Daten länge ist variabel.

-------------------------------------------

Mein erster Versuch...
0011Z[0-4]start(\d{4}).*?(?=0011Z[0-4]start|$)

Hat schon super funktioniert.
Aber bei letztem Eintrag, habe ich keine Referenz/Anhaltspunkt ob der bisher empfangene Datensatz auch "vollständig" ist.

Regex würde somit auf ...
b'0011Z2start0012xxxxxxxxxxxx0011Z4start0011xxxxxxxxxxx0011Z1start0010xxxxxxxxxx'
... ABER auch auf ...
b'0011Z2start0012xxxxxxxxxxxx0011Z4start0011xxxxxxxxxxx0011Z1start0010xxxx'
... matchen.

-------------------

Also meine nächste Idee. Ich versuche die Datenlänge mit einzubeziehen.
Statisch würde dies super funktionieren ...

0011Z[0-4]start(\d{4}).{10}(?=0011Z[0-4]start|$) ...
0011Z[0-4]2start(\d{4}).{0011}(?=0011Z[0-4]start|$) ...
0011Z[0-4]start(\d{4}).{012}(?=0011Z[0-4]start|$) ...

... findet mir perfekt alle gesuchten Datensätze.
Sollte mal ein Datensatz unvollständig sein, so wird dieser ignoriert.



Versuche ich dies nun Variabel zu gestalten, so habe ich bisher aber keine Lösung gefunden...

Hier mal ein paar Beispiele, welche ich versucht habe ...

0011Z[0-4]start(\d{4}).{\1}(?=0011Z[0-4]start|$)
0011Z[0-4]start(\d)(\d)(\d)(\d).{\1\2\3\4}(?=0011Z[0-4]start|$)
0011Z[0-4]start([0-9])([0-9])([0-9])([0-9]).{\1\2\3\4}(?=0011Z[0-4]start|$)
0011Z[0-4]start(\d)(\d)(\d)(\d).{[0-9]*\1[0-9]*\2[0-9]*\3[0-9]*\4}(?=0011Z[0-4]start|$)
0011Z[0-4]start(\d)(\d)(\d)(\d).{\d*\1\d*\2\d*\3\d*\4}(?=0011Z[0-4]start|$)

Man kann erkennen was ich da versucht habe.
Ich wollte aus der ersten Gruppe == Datenlänge Feld, die länge extrahieren und mit \1 als .{\1} übergeben. Versuche ich dies aber als Länge in {} einzutragen, so hat dies keine Wirkung.

Ist dies überhaupt, stand heute mit RegEx, so umsetzbar?
Oder muss ich dies auf zwei Schritte machen? Im ersten Schritt die Datenlänge auslesen, in einer Liste Speichern und im zweiten durchlauf die Längenangabe aus der Liste übergeben?

Vielleicht habe ich auch nicht weit genug gedacht.
Ich möchte aus dem Datenstream/Puffer vollständige/komplette Daten auslesen und entfernen.
Damit der Puffer weiter befüllt werden, ich aber die schon vollständigen Daten abarbeiten kann.

Viele Grüße
Chris
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Regulaere Ausdruecke koennen das nicht. Dieses Protokoll musst du parsen, indem du die fixen 15 Bytes einliest, und dann aus der geparsten Laenge einen zweiten Leseschritt der erwarteten Nachrichtenlaenge.
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Wenn Du ein festes Format hast, sind reguläre Ausdrücke unsinn.
Wenn Du einen Datenstrom hast, dann ließt Du einfach die Anzahl Bytes, die gegeben sind und verarbeitest diese:

Code: Alles auswählen

header_length = int(stream.read(4))
header = stream.read(header_length)
data_length = int(header[-4:])
data = stream.read(data_length)
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo,

danke, dafür habe ich mir reinen TCP-Stream bisher wenig zu tun gehabt.

data_raw = self.request.recv(2048)
data_puffer.extend(data_raw)

Kann man sagen, dass der Anfang immer aus dem Header besehen muss?
Natürlich bearbeite ich "data_puffer" indem ich komplette Daten identifiziere und extrahiere... (und wenn dies die einzige Stelle ist, welche solch einen Fehler provozieren kann. Dann ist alles in Ordnung)

b'0011Z2start0012xxxxxxxxxxxx' <-- ein einzelner vollständiger Datensatz. OK
b'0011Z2start0012xxxxxxxxxxxx0011Z2start0012xxxxxxxxxxxx' <-- zwei Datensätze... OK
b'0011Z2start0012xxxxxxxxxxxx0011Z2start0012xxxxxxxxxxxx0011Z2start0012xxx' <-- drei Datensätze, wobei der dritte noch unvollständig ist... OL

hier sieht man schön, das der Anfang immer aus dem Header besteht.
Ja, dann ist ein einfaches Auslesen mit [0:15] etc.. einfach zu realisieren.

Aber, kann es aber mal vorkommen, das der "Puffer" so aussieht? ...
b'art0012xxxxxxxxxxxx'
... also das der direkte Anfang/Header unvollständig ist?
Sollte/dürfte durch TCP nicht vorkommen. Aber da habe ich zu wenig Erfahrung.
Nur aus diesem Grund bin ich auf ein flexibles RegEx eingestiegen.

Bisher habe ich gesehen das Folgedaten in teilen übertragen werden. Also ...
1. b'0011Z2start0012xx' <-- erster empfang
2. b'xxxxxxxxxx' <-- zweitere empfang
daraus wird dann wieder ein b'0011Z2start0012xxxxxxxxxxxx'

Wenn dam so ist, dann stelle ich es sofort um.

Viele Grüße und Danke
Chris
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Warum benutzt Du recv? Hast Du mehrere Sockets, die Du parallel lesen mußt und arbeitest Du daher non-blocking?
Da Du schreibst, dass Du noch wenig Erfahrung hast, dann ist die Antwort wahrscheinlich nein, und dann ist das benutzen von recv falsch. Lass Dir per makefile ein Filehandle geben und arbeite mit read, wie ich es gezeigt habe.
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Das ist der klassisch falsche Ansatz, da recv nicht garantiert, dass die angeforderte Anzahl an bytes zurueckkommt. Und damit koenne das auch weniger als zB 15 eines fertigen Headers sein. Mit dem Hinweis von Sirius3 zusammen kann der Parser aus zwei reads bestehen - einmal 15 Bytes, und dann N bytes, basierend auf dem Header. Dann geht's wieder von vorne los.
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo Sirius3,

danke für dein Feedback.
Aktuell nur ein Socket, da noch im Aufbau. Später ja, werden es mehrere parallele Sitzungen. (habe ich aktuell gar nicht mal mehr daran gedacht)

Aktuell rufe ich es so auf... und es funktioniert auch wunderbar wie es soll. (aktuell nur mit einer Verbindung/Socket)

Code: Alles auswählen

class TCPHandler(socketserver.BaseRequestHandler):

    def handle(self) -> None:
        data_puffer = bytearray()
       
        while True:
            
            data_raw = self.request.recv(2048)
            
            if not data_raw:
                break
            
            data_puffer.extend(data_raw)
            # ... und hier die weitere bearbeitung

def start_server():
    server = socketserver.ThreadingTCPServer(("0.0.0.0", 12345), TCPHandler)
    server.serve_forever()

start_server()
Damit sollte (meine vermutung nach) jeder Socket seinen eigenen "data_puffer" haben.
So falsch, sollte doch der Aufbau nicht sein, oder?

ich baue es gerade mal um, nach dem Statischen Prinzip, dann sehe ich ja ob es so klappt.
Es läuft ja schon (fast reibungslos), bis ich erst am nächsten Tag auf die Daten-Fragmentierung aufmerksam wurde.

Viele Grüße
Chris
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo __deets__

ja, darum wirke ich mit dem Puffer entgegen. Erst alles Sammeln, dann auswerten und wenn unvollständig auf nächste Daten warten.
Bei dem nächsten durchlauf, prüfe ich ja wieder auf Vollständigkeit und arbeite erst dann weiter.

Das ich vorher direkt auf self.request.recv(2048) gegangen bin. Ja das war mein Fehler :)

Leider weiß ich vorher nicht, was ich an Bytes überhaupt empfange/bekomme.
Ich starte nur die initiierung ab dann bekomme ich Daten.

Viele Grüße
Chris
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wie gesagt, wenn du das was Sirius3 vorgemacht hat nutzt, musst du nicht puffern und selbst durchlaufen, sondern kannst in zwei diskreten Schritten protokollkonform lesen.

Aber das ist jetzt ja auch genug gesagt worden. Wer's anders will, dem ist das natuerlich freigestellt.
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo,

ich Antworte mir mal selbst. Gestern fast Punkt 0:00 Uhr umgesetzt, lief 30min... Dann doch noch ein Fehler. ( leider keine Debug Output an dieser Stelle gehabt :( )

Zwar ist dieser Fehler bis jetzt nicht nochmals vorgekommen, dafür ein anderer, welchen ich heute nachvollziehen und korrigieren konnte.
Über Nacht lief es Problemlos durch. Zwar 7 Fehler und damit eine neuinitiierung des Sockets, dies war aber alle diesem einen Fehler (mein Fehler) zuzuschreiben.

Es ging nie um die Art der Implementierung. Nur darum wie mit den Daten umzugehen ist.
Man kann immer streiten was/welcher Weg besser ist. Ich kann nun sagen. recv() macht gar keine Probleme. Solage "ich" keine Fehler bei der Imprementierung mache.

Da ich gestern auch geschrieben habe, das ich hauptsächlich 90% nur Daten empfange, so gehe ich nun mal davon aus, hätte ein .read() hier 0 Vorteile gehabt/gebracht.

Es ging mir um den TCP-Stream und wie sich dieser verhält.
Mein erster Gedankengang war eine statische [x:x] Auslesung/Auswertung.
Warum auch immer, bin ich dann auf RegEx übergegangen, da ich dachte das solch ein Stream eben nicht so konstant ist. (Löcher, etc...)
Wenn man den TCP-Stack besser kennt, hätte man sich dies gleich sparen können. :D

Auch bei dem Puffer habe ich mir vorher zu viele Gedanken gemacht 3-4 if/else Abfragen nur um den Puffer zu ergänzen. (und da hätte ich aktiv eingegriffen, was später sicher mehr Probleme gemacht hätte, als alles direkt zu übernehmen)

Darum habe ich gestern mir nochmals gesagt, mache ich es ganz einfach...
Heute/jetzt kann ich sagen, dies alles war unnötig...

Ein einzeiler "data_buffer.extend(data_raw)" macht seine Arbeit Top.

Auch der Gedankengang bezüglich Löcher im "TCP"-Stream (darum auch RegEx) hat sich als unnötig erwiesen.
Alles zuvor verworfen und auf eine Statische Auslesung [x:x] umgeschrieben.
Seit gestern Abend ~8000-10.000 Datenpakete erhalten, mal einzeln, mal mehrere in einem Stream, auch Fragmentierte.

Läuft alles Rund. :)

Dies Info nur, wenn jemand mal nach solch einem Thema sucht. Soll ja nicht offen bleiben.

Viele Grüße
Chris
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Du hast jetzt einen Weg gefunden, mit recv zu arbeiten, der anscheinend funktioniert (da Du keinen Code zeigst, kann ich das ja nicht beurteilen). Im Normalfall ist aber makefile deutlich einfacher und daher der empfohlene Weg.
Ein möglichst einfaches Programm zu haben ist immer ein Vorteil, weil man weniger Chancen hat, Fehler zu machen.
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo Sirius3

den Code hatte ich gestern schon mitgeteilt.
Gerne hier nochmals zusammengefasst, incl. wie ich nun mit dem bytearray arbeite. Können andere evtl. damit noch was anfangen.


Ich Frage trotzdem. Warum heißt es "Du hast jetzt einen Weg gefunden, mit recv zu arbeiten, der anscheinend funktioniert".
Gibt es damit sonst mehr Probleme? Gerne auch mit einem bsp. damit ich dies besser verstehe/nachvollziehen kann, warum von eurer Seite davon abgeraten wird.


- Im Normalfall ist aber makefile deutlich einfacher und daher der empfohlene Weg.
Ganz ehrlich, "normalfall/einfacher"... Diesen Weg kannte ich z.b. nicht.

Und ich meine aktuell 0 Zeichen Code mehr verbraucht zu haben. Denn diese Überprüfung hätte ich auch mit dem Weg über Makefile, etc. machen müssen.

Gestern beim Start kammen ca. 2000 Nachrichten innerhalb von 40 Sek. rein. Lief auch mit der kleineren Menge komplett stabil.
Aktuell seit 09:42:00 am laufen 3872 empfangene Nachrichten.

Code: Alles auswählen

class TCPHandler(socketserver.BaseRequestHandler):

    def handle(self) -> None:
        data_buffer = bytearray()		# Bytearray für die gesamte Socket Verbindung.
		
        while True:
            data_raw = self.request.recv(2048)	# Empfange Daten
			
            if not data_raw:			# Wenn "nichts/leer" empfangen wird. Beende ich den laufenden Socket.
                break
			
            data_buffer.extend(data_raw)	# Schreibe direkt empfangene Daten in den Puffer.
			
			
            # **************************************************************************************
            # Hier kommt nun meine Logik zum Einsatz. Ich prüfe den Bytestream ob es darin vollständige Daten gibt ...
            valid_data: list = []		# Vollständige Daten schreibe ich dann hier in die Liste, welche ich dann unten weiter Bearbeite
            while True:
                try:
                    # b'0011Z2start0012xxxxxxxxxxxx' # Bsp. eines vollständigen Datensatz.
                    header_len_int: int = int(data_buffer[0:4])
                    header_datenlange: int = int(data_buffer[11:14])
                    gesamte_daten_lange: int = len(data_buffer.decode('windows-1252'))
					
                    if data_buffer[0:5] == b'0011Z' and b'1' <= data_buffer[6:7] <= b'4' and data_buffer[7:11] == b'start' and header_datenlange > 0:	# prüfe nochmals, ob der Header im Grundaufbau vorhanden/korrekt ist.
                        soll_header_and_data_len = 4 + header_len_int + header_datenlange
                        if soll_header_and_data_len > gesamte_daten_lange:
                            # Dartensatz unvollständig. Breche schleife ab und warte auf neue Daten ...
                            break	
                        elif soll_header_and_data_len == gesamte_daten_lange:
                            # Exakt ein Datensatz vorhanden. Diesen übernehme ich und setze Puffer zurück.
                            valid_data.append(data_buffer[0:soll_header_and_data_len].decode('windows-1252'))
                            data_buffer = bytearray()
                            break
                        else:
                            # Mehrere Datensätze vorhanden.
                            # Übernehme vollständige Daten und lösche diesen aus dem Puffer. So lange (continue) bis eine der anderen Regeln zutrifft.
                            valid_data.append(data_buffer[0:soll_header_and_data_len].decode('windows-1252'))
                            data_buffer = data_buffer[soll_header_and_data_len:]
                            continue
                    else:
                        # ToDo Hier könnte ich mir noch den Fall/Fehler vorstellen, das der Header Fragmentiert übertragen wird. Dies könnter hier noch besser gehandhabt werden. Da warte ich mal die nächsten Tage einen solchen Fehler ab.
                        logger.debug(data_buffer)
                        return None
                except:	# <-- Ja aktuell so gewollt um auf alles ungewöhnliche zu reagieren und sich darauf einzustellen. Später wird dies präzisiert ...
                    logger.debug(data_buffer)
                    logger.debug(data_raw)
                    return None
            # **************************************************************************************
			
            for message in valid_data:
                pass # Hier verarbeite und reagiere ich dann auf vollständigen Daten...


def start_server():
    server = socketserver.ThreadingTCPServer((settings.get("0.0.0.0"), settings.get(12345)), TCPHandler)
    server.serve_forever()


if __name__ == "__main__":
    start_server()
Viele Grüße
Chris
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Der Code ist doch gezeigt worden - viewtopic.php?p=425012#p425012

4 Zeilen vs deinem Konvolut - ich weiss, was ich bevorzugen wuerde, aber natuerlich darf man nach Rom auch via dem Kap der guten Hoffnung reisen, statt nur kurz ueder die Alpen zu huepfen.
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Du benutzt TCPHandler, das bedeutet, dass Du nur einzelnen Socket hast, was die ganze Argumentation, warum man nicht makefile benutzen sollte, zusammenbrechen läßt.
Typannotationen sollten Sinn ergeben, bei `valid_data: list = []` ist es überflüssig, weil jeder sieht, dass valid_data eine Liste ist.
Die Liste an sich ist überflüssig, wenn Du die Daten direkt liest, weil es dann immer nur eine Nachricht geben kann.
Dann hast Du Dich in den Indizes verzählt, oder Dein Beispiel-Header entspricht nicht der Wahrheit.
Und jetzt kannst Du nochmal behaupten, dass die Lösung mit read gegenüber Deiner Lösung mit recv null Vorteile hat:

Code: Alles auswählen

    def handle(self):
        stream = self.request.makefile('rb')
        while True:
            data = stream.read(15)
            if len(data) < 15:
                break
            if data[:4] != b'0011' or data[5:6] not in [b'Z1', b'Z2', b'Z3', b'Z4'] or data[6:11] != b'start':
                # wrong header
                logger.error("unexpected data %s", data)
                break
            message_length = int(data[-4:])
            message = stream.read(message_length)
            if len(message) < message_length:
                # incomplete message
                break
            message = message.decode('windows-1252')
            # Hier verarbeite und reagiere ich dann auf vollständigen Daten...
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo,

danke für das kleine Bsp. ...

Code: Alles auswählen

stream = self.request.makefile('rb')
... ich wusste nicht das makefile direkt zum Socket gehört. Ich dachte das wär komplett etwas eigenständiges. Ein komplett anderer Aufbau.
Gestern mit den begriffen gesucht was das ist. Konnte da nichts finden. Erst jetzt gesehen, welche Funktion das ist.
Das es sich aber so einfach einbauen lässt, sry. damit habe ich nicht gerechnet.

Ja, es erspart durch .read(), etc... etwas arbeit. Aber jetzt auch nicht die Welt.


Ich war mir auch schon zu 100% sicher das man natürlich ...

Code: Alles auswählen

if data[:4] != b'0011' or data[5:6] not in [b'Z1', b'Z2', b'Z3', b'Z4'] or data[6:11] != b'start':
.. besser gestalten kann.

Du benutzt TCPHandler, das bedeutet, dass Du nur einzelnen Socket hast, was die ganze Argumentation, warum man nicht makefile benutzen sollte, zusammenbrechen läßt.
Mitgeteilt wurde mir, das die Gegenstelle auch mehrere Verbindungen aufbaut. Aktuell nur eine, auf diese Aussage muss/habe ich mich verlassen.
Würde makefile dann nicht mehr funktionieren?

Typannotationen sollten Sinn ergeben, bei `valid_data: list = []` ist es überflüssig, weil jeder sieht, dass valid_data eine Liste ist.
Da war ich mit, ganz oder gar nicht dabei. Macht Sinn.

Die Liste an sich ist überflüssig, wenn Du die Daten direkt liest, weil es dann immer nur eine Nachricht geben kann.
Warum? Aktuell über .recv() kann es mal b'0011Z2start0012xxxxxxxxxxxx', aber auch mal b'0011Z2start0012xxxxxxxxxxxx0011Z2start0012xxxxxxxxxxxx0011Z2start0012xxxxxxxxxxxx0011Z2start0012xxxxxxxxxxxx' ... sein.
Dies wird sich doch durch makefile doch nicht ändern, oder?


Dann hast Du Dich in den Indizes verzählt, oder Dein Beispiel-Header entspricht nicht der Wahrheit.
Ja, einen falschen Header kopiert. Ich teste auch aus, wie es auf falsch empfangene Nachrichten reagiert.

Und jetzt kannst Du nochmal behaupten, dass die Lösung mit read gegenüber Deiner Lösung mit recv null Vorteile hat:
An welcher Stelle habe ich gesagt null Vorteile. Wenn ich nicht mal weiß um welche Funktion es sich handelt, kann ich auch keine Aussage treffen.


Viele Grüße
Chris
Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Mit `makefile()` bekommst Du eine Datei und die puffert die Daten für Dich. Damit wird die Liste überflüssig weil Du ja immer nur eine Nachricht ausliest und verarbeitest. Ob da intern eine oder schon mehrere im Datei-Objekt warten macht für den Code ja keinen Unterschied mehr.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Antworten