Script wartet auf Abarbeitung der Funktion trotz Threading

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

Hallo,

ich versuche mich gerade am Threading.
Ich starte aus einer while-Schleife heraus in regelmäßigen Abständen eine Funktion.
Das funktioniert auch, Daten werden in die Funktion übergeben und in die InfluxDB geschrieben.

Nur bremst mir der Schreibvorgang in die InfluxDB meine Schleife weiterhin aus.
Das sind immer 2sec, statt der sonnst 0,5sec welche ich erreichen kann.

Scheinbar hab ich da noch einen Gedankenfehler. Threading oder doch Multiprozessing ?

Code: Alles auswählen

*
*
*
def register_write_master(influx_master, result_master_all, growatt_registers):
    print('write master all')
    for value, (name, multiplier) in zip(result_master_all.registers, growatt_registers):
        influx_master.write_points(
            [
                {
                    "measurement": name,
                    "tags": {"user": user},
                    "fields": {name: round(float(value * multiplier), 2)}
                    }
                ]
            )
*
*
*
while True:
    if time.time() >= inverter_register_last_timer:
        result_master_all = client.read_input_registers(0, len(growatt_registers), inverter_master_adress)
        if not result_master_all.isError():
            t_master = Thread(target=register_write_master, args=(influx_master, result_master_all, growatt_registers))
            t_master.run()
        inverter_register_last_timer += inverter_register_timer
*
*
*
Gruß Ralf
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

Hab gerade start() entdeckt. Scheint viel besser zu laufen.
run() ruft die Funktion ganz normal auf?
Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@kiaralle: Ja, `run()` ist einfach ein normaler Aufruf. Die Methode ist dazu da überschrieben zu werden wenn man von `Thread` eigene Klassen ableitet. Die ”Magie” das ein neuer Thread entsteht, passiert in `start()`.

`time.time()` ist falsch, das garantiert nicht, das die Zeit immer vorwärts läuft. Dafür gibt es `time.monitonic()`.

Und dann ist das mit der Schleife „busy waiting“ was unnötig Rechenzeit verbrät. Da ruft man eher `sleep()` mit einer entsprechenden Wartezeit auf.

`t_master` ist kein guter Name. So wirklich brauchen tut man da sowieso keinen Namen für. An der Stelle sollte man noch über das `daemon`-Argument beim `Thread` erstellen nachdenken. In der Regel will man das angeben. Hier vielleicht nicht, aber man sollte sich über die Konsequenzen im klaren sein.

`register_write_master()` ist Yoda-mässig benannt. Normale Reihenfolge der Worte wäre `write_master_registers()`.

Die ``for``-Schleife steht an der falschen Stelle. Statt für jeden Wert eine `write_points()`-Methode mit einer Liste mit immer genau einem Wert aufzurufen, würde man da die Methode *einmal* mit einer Liste mit allen Werten aufrufen. Da bietet sich eine „list comprehension“ an.

Der `float`-Aufruf ist unsinnig. Entweder sind die beiden Multiplikanden ganze Zahlen, dann macht es keinen Sinn die in eine Gleitkommazahl umzuwandeln und auf zwei Nachkommastellen zu runden, denn die Nachkommastellen sind dann ja auch vor dem Runden alle 0. Oder da kommt bei der Multiplikation schon eine Gleitkommazahl heraus. Da hat `float()` dann keinerlei Effekt mehr. Und `round()` kann man auch eine ganze Zahl übergeben wenn es sein muss.

`user` kommt in der Funktion magisch aus dem nichts. Das sollte entweder ein Argument sein, oder eine Konstante.

Code: Alles auswählen

def write_master_registers(influx_master, master_result, growatt_registers):
    print("write master all")
    influx_master.write_points(
        [
            {
                "measurement": name,
                "tags": {"user": USER},
                "fields": {name: round(value * multiplier, 2)},
            }
            for value, (name, multiplier) in zip(
                master_result.registers, growatt_registers
            )
        ]
    )

...

    while True:
        master_result = client.read_input_registers(
            0, len(growatt_registers), inverter_master_adress
        )
        if not master_result.isError():
            Thread(
                target=write_master_registers,
                args=(influx_master, master_result, growatt_registers),
            ).start()

        time.sleep(inverter_register_timer)
Ich weiss nicht ob das wirklich so sinnvoll ist für jeden Schreibvorgang einen Thread zu starten. Threads scheinen mir hier insgesamt nicht sinnvoll. Entweder ist das Schleifenintervall ausreichend die Werte auch wegzuschreiben, dann kann man das in der Schleife machen. Oder man kommt mit dem Schreiben nicht hinterher, dann nützt das aber auch nicht immer mehr und mehr Threads zu starten bis das irgendwann kracht.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Benutzeravatar
noisefloor
User
Beiträge: 3857
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

startet doch einen Worker-Thread und verbinde den mit dem Hauptprogramm über eine Queue. Dann sparst du dir auch den Overhead für das neuen Starten einen Threads. Plus du kannst darauf reagieren, wenn die Queue schneller gefüllt wird als das die Werte weg geschrieben werden können.

