mqtt server (mosquitto) abfragen

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
eagleflight
User
Beiträge: 28
Registriert: Dienstag 12. Februar 2019, 19:45

Ich habe mehrere nodemcu/esp-12 die über Wlan Daten an einen Mosquitto Server liefern. Die node's melden sich alle 2 min am Server an und schicken dann einen Datensatz vom Format

Code: Alles auswählen

{"Sensor":1,"Temperature":22.80,"Humidity":41.20,"Voltage"V:523.00}
abgefragt wird der Mosquitto Server aus Python mit den folgenden Funktionen:

Code: Alles auswählen


global ESP_TOPIC

# werte vom mqtt server holen
ESP_TOPIC = "ESP_01"
def mqtt_read() :
    client = mqtt.Client()                                      #mqtt client aktivieren
    client.on_connect = on_connect                  # prüfen ob verbindung besteht
    client.on_message = on_message              # message funktion aktivieren  
    client.connect("localhost", 1883, 60)          # serverdaten festlegen
    client.loop_start()                                           # abfrageschleife starten
    time.sleep(1)                                                   # kurz warten
    
    esp_01=""
    msg = list()                                      # msg leeren
    if not q1.empty():                           # und falls etwas drinnen steht
       esp_01 = q1.get()                        # text aus Queue holen und zerlegen
                                                              # Format: {"Sensor":1,"Temperature":22.80,"Humidity":41.20,"Voltage"V:523.00}
                                                               
       msg =  ["S:", esp_01[esp_01.find('Sensor') + 8 : esp_01.find('Temperature')-2],
                     "T:", esp_01[esp_01.find('Temperature') + 13 : esp_01.find('Humidity')-2],
                    ",H:", esp_01[esp_01.find('Humidity')    + 10 : esp_01.find('Voltage')-2],
                    ",V:", esp_01[esp_01.find('Voltage')     + 10 : esp_01.find('}')-1]]
              
       with q1.mutex:                            # queue nach auslesen wieder löschen
           q1.queue.clear()
         
    client.loop_stop()                         # schleife anhalten
    return(msg)                                   # und als Liste ausgeben Format: ['T:', '22.80', ',H:', '41.10', ',V:', '525.0']


def on_connect(client, userdata, flags, rc):    # MQTT Server prüfen
    #print(ESP_TOPIC)                                         #print("Connected with result code " + str(rc)) 
    client.subscribe(ESP_TOPIC)                       # Topic festlegen

def on_message(client, userdata, message):    # Nachricht vom Server lesen
                                                                                   # print(msg.topic + " " + str(msg.payload))
    q1.put(("{'" + str(message.payload) + "', " + str(message.topic) + "}"))     # in Queue schreiben
    

die aus einer Hauptschleife mit:

Code: Alles auswählen


def_main():
     while True :
            # mqtt ESP01 client abrufen
            esp_return = mqtt_read()
            if esp_return:
                sensor = float(esp_return[1])
                if  sensor == 1   :                 #sensor 1
                    esp_01_values[0] = float(esp_return[3])
                    esp_01_values[1] = float(esp_return[5])
                    esp_01_values[2] = ((1024-float(esp_return[7]))/10)  # in % umgerechnet aus 0-1024
                    esp_01_feedback = esp_01_feedback + 1
                elif sensor == 2 :                 #sensor 2
                    esp_02_values[0] = float(esp_return[3])
                    esp_02_values[1] = float(esp_return[5])
                    esp_02_values[2] = ((1024-float(esp_return[7]))/10)
                    esp_02_feedback = esp_02_feedback + 1  
  ......  
    usw.
   
Das Ganze läuft so stabil nur.....
Die Sensor Nummer mit zu übertragen ist eigentlich nicht notwendig, dafür hat mqtt ja die Kanäle die man einzeln auslesen kann.
Durch Setzen der globalen Variablen ESP_TOPIC kann man vor jeder Abfrage den Kanal festlegen. Nur mit der Zuordnung der Queue
scheitert es, da ich keine Ahnung habe wie ich die Queue in der Unter_Funktion beim Aufruf ändern und anschließend löschen kann.
Ohne Queue zu arbeiten geht leider nicht, da die mqtt_read Funktion aus einer Schliefe aufgerufen wird und ohne den Zwischenspeicher
der Queue Daten verloren gehen.

Am Ende hängen etwa 20 ESP's am Mosquittoserver, dann kommt es zu Kollisionen, wenn alle auf demselben Kanal senden.
Hat jemand eine Idee wie man die Queue_Nr relativ übergeben kann, da sie ja in der Unterfunktion on_message nur angesprochen wird ?

