Funktion in Flask soll jeweils nur 1x laufen (Thread)

Django, Flask, Bottle, WSGI, CGI…
Antworten
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo,

ich komme zu keiner Lösung. Ich habe eine lauende Flask Applikation. Bisher funktioniert alles.
Diese ruft von einer externen Seite Daten ab, bereitet diese auf, und gibt dies aus. (Also nichts neues)
"Daten von externen", es werden immer nur Daten seit dem letzten Abruf nachgeladen. Dies geht recht schnell. 1-5sek.

Nun habe ich festgestellt, das sich ältere Daten auch mal ändern können. Nun möchte ich ein Force Update implementieren. Welche einen größeren/gesamten Zeitraum betrachtet.
Dieser Prozess würde ca. 3-6min dauern.

Bis zu diese Stelle gibt es auch kein Problem.
Aber ...
Ich möchte diesen Update Prozess nur 1x ausführen, egal wie oft diese Funktion ab/aufgerufen wird. Es sollte weder ein Reload noch ein weitere User diesen Prozess nochmals ausführen, solange dieser am laufen ist.
Und hier komme ich nicht weiter.

Das mit dem Thread ist mir als erstes eingefallen, falls es andere Möglichkiten gibt, ich bin offen.

Ich dachte, ich lasse dieses Funktion über ein Thread laufen und weitere Aufrufe sollten mit einem "Update läuft gerade" beantwortet werden.
Egal was/wie ich es mit dem Threading heute Probiert habe, ".is_alive()" liefert mit immer ein "False" zurück.

Code: Alles auswählen

@blueprint.route('/force_update', methods=['GET'])
def force_update():
    
    t = threading.Thread(target=force_update, name="force_update")
    t.start()

    return 'Update Prozess läuft. Bitte warten', 200

Code: Alles auswählen

# Funktioniert so alleine nicht, dient nur als einfaches bsp.
@blueprint.route('/force_update', methods=['GET'])
def force_update():
    
    if t.is_alive():
        return 'Update läuft noch', 200
    else:
        t = threading.Thread(target=force_update, name="force_update")
        t.start()
        return "Update Prozess startet. Bitte warten", 200
Ja das " if t.is_alive():" fehlt, dies habe ich mit einer einer leeren Variable "t" in der __init__ probiert und diese global eingebunden, usw... (egal was ich machte) nichts half.

Ich bin nicht der Profi, das Problem aber sollte daran liegen, das jeder Aufruf in einer eigenen Instanz/Thread läuft und dieser eben das "t" nicht findet.

Eine alternative, ich schreibe mit Aufruf der Funktion ein is_running True in die DB und prüfe darauf. Ich wollte es aber einfacher halten. (sonst müsste ich z.b. bei einem neustart der Applikation diesen Eintrag immer reseten, sonst ist duch diesen Falschen eintrag kein erneuter Aufruf möglich., etc...)

Ist solch eine Funktionsweiße eher ungewöhnlich (eine Funktion darf/soll nur 1x laufen)?
Denn egal nach was ich suchte, ich konnte nicht mal ein ähnliches Problem/bsp. finden. (Suche war eingeschränkt, da ich immer nach "Threads" in diesem Zusammenhang gesucht habe.)

Viele Grüße
Chris
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo,

ich antworte mir mal selbst :D

Vor 1 Std bin ich auf "eine" Lösung gekommen...
Ich prüfe nicht mit "..is_alive()" sondern gebe dem Kind einen Namen. Hier nun "force_update" und damit kann ich Thread übergeifend prüfen, ob dieser aktiv/oder nicht ist.

Code: Alles auswählen

def test():
    thread_name = 'force_update'
    run_thread = True
    for th in threading.enumerate():
        if th.name == thread_name:
            run_thread = False
            break

    if run_thread:
        threading.Thread(target=f, name=thread_name, daemon=False).start()
        return 'Update Prozess startet. Bitte warten', 200

    return 'Update Prozess läuft noch. Bitte warten.', 200