Gruß, noisefloor
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

@__blackjack__
wie immer hast du gute Hinweise.
Ja der Yoda kommt bei mir immer wieder mal durch.
Das mit dem float erscheint mir jetzt auch schlüssig. Schön das du mir das noch mal so erklärt hast.
Mein Bildungsweg ist noch lang :D

Wenn mir das Threading nicht so viel bringt,
Überlege ich mir gerade, das ich das komplette Lesen der Register zerlege.
Für die Steuerung und kurzes Ansprechverhalten meiner Relais, benötige ich nur 4 Register die ich schnell und flüssig erfassen muß.
Die restlichen Register kann ich ja schrittweise und einzeln lesen und schreiben. Hoffe das schreiben geht dann auch kürzer.
Glaub das hatte ich mal getestet und war auch nicht so glücklich.


@noisefloor
was ist ein Worker-Thread? Hab ich noch nichts darüber gelesen.
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

Ich weiss nicht ob das wirklich so sinnvoll ist für jeden Schreibvorgang einen Thread zu starten. Threads scheinen mir hier insgesamt nicht sinnvoll. Entweder ist das Schleifenintervall ausreichend die Werte auch wegzuschreiben, dann kann man das in der Schleife machen. Oder man kommt mit dem Schreiben nicht hinterher, dann nützt das aber auch nicht immer mehr und mehr Threads zu starten bis das irgendwann kracht.
Stirbt der losgetretene Tread nicht wenn er abgearbeitet wurde?
Wenn ich den Tread immer wieder starte, hat er aber immer die gleiche ID.
Bringt mich jetzt etwas zum nachdenken.
Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@kiaralle: Ja der Thread beendet sich wenn er abgearbeitet wurde, aber wenn man schneller neue Threads startet als die abgearbeitet werden, dann werden das natürlich immer mehr die noch nicht fertig sind. Und irgendwann dem System auch mal zu viele wenn das dauerhaft passiert.

Aber selbst wenn das nur manchmal passiert und es kein hartes Problem wird, ist es nicht so wirklich sinnvoll einen Haufen Threads zu haben, die sich gegenseitig auf den Füssen stehen und sich um *eine* Ressource kloppen. Da macht *ein* Thread, der die Schreibvorgänge serialisiert mehr Sinn. Das wäre dann der angesprochene Worker-Thread. Statt für jeden Messvorgang einen Thread zum schreiben der Daten zu starten, würde man *einen* Thread starten, der dauerhaft läuft und auf Daten zum Beispiel aus einer Queue wartet und die weg schreibt. Blockierendes warten auf eine Queue kostet keine Rechenzeit, und wenn man zeitweise mehr Daten kommen als weg geschrieben werden können, dann sammeln die sich in der Queue.

Gibt es denn zeitlich signifikante Unterschiede wie viele Register man liest? Oder ist am Ende die Frequenz mit der die Register für die Relaissteuerung erfasst werden müssen, einfach eine andere als man für die Erfassung der Daten in der Datenbank braucht?
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

Mit den wartenden Thread war mir auch schon mal in den Gedanken gekommen.
Gefunden hatte ich noch nicht brauchbares.
Er wartet im Hintergrund und ich werfe ihn dann den result zu.

Gibt es da ein spezielles Kommando?

Gruß Ralf
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Da gibt es kein "spezielles Kommando", sondern die schon erwähnte Queue:

Code: Alles auswählen

def write_master_registers(influx_master, growatt_registers, influx_queue):
    print("write master all")
    while True:
        registers = queue.get()
        influx_master.write_points([
            {
                "measurement": name,
                "tags": {"user": USER},
                "fields": {name: round(value * multiplier, 2)},
            }
            for (name, multiplier), value in zip(
                growatt_registers, registers
            )
        ])

...
    influx_queue = queue.Queue()
    Thread(
        target=write_master_registers,
        args=(influx_master, growatt_registers, influx_queue),
        daemon=True
    ).start()

    while True:
        master_result = client.read_input_registers(
            0, len(growatt_registers), inverter_master_adress
        )
        if not master_result.isError():
            influx_queue.put(master_result.registers)
        time.sleep(inverter_register_timer)
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

Danke, das bringt mich doch weiter :wink:
Benutzeravatar
noisefloor
User
Beiträge: 3857
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

und noch ein paar Links für das allgemeinere Verständnis:

https://docs.python.org/3/library/queue.html
https://pymotw.com/3/queue/
https://realpython.com/queue-in-python/

Gruß, noisefloor
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

Vielen Dank für Eure Hilfe.
Hab es noch etwas umgeschrieben.
Jetzt rennt es.

Code: Alles auswählen