Der optimale Aufruf wäre:

Code: Alles auswählen

esp_return = mqtt_read(ESP_CHANNEL, QUEUE_NO)    # jeweils als globale Vars

und Rückgabe mit

mqtt_read.....

return(ESP_CHANNEL, values)
Benutzeravatar
sparrow
User
Beiträge: 4193
Registriert: Freitag 17. April 2009, 10:28

So wie ich das sehe, erübrigt sich dein Problem, sobald du aufhörst globale Variablen zu benutzen, die du an irgend einer Stelle magisch aus den Ärmel zauberst.

Definiere Variablen dort, wo sie benutz werden.
Übergebe Parameter an Funktionen und nutze deren Rückgabewert um dein Programm zu strukturieren.
Benutze keine globalen Variablen.
Deine Konstante ist (in der Form) keine Konstante, denn offensichtlich möchtest du sie ändern, was sehr unkonstant ist.
Wenn man mit dem Gedanken spielt Variablen zu nummerieren, sollte man besser eine Liste benutzen.
Sirius3
User
Beiträge: 17747
Registriert: Sonntag 21. Oktober 2012, 17:20

Vergiß, dass es `global` gibt, an der Stelle, wo Du das benutzt, wirkt es sowieso nicht. Und `ESP_TOPIC` ist ja eine Konstante.
Für jede einzelne Nachricht eine Verbindung zum mqtt-Server aufzubauen ist, so vom Erfinder nicht gedacht. Keine Ahnung, was da alles passiert.
Der Payload ist ja offensichtlich JSON kodiert, dann benutze auch json.loads, statt selbst zu versuchen mit find und Stringgefrickel das zu parsen.
Das Ergebnis dann in eine Liste zu packen, wo jedes zweite Element der Schlüssel und jedes andere der Wert ist, ist auch nicht-optimal. Nimm das Wörterbuch, das Du vom json-Parsen bekommst, direkt.
`q1` kommt aus dem nichts und wird auch nirgends definiert, wenn dann noch multithreading ins Spiel kommt, dann ist das Chaos vorprogrammiert.
Das was Du dann in on_message in die Queue steckst ist auch unsinnig. Python kann auch komplexere Objekte über eine Queue übertragen. Da muß man kein Bytes-Objekt in seine String-Repräsentation umwandeln.
Das läuft also nicht "stabil", sondern Du hast an vielen Stellen einfach nur großes Glück, dass Dir das nicht um die Ohren fliegt.
Die meisten Kommentare am Ende Deiner Zeilen (wo sie übrigens schwer zu lesen sind, weil bei jeder Änderung auch die Einrückung der Kommentare kaputt geht), sind überflüssig, da sie nur das offensichtliche beschreiben. Kommentare sollten einen Mehrwert haben und dann in einer eigenen Zeile stehen.
Eingerückt wird einheitlich mit 4 Leerzeichen pro Ebene, nicht 3 und auch nicht 7.

Übrig bleibt dann das:

Code: Alles auswählen

ESP_TOPIC = "ESP_01"

def on_message(queue, client, userdata, message):
    # Nachricht vom Server lesen
    data = json.loads(message.payload)
    queue.put(data)

def main():
    messages = Queue()
    client = mqtt.Client()
    client.on_message = lambda *args, **kw: on_message(messages, **args, *kw)
    client.connect("localhost", 1883, 60)
    client.subscribe(ESP_TOPIC)
    client.loop_start()
    while True:
        message = messages.get()
        print(message)
        # in % umgerechnet aus 0-1024
        print((1024 - message['Voltage']) / 10)
        # mach auch immer was Du willst, aber bitte nicht in Listen
        # mit durchnummerierten Namen packen.
Ob man jetzt für jeden Sensor einen eigenen Kanal hat, ist Geschmackssache. Bei reiner Durchnummerierung sehe ich da jetzt keinen Vorteil.

Code: Alles auswählen

NUMBER_OF_SENSORS = 20
ESP_TOPIC = "ESP_{:02d}"

def on_message(queue, client, userdata, message):
    # Nachricht vom Server lesen
    data = json.loads(message.payload)
    data['Topic'] = message.topic
    queue.put(data)

def main():
    messages = Queue()
    client = mqtt.Client()
    client.on_message = lambda *args, **kw: on_message(messages, **args, *kw)
    client.connect("localhost", 1883, 60)
    for nr in range(1, NUMBER_OF_SENSORS + 1):
        client.subscribe(ESP_TOPIC % nr)
    client.loop_start()
    while True:
        message = messages.get()
        print(message)
Antworten