Strukturproblem mit RabbitMQ

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

Hallo zusammen,

aufgrund der Empfehlung letzter Woche will ich mein System jetzt mit RabbitMQ umsetzen, hab aber noch "etwas" Verständnisprobleme, die teilweise auch sicher auf mein schwaches Englisch zurückzuführen sind. Damit ihr versteht/wisst, was ich vorhabe und bisher "verstanden" hab, mein System "kurz" erklärt:

In der "Mitte" steht ein Server, in Form eines Raspberry Pis, auf diesem ist RabbitMQ installiert und läuft das "Hauptscript" dieses Systems.
Es gibt zwei Typen von Clients, die "von außen" sich mit dem Rabbit noch verbinden. Die Anzahl der Typen ist variabl.
Die Scripte sind bisher alle mit Python und pika oder mit Angular und der lib @stomp/stompjs geplant.

Typ 1 kann in zwei weitere Untergruppen unterteilt werden: (der Einfachheit halber folgend "Displays" genannt)
Typ 1A sind weitere Raspberrys, die LED-Matrixen und ein paar GPIOs steuern. Außerdem sollen sie zyklisch ein paar Daten, wie z. B. auch CPU-Temperatur, an den Server senden, mit denen die überwacht werden können und ich bei diesem Beispiel weiß, ob ich das Gehäuse nochmal umarbeiten muss.
Typ 1B sind Angular-Anwendungen, das kann also jedes x-beliebiges Gerät wie ein Laptop oder Smartphone (und wahrscheinlich smartHome-Kühlschränke 🙃) sein, die zeigen nur auf dem Bildschirm was an (und evtl. Tonwiedergabe).

Typ 2 kann evtl. auch in zwei Geräte unterschieden werden, da bin ich mir noch nicht sicher (der Einfachheit halber Controller genannt).
Typ 2A ist ebenfalls eine Angular-Anwendung, mit der man die Displays steuern kann. Außerdem wird der aktuelle Status der Displays angezeigt.
Typ 2B könnte was a la ESP32 sein, ein Hardware-Client, mit z. B. Tasten zur Steuerung der Displays.

Zudem soll es möglich sein, dass die gesamten Clients nochmal unterteilt werden, wenn es mehrere räumliche Bereiche gibt, dann muss ich beiden Typen jeweils einen Bereich zuordnen, die unabhängig voneinander agieren.

Prinzipiell soll die Verarbeitung des Status' und der Controller nur auf dem Server laufen. Also die Displays sind "dumm", die bekommen nur vom Server gesagt, was sie anzuzeigen haben. Die Controller reden auch nicht direkt mit den Displays, sondern nur mit dem Server und sagen ihm, welche Aufgaben er zu tun hat. Je räumliche Aufteilung gibt's im Server einen eigenen Task, welcher z. B. Counter laufen lässt und bei Änderungen diese an die Display schickt.
Der Einfachheit halber geh ich jetzt von nur einem räumlichen Bereich, also keiner Client-Aufteilung aus, die aber möglich sein muss.
Ich hab mir das jetzt systematisch auf mehrere Kanäle ausgelegt:
Ein Kanal ist der Status der Displays, dieser wird an alle Clients, also Typ 1 und 2, (in selbem räumlichen Bereich) geschickt. Wichtig ist, dass es immer nur eine Nachricht geben darf. Das heißt, wird eine Nachricht versendet, aber kann nicht zugestellt werden, darf sie nur so lange leben, bis eine neue Nachricht kommt. Also sollte eine neue Nachricht kommen, muss die ältere gelöscht werden.
Der zweite Kanal ist der Status der Controller. Es gibt bei den Controllern die Möglichkeit Vorlagen anzulegen, welche bei den Displays nichts verloren haben, aber bei allen Controllern synchron sein soll. Gibt ein Controller einen neuen Befehl, gibt er das an alle anderen Controller und den Server weiter.
Der dritte Kanal wäre der für die Display-Infos, also die z. B. genannte CPU-Temperatur. Die soll an den Server geschickt werden, welcher die Infos dann an die Controller weiter leitet.
Beim vierten Kanal bin ich mir noch nicht ganz sicher: das soll der "neue Client"-Kanal sein. Sobald sich ein neuer Client zum Rabbit verbindet, muss er sich "vorstellen", also sagt welcher Typ Client er ist und welche technische Daten er hat (bei Displays z. B. die Anzahl der vorhandenen Textzeilen). Die Info muss an alle Controller gehen. Ist der neue Client ein Controller, kann er sich selbst einem Bereich zuordnen, ist es ein Display, bekommt er eine eindeutige Zahl, damit man weiß, welches Gerät das ist und von Controllern einem Bereich zugeordnet werden kann.