Auch wenn ich jetzt einen Lösungsweg gefunden habe, bin ich trotzdem noch an Feedback interessiert.

Was haltet Ihr von diesem Weg?
Gibt es bessere alternativen?

Erst mal ist aber der Druck draussen, hierfür eine Lösung finden zu müssen. :)

Viele Grüße
Chris
Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@chris_adnap: @chris_adnap: Man sollte keine kryptischen Abkürzungen verwenden. Wenn man `thread` meint, nicht nur `th` schreiben.

Das kann man mit `any()` deutlich kompakter ausdrücken:

Code: Alles auswählen

def test():
    if any(thread.name == "force_update" for thread in threading.enumerate()):
        threading.Thread(target=f, name=thread_name, daemon=False).start()
        return "Update Prozess startet. Bitte warten", 200

    return "Update Prozess läuft noch. Bitte warten.", 200

Aber das rettet einen leider nicht davor, dass hier eine „race condition“ vorliegt. Wenn die Funktion zweimal aufgerufen wird, kann es passieren das beide im ``if`` zum Schluss kommen, dass gerade kein Update-Thread läuft bevor sie ihn dann beide starten.

Erschwerend kommt dazu das die Aufrufe nicht mal im gleichen Process sein müssen, weil WSGI-Anwendungen auch auf mehreren Prozessen laufen können.

Das muss man beispielsweise über die Datenbank synchronisieren. Also in einer Transaktion prüfen ob der Wert für „Update läuft“ da drin steht, und falls nicht den reinschreiben und commiten, und nur wenn das erfolgreich war, den Thread starten.

Oder man schreibt sich einen eigenen Dienst für das Update. Oder man verwendet eine Task Queue wie Celery dafür.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo __blackjack__,

danke für deine Antwort.
@chris_adnap: @chris_adnap: Man sollte keine kryptischen Abkürzungen verwenden. Wenn man `thread` meint, nicht nur `th` schreiben.

Das kann man mit `any()` deutlich kompakter ausdrücken:
Nachdem mann zig Sachen/Kombinationen/Möglichkeiten ausprobiert hat, versucht man in diesem Stadium so einfach/wenig wie möglich zu schreiben.
Dies war nur mein Funktionierendes bsp.

Sicher kann man alles was ich so tippe, vereinfachen. Das ist mir selbst bewusst. ;) Aber in diesem leben werde ich nicht mehr der Profi in solchen sachen sein/werden.
Ich war erst einmal froh, überhaupt eine "augenscheinliche" Lösung gefunden zu haben.

Aber das rettet einen leider nicht davor, dass hier eine „race condition“ vorliegt.
Dies kommt dadurch zu stande, das die Abfrage über eine Schleife läuft. Ist solch eine Abfrage nicht mit einem anderem/direkten Aufruf möglich?

Erschwerend kommt dazu das die Aufrufe nicht mal im gleichen Process sein müssen, weil WSGI-Anwendungen auch auf mehreren Prozessen laufen können.
So etwas, dies nur mit mehr Hintergrundwissen zu verstehen ist, hatte ich mir irgendwie schon gedacht. Das dies nicht "die" Lösung seien kann. Wäre auch zu einfach.

Aber nur um es zu verstehen. Würden mehrere "Prozesse" laufen, so würde die jetzige abfrage "for thread in threading.enumerate()" nur in dem einen Prozess zum Ergebniss führen und in einem anderen leer ausfallen? Habe ich die Aussage so richtig verstanden?



Das muss man beispielsweise über die Datenbank synchronisieren. Also in einer Transaktion prüfen ob der Wert für „Update läuft“ da drin steht, und falls nicht den reinschreiben und commiten, und nur wenn das erfolgreich war, den Thread starten.
Also genau der Weg, welchen ich erst mal aussen vor lassen wollte. :D Aber natürlich, mit den mehr an Informationen, macht dies Sinn.

