Strukturproblem mit RabbitMQ

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Antworten
Fipsi
User
Beiträge: 12
Registriert: Dienstag 2. Juli 2024, 04:14

Hallo zusammen,

aufgrund der Empfehlung letzter Woche will ich mein System jetzt mit RabbitMQ umsetzen, hab aber noch "etwas" Verständnisprobleme, die teilweise auch sicher auf mein schwaches Englisch zurückzuführen sind. Damit ihr versteht/wisst, was ich vorhabe und bisher "verstanden" hab, mein System "kurz" erklärt:

In der "Mitte" steht ein Server, in Form eines Raspberry Pis, auf diesem ist RabbitMQ installiert und läuft das "Hauptscript" dieses Systems.
Es gibt zwei Typen von Clients, die "von außen" sich mit dem Rabbit noch verbinden. Die Anzahl der Typen ist variabl.
Die Scripte sind bisher alle mit Python und pika oder mit Angular und der lib @stomp/stompjs geplant.

Typ 1 kann in zwei weitere Untergruppen unterteilt werden: (der Einfachheit halber folgend "Displays" genannt)
Typ 1A sind weitere Raspberrys, die LED-Matrixen und ein paar GPIOs steuern. Außerdem sollen sie zyklisch ein paar Daten, wie z. B. auch CPU-Temperatur, an den Server senden, mit denen die überwacht werden können und ich bei diesem Beispiel weiß, ob ich das Gehäuse nochmal umarbeiten muss.
Typ 1B sind Angular-Anwendungen, das kann also jedes x-beliebiges Gerät wie ein Laptop oder Smartphone (und wahrscheinlich smartHome-Kühlschränke 🙃) sein, die zeigen nur auf dem Bildschirm was an (und evtl. Tonwiedergabe).

Typ 2 kann evtl. auch in zwei Geräte unterschieden werden, da bin ich mir noch nicht sicher (der Einfachheit halber Controller genannt).
Typ 2A ist ebenfalls eine Angular-Anwendung, mit der man die Displays steuern kann. Außerdem wird der aktuelle Status der Displays angezeigt.
Typ 2B könnte was a la ESP32 sein, ein Hardware-Client, mit z. B. Tasten zur Steuerung der Displays.

Zudem soll es möglich sein, dass die gesamten Clients nochmal unterteilt werden, wenn es mehrere räumliche Bereiche gibt, dann muss ich beiden Typen jeweils einen Bereich zuordnen, die unabhängig voneinander agieren.

Prinzipiell soll die Verarbeitung des Status' und der Controller nur auf dem Server laufen. Also die Displays sind "dumm", die bekommen nur vom Server gesagt, was sie anzuzeigen haben. Die Controller reden auch nicht direkt mit den Displays, sondern nur mit dem Server und sagen ihm, welche Aufgaben er zu tun hat. Je räumliche Aufteilung gibt's im Server einen eigenen Task, welcher z. B. Counter laufen lässt und bei Änderungen diese an die Display schickt.
Der Einfachheit halber geh ich jetzt von nur einem räumlichen Bereich, also keiner Client-Aufteilung aus, die aber möglich sein muss.
Ich hab mir das jetzt systematisch auf mehrere Kanäle ausgelegt:
Ein Kanal ist der Status der Displays, dieser wird an alle Clients, also Typ 1 und 2, (in selbem räumlichen Bereich) geschickt. Wichtig ist, dass es immer nur eine Nachricht geben darf. Das heißt, wird eine Nachricht versendet, aber kann nicht zugestellt werden, darf sie nur so lange leben, bis eine neue Nachricht kommt. Also sollte eine neue Nachricht kommen, muss die ältere gelöscht werden.
Der zweite Kanal ist der Status der Controller. Es gibt bei den Controllern die Möglichkeit Vorlagen anzulegen, welche bei den Displays nichts verloren haben, aber bei allen Controllern synchron sein soll. Gibt ein Controller einen neuen Befehl, gibt er das an alle anderen Controller und den Server weiter.
Der dritte Kanal wäre der für die Display-Infos, also die z. B. genannte CPU-Temperatur. Die soll an den Server geschickt werden, welcher die Infos dann an die Controller weiter leitet.
Beim vierten Kanal bin ich mir noch nicht ganz sicher: das soll der "neue Client"-Kanal sein. Sobald sich ein neuer Client zum Rabbit verbindet, muss er sich "vorstellen", also sagt welcher Typ Client er ist und welche technische Daten er hat (bei Displays z. B. die Anzahl der vorhandenen Textzeilen). Die Info muss an alle Controller gehen. Ist der neue Client ein Controller, kann er sich selbst einem Bereich zuordnen, ist es ein Display, bekommt er eine eindeutige Zahl, damit man weiß, welches Gerät das ist und von Controllern einem Bereich zugeordnet werden kann.