Jetzt kommt endlich der "spannende" Teil, die Umsetzung im RabbitMQ:
Soweit ich das verstanden hab, sind die Queues meine Kanäle und über die Exchanges sag ich, welche Kanäle genutzt werden?
Also mein Ansatz wäre jetzt gewesen, dass jeder Client, der sich neu verbindet, nur für sich selbst den Kanal 1 anlegt und als Consumer lauscht.
Außerdem geht jeder Client auf den gemeinsamen Kanal 4, damit neue Clients erkannt und bei den Controllern angezeigt werden kann. Die Displays können nach ihrer Bereichszuordnung, bzw. theoretisch sogar schon nach der Nummernzuteilung, von diesem Kanal abmelden.
Jeder Controller bekommt zudem seinen Kanal 2, über den die Controller-Infos ausgetauscht werden.
Und zu guter Letzt gibt es wieder einen gemeinsamen Kanal 3 für alle Clients, auf dem die Displays als producer fungieren und die Controller als consumer.

Die Queues bräuchte ich dann so, dass die Kanäle 1 und 2 immer nur jeweils die letzte Nachricht enthalten und die Kanäle 3 und 4 von allen Clients, und nicht nur von einem, abgehört werden. Lässt sich letzteres so überhaupt konfigurieren? Ich hab da noch irgendwie keine aussagekräftige Antwort verstanden, aber ich befürchte fast, dass ich da auch jedem Client eine eigene Queue erstellen muss?

Der nächste Punkt ist dann der Exchange, ich glaub, von dem hab ich effektiv noch gar nichts begriffen. Ich glaube, dass jede Nachricht eines Producers an einen Exchange geht und dieser verteilt dann die Nachricht(en) an die entsprechenden Queues? Ich bräuchte also für jeden Kanal in jedem räumlichen Bereich einen eigenen Exchange, in dem ich jeden Client-Kanal registrier, was über die Bindings abläuft? Für Kanal 1 und 2 bräuchte ich Exchanges vom Typ topic, für Kanal 3 und 4 den Typ fanout?
Wenn ich dann eine neue Nachricht schicken will brauch ich einmal den Namen des Exchanges, der die Kombi aus räumlichen Bereich und einem der 4 Kanäle ist? Was ist dann noch der Routing key, für was brauch ich den?

Mir schwirrt da langsam der Kopf :? :?:

Ich hoffe, das war jetzt nicht zu viel durcheinander und verwirrend, damit man versteht, worauf ich raus will und ihr mir helfen könntet, die Umsetzung zu konzipieren.

Vielen Dank im voraus und
liebe Grüße

Fipsi
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

Hallo zusammen,

ich sag's ganz ehrlich: ich bin gerade echt genervt 🙃. Und ja, wahrscheinlich ist meine herangehensweise nicht immer die richtige, aber das ist die, mit der ich normal arbeiten kann.
Tut mir leid, dass ich ganz "etwas" stinkig bin, aber diese Probleme gehen mir sowas von auf den Keks...
Ich werd mal wieder daran erinnern, warum ich Frameworks und Libs vermeide, soweit es geht. Und ich lern lösungsorientiert, weshalb Grundlagen fehlen, dessen bin ich mir bewusst. Aber wenn für mehr oder weniger allgegenwärtiges keine brauchbaren Erklärungen/Tutorials bei Google raus kommen, dann zerrt das an meinen Nerven.
Ich versuch gerade das Serverscript hinzubekommen und bleib an dem loopio hängen. Wie krieg ich da meinen eigenen Schuh rein? Ich find immer nur, dass alles auf Events von rabbitMQ reagiert, aber nicht, dass mein Script rabbit steuert und darauf reagiert.
Ich hab dieses Beispiel abgeändert: https://github.com/pika/pika/blob/main/ ... example.py
Die Example Consumer ist grundlegend gleich geblieben. Das Reconnect-Object hab ich "aufgelöst". Sieht im Moment wie folgt aus:

Code: Alles auswählen

def main():
    rabbit_client = RabbitClient()
    should_reconnect = True
    reconnect_delay = 2

    while True:
        try:
            rabbit_client.run()
            
            if rabbit_client.is_connected_trigger():
                print("Connected")
                rabbit_client.open_channel()
            
            if rabbit_client.is_channel_open_trigger():
                rabbit_client.setup_exchange("new-connection", "topic")
            
            if rabbit_client.setup_exchange_trigger():
                pass

        except KeyboardInterrupt:
            rabbit_client.stop()
            sys.exit(0)
    
        except Exception as e:
            traceback.print_exc()
            print("Shutdown")
            rabbit_client.stop()

        if should_reconnect and not rabbit_client.is_connected():
            rabbit_client.stop()

            time.sleep(reconnect_delay)
            rabbit_client = RabbitClient()

                
            
if __name__ == '__main__':
    main()

Code: Alles auswählen