Oder man schreibt sich einen eigenen Dienst für das Update.
Was ist damit gemeint? Etwas externen wie cron?

Oder man verwendet eine Task Queue wie Celery dafür.
Nicht Celery, aber ein anderes hatte ich auch gestern gefunden. Kann jetzt nicht sagen ob es die Funktion erfüllt hätte.
Aber auch hier die Frage. Stand heute, würde ich diese Funktion nur ein einziges mal nutzen. Da wollte ich eben kein extra Tool/Modul dafür installieren.
Würde sicherlich Sinn ergeben, wenn man dies an mehreren Stellen, oder in einem größeren Funktionsumpfang benötigen würde.

Celery ist ja in Python geschrieben. Ist meine gewünschte Funktion doch nicht so einfach selbst umzusetzten, das sich solch eine Installation lohnen würde?


Viele Grüße
Chris
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Fuer Leute ohne viel (!) Erfahrung ist auf ein dokumentiertes und mit vielen Anleitungen da draussen versehenes Tool die bessere Wahl, als sich ueber lange Zeit Fehler fuer Fehler an das gewuenschte Ziel heranzutasten. Nebenlaeufigkeit und deren Syncronsiation sind mit die haertesten Probleme, welche die Informationstechnik kennt. Alles, was du da aus dem Regal ziehen kannst, ist ein Gewinn. Installier dir Celery, und gut ist.
Benutzeravatar
DeaD_EyE
User
Beiträge: 1021
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

Wie schon die Vorredner sagten, ist Nebenläufigkeit nicht so einfach.

Ein Ansatz wäre es, einen ThreadPoolExecutor zu verwenden und nur einen Task zuzulassen.
Wenn man mehrere Hintergrundprozesse will, dann wird es etwas komplexer, da man dann ja die Tasks irgendwas zuordnen muss (z.B. einem eingeloggtem User).

Die Routen würde ich aufteilen. Eine Route um die Berechnung zu starten und eine Route für das Ergebnis.

Hier der Ansatz:

Code: Alles auswählen

from concurrent.futures import ThreadPoolExecutor
from random import uniform
from time import sleep

from flask import Flask, Response, redirect, url_for

executor = ThreadPoolExecutor(4, "worker")
app = Flask(__name__)
futures = []


def worker():
    sleep(uniform(5, 10))
    return "Done"


@app.get("/")
def index():
    return """
        <html>
        <head><title>Hello World</title></head>
        <body>
          <ul>
            <li><a href="/update_ergebnis">Ergebnis</a></li>
            <li><a href="/force_update">Update starten</a></li>
          </ul>
        </body>
        </html>
    """


@app.get("/force_update")
def force_update():
    if not futures:
        futures.append(executor.submit(worker))

    return redirect(url_for("update_result"), code=302)


@app.get("/update_ergebnis")
def update_result():
    if futures and futures[0].done():
        return f"Ergebnis: {futures.pop().result()}"
    elif futures:
        return Response(
            "Die Berechnung läuft",
            headers={"Refresh": 5, "url": url_for("update_result")},
        )
    else:
        return "Kein Ergebnis vorhanden, da keine Berechnung gestartet worden ist"

sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

@chris_adnap: um Threads zu koordinieren, benutzt man passende Datensturkturen, wie Queues, Locks oder Events:

Code: Alles auswählen

from threading import Event, Thread

update_event = Event()
def update():
    while True:
        update_event.clear()
        update_event.wait()
        ...

Thread(target=update, daemon=True).start()

@blueprint.route('/force_update', methods=['GET'])
def force_update():
    if update_event.is_set():
        return 'Update läuft noch', 200
    else:
        update_event.set()
        return "Update Prozess startet. Bitte warten", 200
Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@chris_adnap: Das ist nicht zwingend weil die Abfrage in einer Schleife läuft, sondern weil die Abfrage und das starten des Threads zusammengenommen eine atomare Aktion sein müssen. Also eine Aktion die *zusammen* nicht ”unterbrechbar” sein darf. Wenn die Abfrage alleine atomar wäre, würde die Angriffsfläche etwas kleiner, aber sie wäre immer noch vorhanden.