def write_master_registers(influx_master, influx_slave, growatt_registers, influx_queue):
    while True:
        master_result = influx_queue.get()
        for value, (name, multiplier) in zip(master_result.registers, growatt_registers):
            influx_master.write_points(
                [
                    {
                        "measurement": name,
                        "tags": {"inverter": master},
                        "fields": {name: round(float(value * multiplier), 2)}
                        }
                    ]
                )

        slave_result = influx_queue.get()
        for value, (name, multiplier) in zip(slave_result.registers, growatt_registers):
            influx_slave.write_points(
                [
                    {
                        "measurement": name,
                        "tags": {"inverter": slave},
                        "fields": {name: round(float(value * multiplier), 2)}
                        }
                    ]
                )


Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@kiaralle: Da kommen jetzt `slave` und `master` aus dem ”Nichts”, und es wiederholt sich Code fast identisch zweimal. Und soll das so das die Messungen immer abwechselnd in die beiden Datenbanken geschrieben werden?

Ohne die Code-Wiederholung wäre das etwas in dieser Richtung:

Code: Alles auswählen

from itertools import cycle


def write_registers(databases_and_tag_values, growatt_registers, influx_queue):
    results = iter(influx_queue.get, None)
    for result, (influx_db, tag_value) in zip(
        results, cycle(databases_and_tag_values)
    ):
        influx_db.write_points(
            [
                {
                    "measurement": name,
                    "tags": {"inverter": tag_value},
                    "fields": {name: round(float(value * multiplier), 2)},
                }
                for value, (name, multiplier) in zip(
                    result.registers, growatt_registers
                )
            ]
        )

...


def main():
    ...

    Thread(
        target=write_registers,
        args=[
            [(influx_master, master), (influx_slave, slave)],
            growatt_registers,
            influx_queue,
        ],
        daemon=True,
    ).start()
Ich finde die Aufteilung aber eher nicht so gelungen. Da scheint mir Code beim speichern in die Datenbank(en) zu sein, der eigentlich eher zur Messung gehört.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

Ich finde die Aufteilung aber eher nicht so gelungen. Da scheint mir Code beim speichern in die Datenbank(en) zu sein, der eigentlich eher zur Messung gehört.
Das speichern in die Datenkank dauert für zwei Wechselrichter je 90-Werte zu lange.
Ohne das schreiben habe ich run-Zeiten von ca. 0.45sec.
Wenn ich das schreiben der Daten mit in die Messung schiebe, 1-3 Sekunden.
Da ich nur aller 30 Sekunden die kompletten Daten schreibe, finde ich diesen Weg schön, oder jetzt optimal.

Ich betreibe über den Raspberry und einer Relaiskarte einen Lastabwurf an meiner Inselanlage, also Solar.
Und da muss die Messung und das Schalten zugig ablaufen.

Über Nacht schalte ich weiterhin den Slave-Wechselrichter über meine Steuerung aus und wenn der Master genug PV-Power und PV-Spannung bekommt, wird der Slave wieder eingeschaltet.
Ich kann aus dem Speicher ohne PV eh nur 6kW ziehen und das schafft der Master alleine.
Grund ist der erhöhte Eigenverbrauch beider Wechselrichter im Verbund ca. 300W/h wegen der Synchronisation untereinander.
Solo, also nur der Master, verbraucht nur 20W. Das spart Speicher-Verbrauch über Nacht.

Das schnelle lesen der wichtigen Steuer-Register aus dem Modbus der Wechselrichter und dem Messwandler funktioniert jetzt sauber und ohne Unterbrechung.

Dank eurer Hilfe!
Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@kiaralle: Es geht nicht um die Threads an sich sondern wo was passiert. Messen sollte Messergebnisse liefern wo wirklich die Messergebnisse kommen und nicht ”Rohwerte”, die dann beim Schreiben erst noch zu den echten Messwerten werden. Das Schreiben in die Datenbank(en) sollte nichts über die Register und Umrechnungsfaktoren wissen müssen. Das gehört *da* nicht hin.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Benutzeravatar
DeaD_EyE
User
Beiträge: 1021
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

Wenn ich das schreiben der Daten mit in die Messung schiebe, 1-3 Sekunden.
Läuft das auf einem RPi? Falls ja und falls die DB auf der SD-Karte liegt, wäre das der Grund, wieso das Schreiben so lange dauert.
90 Werte sind nicht viel.
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
kiaralle
User
Beiträge: 68
Registriert: Donnerstag 19. August 2021, 19:11

Der Raspi läuft auf einer SSD

Kann schon sein, das das berechnen und umrechnen der Rohdaten, im Bereich des Schreibvorganges, das Zeit-Propblem verursacht.
Da die Werte, in Grafana/Influxdb, mich nicht interessieren, werde das aus der Steuerung heraus lassen.
Wie oft schaut man da nach einiger Zeit noch rein.
Einen Tot muss man sterben.

Die Einwände sind aber durchaus berechtigt
Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@kiaralle: Ich glaube nicht dass das bisschen umrechnen irgendwo ein Zeitproblem verursacht. Das *deswegen* an einer unpassenden Stelle zur machen, nur weil man das *vermutet* ist eben genau das was daran nicht schön ist.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Antworten