#rabbitmq.py
Class RabbitClient():
	[...]
	def run(self):
        """Run the example consumer by connecting to RabbitMQ and then
        starting the IOLoop to block and allow the SelectConnection to operate.

        """
        self._connection = self.connect()
        self._connection.ioloop.start()
        [...]
Die main-Funktion läuft jetzt bist zum "rabbit_client.run()" und "steckt da fest".
Dass der Code so noch nicht viel Sinn macht ist mir bewusst, ich will das jetzt erst mal überhaupt zum laufen kriegen und verstehen können.

Für mich macht das ganze nicht wirklich Sinn. Mei Verständnis, bzw. was ich will, versuch ich mal in folgendem Pseudo-Code darzustellen:

Code: Alles auswählen

import time
import sys
import traceback
import pika

client = pika.SelectConnection([...])
last_time = time.time()
time_interval = 50

while True:
    try:
        client.run()
        # Prüft auf neue Nachrichten oder
        # verschickt im Buffer befindliche Nachrichten

        if client.new_messages():
            # Nachrichten abarbeiten
            pass
        
        if last_time + time_interval <= time.time():
            # meine Funktionen
            # z. B. Nachricht verschicken
            pass

    except disconnected:
        client.reconnect()
    
    except KeyboardInterrupt:
            sys.exit(0)
    
    except Exception as e:
        traceback.print_exc()
        print("Shutdown")
Ich nutze kein Tornado oder sonstige Konsorten, sondern einfach was wie der Pseudo-Code und zwar - sehr wichtig - nicht blockierend.

Habt noch einen schönen Sonntag
Fipsi
Sirius3
User
Beiträge: 18250
Registriert: Sonntag 21. Oktober 2012, 17:20

Klar blockiert `run` weil es die blockierende Variante ist.
Ich hatte ja schon in Deinem anderen Thread geschrieben, dass wenn Du verschiedene Dinge parallel machen willst, Du am besten auf async umsteigst.
Hier gibt es genug Beispiele: https://aio-pika.readthedocs.io/en/latest/
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

Warum heißt's dann überall, dass SelectConnection non-blocking ist? Oder steh ich da jetzt komplett auf dem Schlauch?
Ich schau mir deinen Link an, danke dafür.

Liebe Grüße
Benutzeravatar
DeaD_EyE
User
Beiträge: 1217
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

pika.SelectConnection - asynchronous adapter without third-party dependencies.
Intern läuft es dann auch asynchron, aber das ändert nichts an der blockierenden run-Methode des Clients.

Das ist so ähnlich wie bei asyncio. Wenn man dort die run-Funktion aufruft, blockiert diese Funktion.
Aber alles im Kontext von asyncio, ist nebenläufig und erreicht wird das mit Select.

Ich würde die asyncio Variante nutzen, wenn das Programm mehr machen soll, als nur diesen einen Client laufen zu lassen.
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

@DeaD_EyE: okay. danke für die Erklärung

Ich versuch mich gerade an der asyncio-Variante, aber ich häng wieder an einer Blockierung. Ich bin mir ziemlich sicher, dass das an meinem nach wie vor lückenhaften Verständnis von asyncio liegt, aber 1,5 h googlen und experimentieren haben mich noch nicht weiter gebracht. Folgendes ist der aktuelle Code:

Code: Alles auswählen

#main.py
async def main():

    rabbit_client = RabbitClient()
    
    await rabbit_client.connect()
    
    #while True:
    try:
        #rabbit_client.run()

        incoming_messages = rabbit_client.get_incoming_messages()
        
        if len(incoming_messages) > 0:
            for message in incoming_messages:
                print(message.body)
            
        
    except KeyboardInterrupt:
        sys.exit(0)

    except Exception as e:
        traceback.print_exc()
        print("Shutdown")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()
    asyncio.run(main())

Code: Alles auswählen

#rabbit.py
class RabbitClient():

    def __init__(self):

        self._username = 'user'
        self._password = 'password'
        self._hostname = 'hostname'
        self._virtualHost = "v-host"
        
        self._incoming_messages = []
    
    async def connect(self):

        self._connection = await connect(
            host = self._hostname,
            login = self._username,
            password = self._password,
            virtualhost = self._virtualHost)
        
        async with self._connection:
            print(" [x] successfull connected to " + self._hostname)

            self._channel = await self._connection.channel()
            print(" [x] successfull opend channel")

            self._queue = await self._channel.declare_queue(
                name = "new-connection",
                durable = True)
            print(" [x] successfull opend queue")

            await self._queue.consume(
                callback = self._recive_messages,
                no_ack = True)

            #await asyncio.Future()

    
    async def run(self):
        await self._queue.consume(
            callback = self._recive_messages,
            no_ack = True)
        #asyncio.Future()
        pass

    async def _recive_messages(self, message: AbstractIncomingMessage):
        self._incoming_messages.append(message)
        print(" [x] Received message: %r" %message.body)
    
    def get_incoming_messages(self):
        messages = self._incoming_messages
        self._incoming_messages = []
        return messages