Das mit den Prozessen hast Du richtig verstanden. Jeder Prozess hat seine eigenen Threads.

Mit Dienst war etwas wie Celery gemeint, nur eben speziell für diese Aufgabe statt einer allgemeinen Task Queue. Ein Prozess der nur das Update macht wenn er dazu beauftragt wird. Und der die Regel dass nur ein Update zur gleichen Zeit läuft sicherstellt.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo an allen und danke für eure Antworten.

Ich habe mich nun für die Variante, die schon vorhandene DB dafür zu nutzen, entschieden.

__deets__ hat geschrieben: Samstag 9. September 2023, 10:58 ... Installier dir Celery, und gut ist.
Ich habe dies mir kurz angesehen. Wenn man es im größeren Umpfang nutzt, dann sicherlich super. Celery benötigt einen Broker, damit dieser Funktioniert.
Jetzt meine ich zu verstehen, wie es Celery macht. Also warum dies Threads und Prozess übergreifend funktioniert.
Sogesehen habe ich nun mit der DB den gleichen Effekt erreicht und erspare mit den Broker+Celery jetzt mit einzubinden.


DeaD_EyE hat geschrieben: Samstag 9. September 2023, 12:07 Wie schon die Vorredner sagten, ist Nebenläufigkeit nicht so einfach.

Ein Ansatz wäre es, einen ThreadPoolExecutor zu verwenden und nur einen Task zuzulassen.
Wenn man mehrere Hintergrundprozesse will, dann wird es etwas komplexer, da man dann ja die Tasks irgendwas zuordnen muss (z.B. einem eingeloggtem User).

Die Routen würde ich aufteilen. Eine Route um die Berechnung zu starten und eine Route für das Ergebnis.
Das es wirklich nicht so einfach ist, das habe ich jetzt auch verstanden. :) Aber auch erst so wirklich durch euren Feedback, um dadurch zu verstehen warum/wieso, etc...

Es ging "stand jetzt" wirklich nur um einen einzig laufenden Hintergrundprozess.
Eine User Zuordnung wäre nicht mal nötig gewesen, da dies von jedem ausgeführt werden hätte kann/soll. Um aber eine x fache Ausführung zu vermeiden , wollte ich dies auf besagten einen Prozess begrenzen.

Es gibt schon mehrere Routen. Dieser Prozess sollte nur gestartet werden (von einer anderen Route aus) und nichts zurückgeben müssen, außer "Update startet, etc...).
Diese Route hätte einfach nur im Hintergrund ein Update durchführen sollen, die User hätten ganz normal weitermachen können.

Nach der ganzen Thematik, Threads Prozesse etc. wüsste ich nicht, ob es mit "ThreadPoolExecutor" auch wirklich so funktioniert hätte.
Darum der Weg über die DB.


Sirius3 hat geschrieben: Samstag 9. September 2023, 13:28 @chris_adnap: um Threads zu koordinieren, benutzt man passende Datensturkturen, wie Queues, Locks oder Events:
Hier dasselbe. Wenn ich wissen würde ob es Threads/Prozess übergreifend auch richtig funktioniert/sicher ist, dann hätte ich es gerne so umgesetzt.
Da mir dies nicht bekannt ist und ich nicht später überrascht werden möchte, :) hab ich es über den DB Eintrag erledigt.


__blackjack__ hat geschrieben: Samstag 9. September 2023, 14:32 Das mit den Prozessen hast Du richtig verstanden. Jeder Prozess hat seine eigenen Threads.

Mit Dienst war etwas wie Celery gemeint, nur eben speziell für diese Aufgabe statt einer allgemeinen Task Queue. Ein Prozess der nur das Update macht wenn er dazu beauftragt wird. Und der die Regel dass nur ein Update zur gleichen Zeit läuft sicherstellt.
Danke auch dir für deine Rückmeldung.
Wenn man nun weiß wie solch ein Aufruf genau Funktioniert, also das genaue zusammenspiel von Thread und Prozess, dann kann man sich das passende raussuchen.

