Strukturproblem mit RabbitMQ

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Sirius3
User
Beiträge: 18270
Registriert: Sonntag 21. Oktober 2012, 17:20

Du mußt wegkommen von dem Gedanken, dass es irgendwo eine große Schleife gibt, die alles kontrolliert.
Ich habe in meinem letzten Beispiel gezeigt, wie einzelne, kleine Schleifen nebeneinander laufen können, ohne dass sie sich gegenseitig blockieren.

Nichts, was Du in Deiner großen Schleife hast, muß in der großen Schleife sein, weil Du nur auf einzelne Ereignisse reagierst. Wenn ein neues Display verbunden wurde, dann verarbeitst Du dieses Ereignis in einer process_message funktion.
Wenn ein Web-Controller einen Anweisung schickt, dann verarbeitest Du das in der entsprechenden Message-Funktion von einem async-socketio-Server.
Wenn Du jede Sekunde auf allen Displays die Uhrzeit aktualisieren willst, dann machst Du das in einer seperaten Funktion, die alle Sekunde die Uhrzeit aktualisiert.
Benutzeravatar
__blackjack__
User
Beiträge: 14044
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Vor allem braucht man nicht mehr selbst umständlich mit `select()` herumhantieren, sondern kann einfach für jede Clientverbindung einen eigenen Task starten der sich nur um den jeweiligen Client kümmert. Also im Grunde als wenn man das mit Threads machen würde, aber eben ohne die zusätzlichen Threads.
„A life is like a garden. Perfect moments can be had, but not preserved, except in memory. LLAP” — Leonard Nimoy's last tweet.
Benutzeravatar
pillmuncher
User
Beiträge: 1530
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

Fipsi hat geschrieben: Mittwoch 17. Juli 2024, 08:53

Code: Alles auswählen

                    data = s.recv(1024)
recv() funktioniert so nicht. Es werden Daten geliefert bis maximal 1024 Bytes, es können aber auch weniger sein. Insbesondere können es weniger sein, als geschickt wurden. Es ist deine Aufgabe, solange Daten zu empfangen, bis die Nachricht komplett angekommen ist. Das macht man idR. in einer Schleife. Ein weiterer Grund, nicht direkt sockets und recv() zu verwenden, sondern eine der Möglichkeiten, die dir hier vorgeschlagen wurden.
In specifications, Murphy's Law supersedes Ohm's.
Sirius3
User
Beiträge: 18270
Registriert: Sonntag 21. Oktober 2012, 17:20

@pillmuncher: Ausnahmsweise ist der Code korrekt. Per select werden einzelne Pakete gesammelt und in check_recv_msg geschaut, ob ein komplettes Paket mit \n abgeschlossen vorliegt.
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

Sirius3 hat geschrieben: Mittwoch 17. Juli 2024, 12:54 Du mußt wegkommen von dem Gedanken, dass es irgendwo eine große Schleife gibt, die alles kontrolliert.
Ich habe in meinem letzten Beispiel gezeigt, wie einzelne, kleine Schleifen nebeneinander laufen können, ohne dass sie sich gegenseitig blockieren.

Nichts, was Du in Deiner großen Schleife hast, muß in der großen Schleife sein, weil Du nur auf einzelne Ereignisse reagierst. Wenn ein neues Display verbunden wurde, dann verarbeitst Du dieses Ereignis in einer process_message funktion.
Wenn ein Web-Controller einen Anweisung schickt, dann verarbeitest Du das in der entsprechenden Message-Funktion von einem async-socketio-Server.
Wenn Du jede Sekunde auf allen Displays die Uhrzeit aktualisieren willst, dann machst Du das in einer seperaten Funktion, die alle Sekunde die Uhrzeit aktualisiert.
Juhu, hab also wirklich keine Wahl außer mich von jeder Programmierweise, wie ich bisher hatte, komplett los lösen.. na das wird lustig.. gut, dass ich hier keinen Zeitdruck hab *hüstel*

Dann hab ich jetzt noch zwei Fragen:
- Bisher kam von euch bei den Beispielen mit Timer immer asynchio.sleep(time), während ich in meinen Schleifen bei last_time + time <= time.time(); last_time + time geblieben bin. Das ist hier wichtig, weil nicht aus 300 Sekunden übertrieben gesagt 310 Sekunden werden, weil die Prozesszeit, die vor dem sleep läuft, nicht "übersehen" werden darf. Wie kann ich das lösen?

- Es ist wahrscheinlich schon aufgefallen, dass ich versuch alles in Klassen und Objekte zu schieben, für's einfachere Handling. Wie krieg ich die zwischen den Tasks hin und her geschubst? Ich hatte gestern mal einen Versuch mit global gewagt:

Code: Alles auswählen

global messages
messages = asyncio.Queue()

async def async_timer():

    if messages.qsize() > 0:
        message = await messages.get()
        print(message)
    

async def process_message(message):
    async with message.process():
        await messages.put(message.body)
        print("proccesed")

async def main():
    task = asyncio.create_task(async_count())
    async with await aio_pika.connect_robust(host=hostname, login=username, password=password, virtualhost=virtualHost) as connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("new-connection", durable=True)
        await queue.consume(process_message)

        # Wait forever
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
Der hat aber nicht funktioniert.
Ich weiß von andren Sprachen zumindest, dass man Objekte/Eigenschaften in eine Methode übergeben kann und da keine Kopie erstellt wird, sondern innerhalb und außerhalb der Methode das selbe Objekt ist. Mit fällt gerade ums verrecken der Begriff dafür nicht ein... das wird's bei Python doch auch geben?
Sirius3 hat geschrieben: Mittwoch 17. Juli 2024, 15:54 @pillmuncher: Ausnahmsweise ist der Code korrekt. Per select werden einzelne Pakete gesammelt und in check_recv_msg geschaut, ob ein komplettes Paket mit \n abgeschlossen vorliegt.
Na irgendwas muss ich ja auch mal richtig machen :lol: :D

Vielen Dank für eure Geduld mit mir
liebe Grüße

Fipsi
Sirius3
User
Beiträge: 18270
Registriert: Sonntag 21. Oktober 2012, 17:20

Niemand hindert dich daran, statt Funktionen eine Klasse mit Methoden zu schreiben, vor allem wenn Du gemeinsamen Zustand hast. Die Beispiele haben das nicht, von daher wäre eine Klasse übertrieben. global benutzt man nicht.
Auch wenn Du mit exakten Zeitpunkten arbeiten möchtest, dann benutzt man, wie schon geschrieben sowas wie asyn-cron. Will man das per Hand selbst programmieren, würde einfach die Zeit messen:

Code: Alles auswählen

async def async_count():
    next_time_slot = time.time() + 3
    for counter in count():
        print(f"Counter: {counter}\n")
        await asyncio.sleep(next_time_slot - time.time())
        next_time_slot += 3
Dein Timer ist wieder falsch, weil ein Timer nicht dazu da ist, Messages zu verarbeiten, sondern das passiert in process_message. Und man würde nicht qsize testen, sondern einfach get_nowait benutzen.
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

Den messages.get() war jetzt Macht der Gewohnheit in der Schleife 😇
Okay, dann versuch ich da jetzt mal, dass ich was Funktionierendes hin bekomm 😬

Vielen Dank euch allen für eure Geduld mit mir 🤗
Liebe Grüße und
schönes Wochenende

Fipsi
Antworten