Gemacht hab ich bisher folgendes:
- in RabbitClient.connect das await asyncio.Future() aktiviert => blockiert. Neue Nachrichten werden im callback _recive_messages ausgegeben, aber im main() läuft kein folgender Code mehr
- asyncio.Future() auskommentiert => main while True: loop läuft, aber keine neuen Nachrichten werden empfangen
- asyncio.Future() und _queue.consume() jeweils einzeln in RabbitClient:run() "aktiviert", die run() in die main-While True-Loop eingebunden => nichts mehr passiert
- asyncio.Future() komplett entfernt, ebenso asyncio.run(main()) und stattdessen die get_event_loop()-Variante genommen => selbes Ergebnis, die vorhandenen Nachrichten im Queue werden ausgegeben, danach ist Sense.
- main while True-loop entfernt und die main mit get_event_loop laufen lassen => nix passiert

Mit Google und der Doc konnte mir keine non blocking-Alternative für asyncio.Future() finden.
Kann mir noch mal jemand das richtige zeigen? :?:

Vielen Dank und
liebe Grüße

P. S.: warum bekomm ich keine Benachrichtigung per E-Mail, wenn hier wer antwortet, aktiviert hab ich alles 🤔
Sirius3
User
Beiträge: 18250
Registriert: Sonntag 21. Oktober 2012, 17:20

Eine `await asyncio.Future()` wartet auf das Ergebnis eines Futures, da Du aber nirgends einen Task hast, der dieses Future füllt, wartet das natürlich für immer. In den Beispielen ist genau das gewollt.
`get_incoming_messages` ist nicht asynchron, es gibt also in Deiner Hauptschleife überhaupt keinen Ausstiegpunkt, an dem andere Tasks an die Reihe kommen könnten.

Mir ist nicht ganz klar, was Du jetzt eigentlich willst. Deine Messages kommen rein und werden in _recive_messages verarbeitet. Ansonsten hast Du noch keine Tasks, die nebenher was machen könnten.
Wenn Du asynchron Nachrichten verarbeitest, mußt Du Dich von einem linearen Programmablauf verabschieden.

Ansonsten gilt wie bei jedem nebenläufigen Programm, dass die verschiedenen Threads über Queues kommunizieren.

Code: Alles auswählen

ass RabbitClient:

    def __init__(self):
        self._username = "user"
        self._password = "password"
        self._hostname = "hostname"
        self._virtualHost = "v-host"
        self.incoming_messages = asyncio.Queue()

    async def connect(self):

        self._connection = await connect(
            host=self._hostname,
            login=self._username,
            password=self._password,
            virtualhost=self._virtualHost,
        )

        async with self._connection:
            print(" [x] successfull connected to " + self._hostname)

            self._channel = await self._connection.channel()
            print(" [x] successfull opend channel")

            self._queue = await self._channel.declare_queue(
                name="new-connection", durable=True
            )
            print(" [x] successfull opend queue")

            await self._queue.consume(callback=self._recive_messages, no_ack=True)

    async def run(self):
        await self._queue.consume(callback=self._recive_messages, no_ack=True)

    async def _recive_messages(self, message):
        await self.incoming_messages.put(message)
        print(" [x] Received message: %r" % message.body)


async def main():
    rabbit_client = RabbitClient()
    await rabbit_client.connect()

    while True:
        incoming_message = await rabbit_client.incoming_messages.get()
        print(incoming_message.body)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    loop.run_forever()
    asyncio.run(main())
Benutzeravatar
__blackjack__
User
Beiträge: 13997
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Fipsi Das sieht nach programmieren durch raten aus, das funktioniert nicht. Was denkst Du denn beispielsweise wie ``await asyncio.Future()`` jemals *nicht* bis in alle Ewigkeit blockieren sollte? Wer sollte denn da welches Ergebnis setzen‽

Eine ``async``-Funktion oder Methode muss man mit ``await`` aufrufen, oder mit dem Coroutinen-Objekt was da zurückgegeben wird entsprechend umgehen, damit das auch ausgeführt wird.
“The best book on programming for the layman is »Alice in Wonderland«; but that's because it's the best book on anything for the layman.” — Alan J. Perlis
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

__blackjack__ hat geschrieben: Dienstag 16. Juli 2024, 08:49 @Fipsi Das sieht nach programmieren durch raten aus
das ist es gerade gefühlt auch...

Jetzt steh ich komplett auf dem Schlauch 🤯