Wie oben schon geschrieben, Celery arbeitet über einen externen Broker. Dieser ist komplett losgelöst von Threads/Prozessen und darum funktioniert diese.
Statt eben einen Broker+Celery zu installieren nehme ich die vorhandene DB. Hat, von Prinzip her, den selbigen Effekt. Einfach eine Schnittstelle/Platz, welche Thread-, Prozessübergreifend arbeitet/sicher ist.

Auch nutze ich jetzt die DB dafür, einen Zeitstempel des letzten durchlaufes zu hinterlegen. Damit kann ich ein mehrfaches hintereinander aufrufen verhindern. (also eine Zwangspause)
Und dies hätte ich schlussendlich so oder so in die DB schreiben müssen.

Gerade umgesetzt bekommen und funktioniert soweit und durch euch erfahren, das manche "kleinigkeit" doch mal komplexer ausfallen kann. :D

Viele Grüße
Chris
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ohne zu sehen, wie das mit der Datenbank geloest ist, kann man da nicht wirklich was zu sagen, ob es denn richtig funktioniert. Die hilft einem sicher dabei, weil sie einen Weg darstellt, einen verteilten Synchronisationsmechanismus aufzubauen. Ublicherweise durch select for update und Konsorten. Aber muss eben auch richtig gemacht werden.
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

@chris_adnap: Datenbank alleine löst Dein Problem nicht, da braucht es auch eine explizite Synchronisation der Prozesse, automatisch passiert da gar nichts. Für viele Datenbanksysteme gibt es da entsprechende Mechanismen, die man aber kennen muß.
Welche Datenbank benutzt Du und wie sieht Deine Lösung jetzt aus?
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo und einen guten Abend,

sry, an ein bsp. hatte ich heute mittag nicht mehr gedacht.
Ist aber nichts großartiges.

Code: Alles auswählen

@home_blueprint.route('/test', methods=['GET'])
def test():
    thread_name = 'force_update'
    if not TP.ForceUpdate().status_check()[0]:  # PRÜFE OB PROZESS GERADE LÄUFT
	    TP.ForceUpdate().status_set('start_force_update'):  # <-- FALLS NICHT, SETZE ICH GLEICH ALS ERSTES DEN WERT VON FALSE AUF TRUE

	    # start des Update in einem Thread. GANZ ZUM SCHLUSS SETZE ICH MIT ('stop_force_update')" den Wert wieder auf FALSE + setze datetime.now() einen Timestamp.
            threading.Thread(target=TP.ForceUpdate().start_force_update, name=thread_name, daemon=False).start()  
            return 'Update Prozess startet. Bitte warten', 200
            
    else:
        if not TP.ForceUpdate().status_check()[1]:
            return 'Update Prozess läuft gerade. Bitte warten.', 200
        else:
            return TP.ForceUpdate().status_check()[1], 200
            
 ....
 
def status_check(self):
    force_update_status = self.session.query(db.Settings).filter(db.Settings.option == 'force_update_running').one()
    if force_update_status.boolean:
        return True, None
Kurze Erklärung:
Egal von wem nun diese Funktion aufgerufen wird, gleich zu Anfang wird mit "status_check" geprüft, läuft der Prozess oder nicht. Default Wert in der DB "false" und wir bei jedem start der Applikation wieder auf "false" gesetzt.
Wenn "true" == "Update Prozess läuft gerade..." ausgeben und schluss.
Wenn "false" == so setze ich gleich im nächsten Schritt den Wert mit ".status_set('start_force_update')" auf True. Jetzt startet der Update ... zum Schluss wird mit .status_set('stop_force_update') der Eintrag in der DB wieder auf "false" gesetzt.