Jetzt kommt endlich der "spannende" Teil, die Umsetzung im RabbitMQ:
Soweit ich das verstanden hab, sind die Queues meine Kanäle und über die Exchanges sag ich, welche Kanäle genutzt werden?
Also mein Ansatz wäre jetzt gewesen, dass jeder Client, der sich neu verbindet, nur für sich selbst den Kanal 1 anlegt und als Consumer lauscht.
Außerdem geht jeder Client auf den gemeinsamen Kanal 4, damit neue Clients erkannt und bei den Controllern angezeigt werden kann. Die Displays können nach ihrer Bereichszuordnung, bzw. theoretisch sogar schon nach der Nummernzuteilung, von diesem Kanal abmelden.
Jeder Controller bekommt zudem seinen Kanal 2, über den die Controller-Infos ausgetauscht werden.
Und zu guter Letzt gibt es wieder einen gemeinsamen Kanal 3 für alle Clients, auf dem die Displays als producer fungieren und die Controller als consumer.

Die Queues bräuchte ich dann so, dass die Kanäle 1 und 2 immer nur jeweils die letzte Nachricht enthalten und die Kanäle 3 und 4 von allen Clients, und nicht nur von einem, abgehört werden. Lässt sich letzteres so überhaupt konfigurieren? Ich hab da noch irgendwie keine aussagekräftige Antwort verstanden, aber ich befürchte fast, dass ich da auch jedem Client eine eigene Queue erstellen muss?

Der nächste Punkt ist dann der Exchange, ich glaub, von dem hab ich effektiv noch gar nichts begriffen. Ich glaube, dass jede Nachricht eines Producers an einen Exchange geht und dieser verteilt dann die Nachricht(en) an die entsprechenden Queues? Ich bräuchte also für jeden Kanal in jedem räumlichen Bereich einen eigenen Exchange, in dem ich jeden Client-Kanal registrier, was über die Bindings abläuft? Für Kanal 1 und 2 bräuchte ich Exchanges vom Typ topic, für Kanal 3 und 4 den Typ fanout?
Wenn ich dann eine neue Nachricht schicken will brauch ich einmal den Namen des Exchanges, der die Kombi aus räumlichen Bereich und einem der 4 Kanäle ist? Was ist dann noch der Routing key, für was brauch ich den?

Mir schwirrt da langsam der Kopf :? :?:

Ich hoffe, das war jetzt nicht zu viel durcheinander und verwirrend, damit man versteht, worauf ich raus will und ihr mir helfen könntet, die Umsetzung zu konzipieren.

Vielen Dank im voraus und
liebe Grüße

Fipsi
Fipsi
User
Beiträge: 12
Registriert: Dienstag 2. Juli 2024, 04:14

Hallo zusammen,

ich sag's ganz ehrlich: ich bin gerade echt genervt 🙃. Und ja, wahrscheinlich ist meine herangehensweise nicht immer die richtige, aber das ist die, mit der ich normal arbeiten kann.
Tut mir leid, dass ich ganz "etwas" stinkig bin, aber diese Probleme gehen mir sowas von auf den Keks...
Ich werd mal wieder daran erinnern, warum ich Frameworks und Libs vermeide, soweit es geht. Und ich lern lösungsorientiert, weshalb Grundlagen fehlen, dessen bin ich mir bewusst. Aber wenn für mehr oder weniger allgegenwärtiges keine brauchbaren Erklärungen/Tutorials bei Google raus kommen, dann zerrt das an meinen Nerven.
Ich versuch gerade das Serverscript hinzubekommen und bleib an dem loopio hängen. Wie krieg ich da meinen eigenen Schuh rein? Ich find immer nur, dass alles auf Events von rabbitMQ reagiert, aber nicht, dass mein Script rabbit steuert und darauf reagiert.
Ich hab dieses Beispiel abgeändert: https://github.com/pika/pika/blob/main/ ... example.py
Die Example Consumer ist grundlegend gleich geblieben. Das Reconnect-Object hab ich "aufgelöst". Sieht im Moment wie folgt aus:

Code: Alles auswählen

def main():
    rabbit_client = RabbitClient()
    should_reconnect = True
    reconnect_delay = 2

    while True:
        try:
            rabbit_client.run()
            
            if rabbit_client.is_connected_trigger():
                print("Connected")
                rabbit_client.open_channel()
            
            if rabbit_client.is_channel_open_trigger():
                rabbit_client.setup_exchange("new-connection", "topic")
            
            if rabbit_client.setup_exchange_trigger():
                pass

        except KeyboardInterrupt:
            rabbit_client.stop()
            sys.exit(0)
    
        except Exception as e:
            traceback.print_exc()
            print("Shutdown")
            rabbit_client.stop()

        if should_reconnect and not rabbit_client.is_connected():
            rabbit_client.stop()

            time.sleep(reconnect_delay)
            rabbit_client = RabbitClient()

                
            
if __name__ == '__main__':
    main()

Code: Alles auswählen

#rabbitmq.py
Class RabbitClient():
	[...]
	def run(self):
        """Run the example consumer by connecting to RabbitMQ and then
        starting the IOLoop to block and allow the SelectConnection to operate.

        """
        self._connection = self.connect()
        self._connection.ioloop.start()
        [...]
Die main-Funktion läuft jetzt bist zum "rabbit_client.run()" und "steckt da fest".
Dass der Code so noch nicht viel Sinn macht ist mir bewusst, ich will das jetzt erst mal überhaupt zum laufen kriegen und verstehen können.

Für mich macht das ganze nicht wirklich Sinn. Mei Verständnis, bzw. was ich will, versuch ich mal in folgendem Pseudo-Code darzustellen:

Code: Alles auswählen

import time
import sys
import traceback
import pika

client = pika.SelectConnection([...])
last_time = time.time()
time_interval = 50

while True:
    try:
        client.run()
        # Prüft auf neue Nachrichten oder
        # verschickt im Buffer befindliche Nachrichten

        if client.new_messages():
            # Nachrichten abarbeiten
            pass
        
        if last_time + time_interval <= time.time():
            # meine Funktionen
            # z. B. Nachricht verschicken
            pass

    except disconnected:
        client.reconnect()
    
    except KeyboardInterrupt:
            sys.exit(0)
    
    except Exception as e:
        traceback.print_exc()
        print("Shutdown")
Ich nutze kein Tornado oder sonstige Konsorten, sondern einfach was wie der Pseudo-Code und zwar - sehr wichtig - nicht blockierend.

Habt noch einen schönen Sonntag
Fipsi
Sirius3
User
Beiträge: 17911
Registriert: Sonntag 21. Oktober 2012, 17:20

Klar blockiert `run` weil es die blockierende Variante ist.
Ich hatte ja schon in Deinem anderen Thread geschrieben, dass wenn Du verschiedene Dinge parallel machen willst, Du am besten auf async umsteigst.
Hier gibt es genug Beispiele: https://aio-pika.readthedocs.io/en/latest/
Fipsi
User
Beiträge: 12
Registriert: Dienstag 2. Juli 2024, 04:14

Warum heißt's dann überall, dass SelectConnection non-blocking ist? Oder steh ich da jetzt komplett auf dem Schlauch?
Ich schau mir deinen Link an, danke dafür.

Liebe Grüße
Benutzeravatar
DeaD_EyE
User
Beiträge: 1058
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

pika.SelectConnection - asynchronous adapter without third-party dependencies.
Intern läuft es dann auch asynchron, aber das ändert nichts an der blockierenden run-Methode des Clients.

Das ist so ähnlich wie bei asyncio. Wenn man dort die run-Funktion aufruft, blockiert diese Funktion.
Aber alles im Kontext von asyncio, ist nebenläufig und erreicht wird das mit Select.

Ich würde die asyncio Variante nutzen, wenn das Programm mehr machen soll, als nur diesen einen Client laufen zu lassen.
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Antworten