Andere Aufgaben gibt's gerade deshalb noch nicht, weil ich jetzt erst mal die Kommunikation auf die Reihe bringen will/wollte. Die Queues fütter ich aus der GUI.
Meine Intention war, dass der Client asynchron läuft. Wenn es neue Nachrichten gibt, holt er die halt async und steckt sie in meinen Buffer "_incoming_messages", welchen ich dann in der main while True-Loop abruf. In diesem Loop werden Zeit- und eventgesteuert auch andere Aktionen ausgeführt (die hierfür jetzt nicht relevant sein sollten). Ein "erweitertes" Object wollte ich aus dem aoi_pika machen, damit ich mir die Multiclient-Verwaltung erleichtere.
Meine Erwartung war, dass der Client in jedem Durchlauf prüft, ob er neue Nachrichten kriegt oder verschicken soll und die dann async im Buffer verstaut oder raus schickt, ohne den Loop zu blockieren. Mein Ansatz wäre also gewesen, dass ich im Loop eine client.run() hab, die dann eben immer prüft und ggf. was tut. Dafür hab ich aber wohl offensichtlich noch nicht so ganz die richtige Syntax gefunden. Threads hab ich bisher als nicht nötig erachtet, da es ja nicht parallel laufen muss, sondern es reicht, wenn's "irgendwann" passiert (ich hab etwa 100 ms Zeit).

Ich wollt wohl langsam den Kopf in Sand stecken *facepalm* *depremiert*

Liebe Grüße

Fipsi
Sirius3
User
Beiträge: 18250
Registriert: Sonntag 21. Oktober 2012, 17:20

Woher kommt jetzt eine GUI? Ich dachte es geht um die Kommunikation von verschiedenen Raspberries und einem Web-Frontend.
Und in diesem Setup gibt es keine Hauptschleife, die irgendwas nacheinander abarbeitet. Du arbeitest Event-basiert. Wenn ein Event kommt, dann wird das abgearbeitet.
Du solltest Dich von einem linearen Programmablauf verabschieden.
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

Die GUI ist die von RabbitMQ, das Management-Plugin. Damit versuch ich erst mal generell das System zu begreifen.
Die Hauptschleife soll das while True in der main() sein. Steht halt aktuell nichts drin, weil ich da erst mal die Nachrichten raus bekommen will, also über das get_incoming_messanges().
Ich glaub, ich weiß jetzt, wo der Wurm bzgl. Event-basiert drin steckt:
Der Rabbit läuft richtig eventbasiert, wenn er ne Nachricht kriegt, tut er was. Meine Hauptschleife hab ich fälschlicherweise auch eventbasiert bezeichnet, weil die nur was tun soll, wenn Bedingungen erfüllt sind (was ja kein echtes eventbasiert ist).
Um jetzt den Wurm wieder raus zu bekommen:
Der Rabbit-Client soll eventbasiert laufen und empfangene Nachrichten in _incoming_messages packen, welche dann in der main-Schleife - nicht eventbasiert - abgearbeitet werden.
Ich glaube - korrigiert mich gern, wenn ich falsch lieg -, dass das, was die Schleife abarbeiten soll, nicht event-basiert funktioniert, da da sehr viel mit time.time() (also wenn Zeiten abgelaufen sind) gearbeitet wird.

Ich hoffe, jetzt wird langsam ein verständlicher Schuh drauß...

Liebe Grüße

Fipsi
Sirius3
User
Beiträge: 18250
Registriert: Sonntag 21. Oktober 2012, 17:20

So lange es nicht wirklich einen Grund für eine Schleife gibt, die irgendwas abarbeitet, sollte es diese Schleife einfach nicht geben.
Bisher hast Du noch nichts gezeigt, was eine Schleife rechtfertigen würde. So lange Du also nicht konkreter wirst, was Du eigentlich machen möchtest, bleibt es dabei: es ist Unsinn, nicht-eventbasiert zu arbeiten, weil das gegen das Konzept Messagequeue ist.
Benutzeravatar
grubenfox
User
Beiträge: 601
Registriert: Freitag 2. Dezember 2022, 15:49

Fipsi hat geschrieben: Dienstag 16. Juli 2024, 12:35 Ich glaube - korrigiert mich gern, wenn ich falsch lieg -, dass das, was die Schleife abarbeiten soll, nicht event-basiert funktioniert, da da sehr viel mit time.time() (also wenn Zeiten abgelaufen sind) gearbeitet wird.
die Tatsache das "Genau jetzt" die Zeitdauer Y abgelaufen ist, ist eigentlich (meiner Meinung nach) auch ein Event... gut, man müsste diese ominöse 'Schleife' wohl komplett umprogrammieren um die ganzen time.time() durch was anderes zu ersetzen. (ob es dadurch besser/übersichtlicher wird, kann ich nicht beurteilen)
Fipsi hat geschrieben: Dienstag 16. Juli 2024, 12:35 Meine Hauptschleife hab ich fälschlicherweise auch eventbasiert bezeichnet, weil die nur was tun soll, wenn Bedingungen erfüllt sind (was ja kein echtes eventbasiert ist).
... und das erste was ich wohl beim Event-Handling machen würde, ist Bedingungen zu prüfen damit ich bzw. der aufgerufene Eventhandler weiß ob es überhaupt etwas für `mich` zu tun gibt und welcher Codeteil vom Eventhandler dafür zuständig ist.
Also ich sehe da auch keinen Grund dafür dass die Hauptschleife nicht eventbasiert sein kann.
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