Also statt jetzt den Thread mit ".is_alive()" oder ".enumerate()" und was es nicht alles noch gibt zu überprüfen ob dieser Thread/Prozess gerade läuft, setze ich ein einfaches True/False in die DB und kann dies jetzt unabhängig von Threads/Prozessen etc. abrufen. Ich denke, viel falsch kann man bei diesem Weg nicht machen. :D

Es ist nichts anderes als eine Variable "force_update_running" = True/False
Nur das eine einfache Variable, duch Threads/Prozesse, nicht problemlos/sicher von überall richtig ausgelesen werden kann. So ist nun die DB der Ersatz dieser Variable. Nichts anderes.



__deets__ hat geschrieben: Montag 11. September 2023, 14:16 Ohne zu sehen, wie das mit der Datenbank geloest ist, kann man da nicht wirklich was zu sagen, ob es denn richtig funktioniert. Die hilft einem sicher dabei, weil sie einen Weg darstellt, einen verteilten Synchronisationsmechanismus aufzubauen. Ublicherweise durch select for update und Konsorten. Aber muss eben auch richtig gemacht werden.
Genau so in etwas habe ich es gelöst. Gleich zu Anfang das automatische zurücksetzen nach einem neustart, gleich als erstes eingebaut, war zu Anfang Gold wert :D
Am "Update Prozess" selbst, bin ich gerade noch dran. Hier muss ich nur aufpassen, sollte es mal zu einem Fehler kommen, weil die andere DB nicht abrufbar ist, etc. das dies nicht in einer exception landet und der Wert auf "True" verbleibt. Dies ist der einzige Punkt auf den ich achten muss, sonst kann es kein Celery, etc. besser. (ich rede jetzt nur von dieser einzigen Funktion) :)

Sirius3 hat geschrieben: Montag 11. September 2023, 14:51 @chris_adnap: Datenbank alleine löst Dein Problem nicht, da braucht es auch eine explizite Synchronisation der Prozesse, automatisch passiert da gar nichts. Für viele Datenbanksysteme gibt es da entsprechende Mechanismen, die man aber kennen muß.
Welche Datenbank benutzt Du und wie sieht Deine Lösung jetzt aus?
Postgres ist die DB und mein Lösungsweg habe ich oben beschrieben.
Die Datenbank dient einzig und alleine dafür, zu hinterlegen, wurde die Funktion aufgerufen und läuft diese gerade. Mehr macht dies auch nicht.

Wie geschrieben, es ging mir einzig und allein darum, das diese Funktion jeweils immer nur 1x am laufen ist.


Ich weiß jetzt nicht wie hoch die Wahrscheinlichkeit dabei liegt, deswegen habe ich die Überprüfung und das Setzen des Wertes gleich hintereinander gelegt ...
Aber das rettet einen leider nicht davor, dass hier eine „race condition“ vorliegt. Wenn die Funktion zweimal aufgerufen wird, kann es passieren das beide im ``if`` zum Schluss kommen, dass gerade kein Update-Thread läuft bevor sie ihn dann beide starten.
... evtl. kann es hier auch zu einer „race condition“ kommen, wenn man 1000 Requests zur selbigen Zeit erhällt. Zwar ist die Wahrscheinlichkeit wesentlich geringer, als mit der der threading.enumerate() Schleife, aber nichts ist unmöglich.
Könnte man "verbessern" indem man nach dem Setzen nochmals weitere Checks unternimmt (Anzahl offener/laufendes Threads mit diesem Namen, etc.), aber dies wird in meinem Fall sicher nicht nötig sein, so viele Anfragen bzw. starten dieser einen Funktion erwarte ich nicht.
Und sollte es aus einem unwahrscheinlichen Grund doch einmal dazu kommen, dann läuft das eine mal diese Funktion 2x gleichzeitig... Egal.
Ich wollte einzig und alleine Resourcen auf meiner Seite und vorallem der Seite wo die Daten abgerufen werden, einsparen. Und dies sollte ich nun erreicht haben.


Ich bin mir jetzt aber sicher, das ich mit diesem Weg, nichts falsch gemacht habe und bin auf Feedback gespannt. :D


Viele Grüße
Chris
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Doch, das ist so falsch. Weil es immer noch die schon von __blackjack__ angesprochene Lücke zwischen Test und Update hat. Da ist noch genau gar nichts gewonnen.

Der richtige Weg beinhaltet vorher die Tabelle mit select for update zu sperren. Dann garantiert die DB, das ein weiterer Thread/Prozess/Client da nichts dran ändern kann. Dann prüft und setzt man den Status.

Und wenn dein System aus irgendwelchen Gründen abschmiert währen die Flagge gesetzt ist, ist es kaputt. Auch beim select for Update übrigens. Da muss man dann noch weiter gehen.

Ich bin immer noch der Meinung, das wäre inzwischen mit einem dafür entworfenen System wie celery gelöst. Aber du hast dich für den Weg der vielen Fehler entschieden 🤷🏼‍♂️

https://github.com/cameronmaske/celery-once
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Das Thema korrekte Synchronisation ist schwierig und deshalb ist es nicht verwunderlich, dass Dein neuer Ansatz genauso fehlerhaft ist, wie die davor.

Die Lösung ist dagegen ganz einfach, mit with_for_update: https://docs.sqlalchemy.org/en/20/core/ ... for_update

Code: Alles auswählen

def set_is_running(self):
    force_update_status = self.session.query(db.Settings).filter_by(option='force_update_running').with_for_update().first()
    result = force_update_status.boolean
    if not result:
        force_update_status.boolean = True
    session.add(force_update_status)
    session.commit()
    return result
Die Methode hat natürlich noch den Schönheitsfehler, dass falls der Update-Prozess abstürzt, das Flag dauerhaft auf True steht und somit nie wieder ein Update gemacht werden kann. In Deinem Fall, wo Du weißt, dass der Updateprozess maximal ein paar Minuten dauern kann, wäre die einfachste Lösung, einen Timeout zu implementieren, dass man also die aktuelle Zeit speichert und prüft, wie lange der letzte Update schon her ist.

Sonstige Anmerkungen, TP ist eine kryptische Abkürzung, die zudem noch falsch geschrieben ist, weil TP eine Konstante wäre, was ja das Objekt offensichtlich nicht ist.
Es ist falsch, ständig eine neue Instanz von ForceUpdate zu erzeugen. Das läst befürchten, dass es sich gar nicht um eine richtige Klasse handelt.
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hi,

und ich war mir sicher, jetzt kann nicht mehr viel kommen ...

__deets__ hat geschrieben: Montag 11. September 2023, 22:15 Doch, das ist so falsch. Weil es immer noch die schon von __blackjack__ angesprochene Lücke zwischen Test und Update hat. Da ist noch genau gar nichts gewonnen.

Der richtige Weg beinhaltet vorher die Tabelle mit select for update zu sperren. Dann garantiert die DB, das ein weiterer Thread/Prozess/Client da nichts dran ändern kann. Dann prüft und setzt man den Status.

Und wenn dein System aus irgendwelchen Gründen abschmiert währen die Flagge gesetzt ist, ist es kaputt. Auch beim select for Update übrigens. Da muss man dann noch weiter gehen.

Ich bin immer noch der Meinung, das wäre inzwischen mit einem dafür entworfenen System wie celery gelöst. Aber du hast dich für den Weg der vielen Fehler entschieden 🤷🏼‍♂️

https://github.com/cameronmaske/celery-once

- Lücke zwischen Test und Update hat ... Ja, aber diese Lücke ist viel kleiner als zuvor. Ok, "select_for_updates, bzw. with_for_update" kannte ich noch nicht und macht in diesem Fall richtig Sinn. Werde ich umstellen.
- Bzw. abschmieren, da hatte ich ja geschrieben, das ich darauf reagieren werde.
- Celery. a, bis ich mich da einarbeite dauert es auch mindestens einen/zwei tag(e). b, wenn celery falsch eingebaut wird, kann ich auch mit diesem Tool Fehler erzeugen. c, Ich benutze gerne andere Systeme/Module wenn... (dies der einzige weg ist; ich mehr davon habe als nur eine eizige Funktion zu nutzen; ...).
Ganz ehrlich, durch die Gespräche jetzt habe ich viel mehr gelernt, als einfach ein fertiges System zu (be)nutzen.