grubenfox hat geschrieben: Dienstag 16. Juli 2024, 13:39 die Tatsache das "Genau jetzt" die Zeitdauer Y abgelaufen ist, ist eigentlich (meiner Meinung nach) auch ein Event... gut, man müsste diese ominöse 'Schleife' wohl komplett umprogrammieren um die ganzen time.time() durch was anderes zu ersetzen. (ob es dadurch besser/übersichtlicher wird, kann ich nicht beurteilen)
Ich seh das generell auch als eventbasiert (deshalb hab ich's ja bisher bezeichnet), aber es is halt kein "von außen getriggertes" Event, wie die aio_pika.
grubenfox hat geschrieben: Dienstag 16. Juli 2024, 13:39 ... und das erste was ich wohl beim Event-Handling machen würde, ist Bedingungen zu prüfen damit ich bzw. der aufgerufene Eventhandler weiß ob es überhaupt etwas für `mich` zu tun gibt und welcher Codeteil vom Eventhandler dafür zuständig ist.
Also ich sehe da auch keinen Grund dafür dass die Hauptschleife nicht eventbasiert sein kann.
Das ist klar, dass ich die Bedingungen prüfen muss, sonst kommt ja nur Blödsinn bei raus.
Aber das gerade anstehende Problem ist ja für mich, dass ich die Hauptschleife nicht zum laufen bekomm. Bzw. die Hauptschleife ODER die aio_pika läuft. Anders ausgedrückt: Ich such immer noch die Methode, bzw. Syntax, mit der ich die aio_pika in der Hauptschleife am Leben halten kann..

Liebe Grüße

Fipsi
Sirius3
User
Beiträge: 18250
Registriert: Sonntag 21. Oktober 2012, 17:20

@Fipsi: wie das geht, habe ich Dir ja gezeigt. Wo hast Du also noch ein Problem?

Und wie ich schon geschrieben hatte, ohne dass Du Deinen wirklichen Code zeigst, kann man an der Stelle auch nicht weiterhelfen.
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

@Sirius3: Okay, wow.. ich hab's tatsächlich geschafft deinen Codeblock zu übersehen. Tut mir wirklich leid.
Wenn ich deinen Code 1:1 übernehm (bis auf die Host-Daten) und teste, dann hab ich das Ergebnis, dass zum einem die main()-Loop nicht durchläuft (erkannt durch eingefügen print()) UND wohl keine dauerhafte Verbindung zustande kommt. Wenn beim Scriptstart Nachrichten in der Queue sind, werden die gelesen und ausgegeben, sind keine vorhanden, passiert natürlich nichts. Kommen nach Scriptstart Nachrichten in die Queue passiert ebenfalls nichts. Auch kann ich im Management-UI keine Verbindung zu meinem Script sehen, die davor angezeigt wurde (wird also wohl gleich wieder geschlossen?).

Ich hab nicht wirklich mehr Code, den ich zeigen kann. Mit RabbitMQ ist das das einzige, was ich bisher hab, weil es für mich nicht viel Sinn macht weiter zu machen, wenn die Verbindung nicht hin haut. Ansonsten hab ich nur den anderen Code mit der socket-lib, von dem ja nicht mehr als das Konzept geblieben ist.

Liebe Grüße

Fipsi
Sirius3
User
Beiträge: 18250
Registriert: Sonntag 21. Oktober 2012, 17:20

Der with-Block ist an der falschen Stellen, denn sobald with verlassen wird, ist die Verbindung auch schon wieder geschlossen.

Code: Alles auswählen

class RabbitClient:

    def __init__(self):
        self.incoming_messages = asyncio.Queue()

    async def connect(self):
        self._connection = await connect(
            host=HOSTNAME,
            login=USERNAME,
            password=PASSWORD,
            virtualhost=VIRTUAL_HOST,
        )

        print(f" [x] successfull connected to {HOSTNAME}")

        self._channel = await self._connection.channel()
        print(" [x] successfull opend channel")

        self._queue = await self._channel.declare_queue(
            name="new-connection", durable=True
        )
        print(" [x] successfull opend queue")

        await self._queue.consume(callback=self._recive_messages, no_ack=True)
        return self._connection

    async def _recive_messages(self, message):
        await self.incoming_messages.put(message)
        print(f" [x] Received message: {message.body}")


async def main():
    rabbit_client = RabbitClient()
    async with await rabbit_client.connect():
        while True:
            incoming_message = await rabbit_client.incoming_messages.get()
            print(incoming_message.body)


if __name__ == "__main__":
    asyncio.run(main())
Aber eigentlich sollte der Code viel einfacher aussehen:

Code: Alles auswählen

import asyncio
import aio_pika

URL = "amqps://user:password@host/vhost"

async def process_message(message):
    async with message.process():
        print(message.body)

async def main():
    async with await aio_pika.connect_robust(URL) as connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("new-connection", durable=True)
        await queue.consume(process_message)

        # Wait forever
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

also langsam fang ich echt an zu zweifeln...
Ich hab beide Codes von dir gerade getestet. Beim ersten bleibt das Script bei await rabbit_client.incoming_messages.get() hängen und läuft erst nach eingegangener Nachricht weiter. Ich hab den Code folgend ergänzt, um das zu erreichen, was ich vor hab:

Code: Alles auswählen

async def main():
    rabbit_client = RabbitClient()

    last_time = time.time()
    counter = 0

    async with await rabbit_client.connect():
        while True:
            incoming_message = await rabbit_client.incoming_messages.get()
            print(incoming_message.body)
            
            if last_time + 1 <= time.time():
                counter += 1
                last_time += 1
                print(counter)
Der zweite Code bleibt wie gehabt am asyncio.Future() hängen, die danach eingefügte time-Bedingung wird gar nicht abgefragt.
Mich beschleicht des Gefühl, dass ich die Abfrage der time-Bedingung mit einem await versehen muss, aber ich komm einfach nicht richtig drauf..
Sirius3
User
Beiträge: 18250
Registriert: Sonntag 21. Oktober 2012, 17:20

Natürlich blockiert get wenn keine Nachricht da ist, das ist ja der Sinn davon, dass nicht unnötigerweise eine Schleife läuft, die nichts macht.
Und das zweite Beispiel zeigt ja, wie es sein sollte, nämlich, dass es gar keine Hauptschleife gibt.
Wenn Du etwas neben den Nachrichten noch regelmäßig machen möchtest, gibt es sowas wie aiocron.


Hier das Beispiel, wie Du neben dem Task, der Nachrichten empfängt, auch einen hast, der zählt:

Code: Alles auswählen

import asyncio
import aio_pika
from itertools import count

URL = "amqps://user:password@host/vhost"

async def async_count():
    for counter in count():
        print(f"Counter: {counter}\n")
        await asyncio.sleep(1)


async def process_message(message):
    async with message.process():
        print(message.body)

async def main():
    task = asyncio.create_task(async_count())
    async with await aio_pika.connect_robust(URL) as connection:
        channel = await connection.channel()
        queue = await channel.declare_queue("new-connection", durable=True)
        await queue.consume(process_message)

        # Wait forever
        await asyncio.Future()

if __name__ == "__main__":
    asyncio.run(main())
Fipsi
User
Beiträge: 21
Registriert: Dienstag 2. Juli 2024, 04:14

Ich glaub, ich werd gleich mit ner Tastatur verhauen 🫣.
Soweit ich das gerade kopiert und versucht hab umzustellen, ist das für mich nicht zielführend. Ein einfacher Counter in einem anderen Task, der nicht mit dem rabbit-Client, der alles blockiert, solang er keine Nachricht bekommt, "ineinandergreifend" Arbeiten kann, bringt mir nicht wirklich was.

Ich versuch jetzt mal meinen alten Code mit der Socket-Variante, die ja für mich genau so funktioniert hat, wie ich sie brauch (übersehene Fehlerquellen usw. mal ignoriert), zu zeigen, was ich brauch/worauf ich hinaus will (ich versuch das nicht relevante aus dem Code zu entfernen, damit's ein wenig übersichtlicher wird):

Code: Alles auswählen

#sockets.py
import select
import socket
import queue
import json
import traceback

class Sockets():

    def __init__(self):
        self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        
        self.server.setblocking(0)

        self.server_adress = ('', 14400)
        self.server.bind(self.server_adress)
        print("Socket-Server open at " + self.server_adress[0] + ":" + str(self.server_adress[1]))

        self.server.listen(20)

        self.inputs = [self.server]
        self.outputs = []
        self.connections = {}
        self.msg_recv = {}
        self.msg_check_recv = {}
        self.message_queues = {}
        self.new_connections = []
        self.del_connections = []

    def run(self):
        readable, writeable, exceptional = select.select(self.inputs, self.outputs, self.inputs)

        for s in readable:
            if s is self.server:
                connection, client_adress = s.accept()

                connection.setblocking(0)
                self.inputs.append(connection)
                self.connections[connection.getpeername()[1]] = connection
                self.new_connections.append(connection.getpeername()[1])
                self.message_queues[connection.getpeername()[1]] = queue.Queue()
                self.msg_recv[connection.getpeername()[1]] = b""
                self.msg_check_recv[connection.getpeername()[1]] = []

            else:
                try:
                    data = s.recv(1024)
                except:
                    print("Break Connection while reciving")
                    data = False

                if data:
                    if s.getpeername()[1] not in self.msg_recv:
                        self.msg_recv[s.getpeername()[1]] = b""

                    self.msg_recv[s.getpeername()[1]] += data

                    if s not in self.outputs:
                        self.outputs.append(s)

                else:
                    try:
                        self.del_connection(s.getpeername()[1], s)
                    except:
                        pass

        for s in writeable:
            #pass
            next_msg = self.message_queues[s.getpeername()[1]].get_nowait()
            s.sendall(bytes(json.dumps(next_msg) + "\n", encoding="utf-8"))

        for s in exceptional:
            self.del_connection(s.getpeername(), s)

        self.check_recv_msg()

    def read_msg(self, peername):
        return_ = False

        if peername in self.msg_check_recv and len(self.msg_check_recv[peername]) != 0:
            return_ = []
            
            if len(self.msg_check_recv[peername]) != 0:
                try:
                    for msg in self.msg_check_recv[peername]:
                        
                        return_.append(json.loads(msg))
                    #return_ = json.loads(self.msg_check_recv[peername])
                    self.msg_check_recv[peername] = []
                
                except:
                    traceback.print_exc()
                    self.msg_check_recv[peername] = []
                    pass

        return return_

    def send_msg(self, peername, msg):
        return_ = False

        if peername in self.message_queues:
            self.message_queues[peername].put(msg)

            return_ = True

        return return_

    def check_recv_msg(self):

        for peername in self.msg_recv:
            message = self.msg_recv[peername]

            if len(message) != 0 and b"\n" in message:
                messages = message.split(b"\n")
                i = 0
                while i < len(messages) - 1:
                    self.msg_check_recv[peername].append(messages[i].decode("utf-8"))
                    message = message[(len(messages[i]) + 1):]

                    i += 1

                self.msg_recv[peername] = message

    def get_new_connection(self):
        return_ = False

        if len(self.new_connections) != 0:
            return_ = self.new_connections

            self.new_connections = []

        return return_


#main.py
import sys
import time
import traceback
from sockets import Sockets
from area import Area
from datetime import datetime
from controller import Controller
from display import Display

sockets = Sockets()

area_list = {}
controller_list = {}
display_list = {}

new_connections = []
del_connections = []

display_nr = 1
display_nrs = {}
display_state = {}
changed_display_state = False

while True:

    try:
        sockets.run()

        n_c = sockets.get_new_connection()
        d_c = sockets.get_del_connection()

        if n_c != False and len(n_c) != 0:
            print("Neuer Client entdeckt")
            for peername in n_c:
                new_connections.append(peername)

        if d_c != False and len(d_c) != 0:
            print("Client disconnected")
            for peername in d_c:
                if peername in controller_list:
                    print("Controller " + str(peername) + " disconnected")
                elif peername in display_list:
                    print("Display " + str(peername) + " disconnected")
                del_connections.append(peername)

        if len(new_connections) != 0:
            # neuen Client behandeln
            

        if len(area_list) != 0:
            # wenn diese Liste nicht leer ist, KANN ein Timer laufen, der verarbeitet werden muss,
            # und dabei Status-Änderungen bewirken kann. Statusänderungen werden im area-Object
            # gemerkt

        if len(controller_list) != 0:
            # wenn controller verbunden sind, wird hier auf Nachrichten geprüft und diese ggf.
            # an die entsprechende area weitergegeben, die im nächten loop-lauf das Kommando
            # verarbeitet

        if len(display_list) != 0:
            # wenn displays verbunden sind und eine Statusänderung im zugeordneten area-Objekt
            # gespeichert wurde, wird die Statusänderung hier an die Displays geschickt

        if len(del_connections) != 0:
            # Verarbeitung von geschlossenen Verbindungen

    except KeyboardInterrupt:
        sys.exit(0)
    
    except Exception as e:
        traceback.print_exc()
        print("Shutdown")
        time.sleep(10)
        pass
Ich hoffe, es ist damit verständlich, was ich will.
Ein "ich blockier jetzt einfach alles, bis jemand was von mir will", für mich/meinen Kopf der größte Blödsinn, damit kann ich - in diesem Fall - nicht arbeiten. Bei den Displays könnte das funktionieren, aber beim Server und Controller seh ich damit nur sch***, den mein Hirn niemals zum laufen bringen wird. :x
Bitte versteht mich nicht falsch, ich bin dankbar für Ratschläge und Hilfen, aber das, was wir hier bisher aufgestellt haben, kriegt mein Hirn niemals auf die Reihe. Für mein Hirn darf NICHTS blockierend sein. Ich brauch ein "du hast nichts? Na dann weiter zum nächsten", damit ich n Programm auf die Reihe krieg.

Liebe Grüße

Fipsi
Antworten