Sirius3 hat geschrieben: Montag 11. September 2023, 22:26 Das Thema korrekte Synchronisation ist schwierig und deshalb ist es nicht verwunderlich, dass Dein neuer Ansatz genauso fehlerhaft ist, wie die davor.

Die Lösung ist dagegen ganz einfach, mit with_for_update: https://docs.sqlalchemy.org/en/20/core/ ... for_update

Code: Alles auswählen

def set_is_running(self):
    force_update_status = self.session.query(db.Settings).filter_by(option='force_update_running').with_for_update().first()
    result = force_update_status.boolean
    if not result:
        force_update_status.boolean = True
    session.add(force_update_status)
    session.commit()
    return result
Die Methode hat natürlich noch den Schönheitsfehler, dass falls der Update-Prozess abstürzt, das Flag dauerhaft auf True steht und somit nie wieder ein Update gemacht werden kann. In Deinem Fall, wo Du weißt, dass der Updateprozess maximal ein paar Minuten dauern kann, wäre die einfachste Lösung, einen Timeout zu implementieren, dass man also die aktuelle Zeit speichert und prüft, wie lange der letzte Update schon her ist.

Sonstige Anmerkungen, TP ist eine kryptische Abkürzung, die zudem noch falsch geschrieben ist, weil TP eine Konstante wäre, was ja das Objekt offensichtlich nicht ist.
Es ist falsch, ständig eine neue Instanz von ForceUpdate zu erzeugen. Das läst befürchten, dass es sich gar nicht um eine richtige Klasse handelt.
- Fehlerhaft, aber immerhin ganz nah dran . :)
- Danke auch die für den Hinweiß auf solch ein Flag. Hatte ich bisher nie benötigt, bzw. gewusst das es so etwas gibt. Darum wirklich Danke dafür, werde ich so einbauen.
- An diesen Schönheitsfehler habe ich gedacht und werde dies berücksichtigen. Aber die Idee mit einem zusätzlichen Timeout ist notiert. Für den Fall der fälle :)
- TP ist eine Klasse. Aber stimmt, jetzt wo du es sagst sehe ich es auch. Das sind halt meine Marotten. Erst gab es nur einen einzigen Aufruf dieser Klasse, dann kam noch diese/jener Check/Ausgabe hinzu und da war Copy/Paste schneller. Ich bin aber auch ganz ehrlich, hättest du dies jetzt nicht erwähnt, ich hätte diesbezüglich nicht nochmals drauf geschaut und wäre sicherlich so geblieben. Macht ja wirklich keinen Sinn :o ... Das ändere ich gleich morgen noch ab.

Code: Alles auswählen

thread_name = 'force_update'
force_update = tp.TPForceUpdate()
force_update_status = force_update.status_check()
if not force_update_status[0]:
    if force_update.status_set('start_force_update'):
        threading.Thread(target=force_update.start_force_update, name=thread_name, daemon=False).start()
        return 'Update Prozess startet. Bitte warten', 200
    else:
        return 'Fehler beim starten des Force Update Prozess.', 200
else:
    if not force_update_status[1]:
        return 'Update Prozess läuft noch. Bitte warten.', 200
    else:
        return force_update_status[1], 200
Man kann es immer verbessern, aber jetzt sieht es schon mal besser aus und arbeitet sinnvoller.


Wenn man sich gut/sehr gut mit einer Thematik auskennt, viel damit zu tun hat, etc. dann entwickelt man auch einen ganz anderen Blick dafür.

Viele Grüße
Chris
Antworten