Flask + uWSGI mit nur "einem" dauerhaften Backgroud Task/Programm (threading)

Django, Flask, Bottle, WSGI, CGI…
Antworten
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Um es gleich vorweg zu nehmen. Ich bin auf eine Lösung gekommen (kommt am Ende). Auch wenn ich es lieber direkt in Python integriert hätte. (ohne abhängigkeit von uWSGI)
Jetzt etwas ausführlicher, sollte jemand das gleiche Problem haben.

Grobe Sache um was es geht "Stichworte" damit der Nächste weiß ob es passt.
Externer PostgreSQL-Server, Reverse SSH-Tunnel, Background-Task, Mailversand, Python, SQLite, uWSGI, Flask, nginx.
Ein schon kleines fertiges Tool/Programm. Läuft in einer Endlosschleife (Pause mit sleep(xx)) und macht eine DB-Abfragen, versendet Mails, etc... (Background Task)
Um nun die Einstellungen zu sehen, bzw. um überhaupt zu sehen ob/wann etwas passiert/versendet wurde, baue im um dieses Tool ein Frondend mit Flask.
Und hier sollte es eigentlich nur als ein Background Task im Hintergrund weiterlaufen.


Struktur:

Code: Alles auswählen

main.py
|
app/ -
     |
    __init__.py
    watchdog.py  # Das eigentliche "Programm". Der Rest ist nur für das Frontend da.
    frontend/ -
                      |
                      static/
                      template/
                      routes.py



Es ist evtl. ein Tick von mir, aber ich versuche immer ein Programm/Projekt mit allen benötigten Dingen unter einem Hut zu bekommen. Beides gehört ja auch zueinander.
Und hier liegt auch das eigentliche Problem.


Bisher war es immer so, das ich mit einem Thread und Paramiko einen Reverse Tunnel bei Programmstart aufgebaut habe, scheint auch bisher gut funktioniert zu haben. (bis auf, siehe nächsten Absatz)
Da ich jetzt aber Tool Nr. xx geschrieben habe, habe ich den Reverse Tunnel ausgelagert. Als ein "systemd" Service für alle Tools, welchen diesen benötigt.
Bisher stabil und keine Fehler (mehr).

Ok, damit fällt ein Thread/Process nun aus vielen Programmen raus. Was die Sache einfacher und weniger fehleranfälliger macht... (
Bei einem Projekt wo ich viele "kleine" Programme/Tools zusammentrage, ist es seit dem letzten Tool welches auch einen Tunnel benötigt, öfters dazu gekommen, das der Frontend Aufruf über nginx/Flask nicht mehr funktioniert hat, glaube 504 (TimeOut).
Seitdem ich den Tunnel ausgelagert habe, läuft es Problemlos.
Auch hier wieder Flask/uWGSI/nginx und Threads...



Jetzt gab es ein neues Projekt...
Ich habe eine Lokale SQLite-DB in welcher ich Einstellungen Speicher, auf was reagiert werden soll. Ich rufe eine externe Postgres-DB über den nun vorhandenen System SSH Tunnel ab, prüfe auf besondere Gegebenheiten, trifft etwas zu versende ich eine Mail und Speicher den Versand in die lokale SQLite-DB (damit man weiß, wurde schon versendet).
Dies habe ich als erstes geschrieben und lief Problemlos. (auf der Windows Entwicklungsumgebung)
Dann ist es innerhalb 1-2 Wochen gewachsen, mit mehreren Settings welche man vornehmen kann, welche zutreffen können/müssen, um ein versenden der Mail zu veranlassen.

Den Ablauf eines Aufrufs habe ich mal gemessen. ~1,4 Sekunden für einen Durchlauf und dies Remote über noch einen extra SSH-Tunnel.
Direkt auf dem Produktiv System sicherlich 1 Sekunden oder weniger.
Also nichts, was Ressourcen Intensiv ist, oder was extram lange braucht.


Jetzt kam meine Idee, ich baue mit Flask ein kleines Frontend auf.
Dies liest die Lokale SQLite-DB (nur lesend) und zeigt alle aktuellen Einstellungen und ob Mails versendet wurden an.
Auch dies verlief/lief Problemlos.


Hier nun die erste Version... Die "******" Kennzeichen, an welcher Stelle ich versucht habe, das "thread/multiprocessing" einzubauen/zu starten.
Bis heute kann ich nicht mal sagen, wo "wirklich" dafür die richtige Stelle ist. Macht auch sicherlich einen unterschied wo es aufgerufen wird ...

main.py

Code: Alles auswählen

from app import app

if __name__ == "__main__":
	******
    app.run()

__init__.py:

Code: Alles auswählen

import xxxx

******

def create_app():
    app_ini = Flask(__name__)
    
    ******
    # Hier die Überwachung ... Eine While True Schleife welche ich mit sleep(60) alle 1 min Aufrufe. (der Aufruf selbst ~1 Sekunde)
    from app import watchdog
    t_watchdog = Thread(target=watchdog.monitoring, args=(SSH_LOCAL_PORT,), name="background_watchdog", daemon=True)
    t_watchdog.start()
    
	# Hier rufe ich das Blueprint für das Frontend auf
    from .status.routes import status_bp
    app_ini.register_blueprint(status_bp, url_prefix='/status')

    return app_ini

******

app = create_app()

Wie sollte es auch anders sein.
Sobald ich es in das Produktiv System aufgenommen habe, startete alles, aber das Frontend (Flask) über uWSGI/nginx ließ sich nicht aufrufen...
Ich konnte nachvollziehen das der Thread startet und dieser hat auch gearbeitet. Aber das Frontend ließ sich nicht aufrufen.
Ein paar Mal ging es, konnte aber nicht mehr 100% nachvollziehen ob es direkt nach einem einfachen Neustart des systemd war, oder doch eine Einstellung welche ich vorgenommen habe.
Aber auch wenn es mal funktionierte, war es ein Glücksspiel, ob es auch so bleibt.



uWSGI Settings:

Code: Alles auswählen

[uwsgi]
strict = true
module = main:app

; ************************
Hier habe ich unzählige Einstellungen probiert. 
Das Einfachste wäre ein lazy-apps=true. Damit ließ sich das Flask Frontend ohne Probleme aufrufen. Dann hätte ich aber diesen Background Task in "allen" workern gehabt. Würde also 4x laufen, was nicht seien muss und ist auch keine Lösung mit der ich mich zufriedengebe.
enable-threads = true/false
Master = true/false
Process = xx
threads = xx


enable-threads = true                  ; sobald Threads in Python ins spiel kommen, muss dies aktiviert sein. Sonst würden der/die Threads in Python nicht starten.
master = true
processes = 4
threads = 2

;lazy-apps=true				; Funktioniert. Läd aber die "Anwendung" komplett in jeden "Worker/process". Somit würde dieser "Watchdog" auch 4x laufen. Dies ist aber keine Lösung, es sollte nur 1x laufen.
; ************************

callable = app
socket = watchdog_app.socket
chmod-socket = 660
vacuum = tue
die-on-term = true
need-app = true

harakiri = 240
; ************************
max-worker-lifetime = 3600           ; Restart workers after xxxx seconds
; ************************
Also liegt es irgendwie damit zusammen, dass sobald ein "forken" ins Spiel kommt, es zu Problemen kommt.
Mal lief es mit irgendwelchen Settings, aber nach einem manuellen "restart" oder nach Ablauf von "max-worker-lifetime" ging es wieder nicht. War eher ein Glücksspiel.
Ich habe auch versucht das Threading an verschiedenen Stellen einzubauen. mal direkt in die "main.py", noch vor dem "def create_app():" in der __init__ oder eben im "def create_app():" Teil.


Dann habe ich eine Augenscheinliche einfache Lösung gefunden.
Ich ersetze threading mit multiprocessing, dafür muss man nicht viel ändern.

Auf dem Windows Entwicklungssystem gab es dann wieder Probleme, da das multiprocessing aus dem __init__.py teil unter Windows nicht lief.

Code: Alles auswählen

RuntimeError: 
            Attempt to start a new process before the current process
            has finished its bootstrapping phase.
Ok, "multiprocessing.Process" will unter Windows nur unter/aus __main__ starten.
Gesagt getan, es lief.

Ab auf das Produktiv System. Flask lief, aber der Hintergrundprozess lief nicht...
von __main__ wieder ins __init__.py verschoben.
Es lief!
Der Background Prozess lief und auch Flask ließ sich öffnen.
Damit dachte ich, die Lösung gefunden zu haben.


1 Stunde später ... geschuldet durch "max-worker-lifetime = 3600" gab es Fehlermeldungen ... 'can only join a child process'.
Was mich auch hier etwas stutzig machte, warum kam der Fehler bei "jedem" Worker auf.
Auch wenn es für mich danach ausgesehen hatte, das der Hintergrund Task nur 1x lief und es lief nur 1x.
Die Meldung aber machte mich stutzig.

Code: Alles auswählen

...
Okt 11 13:29:31 app_srv uwsgi[974038]: Respawned uWSGI worker 2 (new pid: 974379)
Okt 11 13:29:31 app_srv uwsgi[974038]: Respawned uWSGI worker 1 (new pid: 974378)

Okt 11 13:29:30 app_srv uwsgi[974350]: AssertionError: can only join a child process
Okt 11 13:29:30 app_srv uwsgi[974350]:     assert self._parent_pid == os.getpid(), 'can only join a child process'
Okt 11 13:29:30 app_srv uwsgi[974350]:   File "/usr/lib/python3.9/multiprocessing/process.py", line 147, in join
Okt 11 13:29:30 app_srv uwsgi[974350]:     p.join()
Okt 11 13:29:30 app_srv uwsgi[974350]:   File "/usr/lib/python3.9/multiprocessing/util.py", line 357, in _exit_function
Okt 11 13:29:30 app_srv uwsgi[974350]: Traceback (most recent call last):
Okt 11 13:29:30 app_srv uwsgi[974350]: Error in atexit._run_exitfuncs:

... DIESER FEHLER TEIL 4x. FÜR JEDEN WORKER 1x. ...

Okt 11 13:29:30 app_srv uwsgi[974352]: Error in atexit._run_exitfuncs:
Okt 11 13:29:30 app_srv uwsgi[974038]: worker 4 lifetime reached, it was running for 61 second(s)
Okt 11 13:29:30 app_srv uwsgi[974038]: worker 3 lifetime reached, it was running for 61 second(s)
...
Nachdem ich diesen Fehler das erste Mal gesehen hatte, da ließ sich auch das Frontend nicht mehr öffnen. Erst dadurch bin ich überhaupt auf diesen Fehler gestoßen.
Bei weiteren Tests mit einem kleineren Timer hatte ich bisher immer das Glück, der Frondend aufzurufen ging. Es wurde trotzdem dieser Fehler weiterhin produziert.
Daraufhin habe ich auch diese Lösung verworfen.


Durch Zufall bin ich gestern dann auf uWSGI "add_cron" bzw. besser für meinen Fall "add_timer" gestoßen.

Code: Alles auswählen

import uwsgi
from app import watchdog
uwsgi.register_signal(99, "", watchdog.monitoring)
uwsgi.add_timer(99, 60)

ef create_app():
    app_ini = Flask(__name__)
	....
Mit diesem läuft es seit gestern Problemlos durch.
Dies wird, wenn nicht anders angegeben nur 1x ausgeführt und ruft im angegebenen Intervall die watchdog auf.
Natürlich musste ich diese umschreiben, ohne das while True... aber dann macht es genau was es soll.
Heute um ca ~16Uhr sehe ich dann auch ob alles wirklich klappt.


Eine andere, bisher meine letzte Idee wäre gewesen, welche ich aber nur zu allerletzt aufgegriffen hätte.
Ich erstelle zwei Appliaktionen/systemd. Eines nur für den watchdog und eines für das Frontend/Flask.
Damit würde ich weder timer/thread noch multiprocessing benötigen.

-------

Ich kenne das Problem mit dem GIL und wirklichem "parallelen" ausführen. Sollte aber bei 1Sek ausführung "eigentlich" keine Probleme darstellen.

Auch kann ich nicht sagen, was das forken (im wirklichen Sinn/Prozess) genau macht. Aber hier irgendwo scheint der Fehler zu liegen.

Ich dachte wirklich, ein Prozess, welcher nur 1 bis max. 2 Sekunden Ausführungszeit benötigt, würde ich ohne Probleme in einem Thread verpacken können.
Oder ich habe aber irgendwo einen riesigen Leichtsinnsfehler gemacht?


Ich habe kein Problem damit, das es nicht 100% parallel läuft. Die 1-2sek delay bei einem Aufruf des Frondend, sollte man genau diesen Zeitpunkt erwischen, stellen überhaupt kein Problem dar.

Ich möchte aber gerne immer alles unter einem Hut behalten, was auch zusammengehört.
Ein ganz klein wenig stört mich da das "uwsgi.add_timer(99, 60)", da ich mich hier wieder von uwsgi abhängig mache.


Ich hab dann z.b. mal "Apscheduler" ausprobiert. Soll ja genau für das gemacht sein.

Code: Alles auswählen

scheduler = BlockingScheduler()
scheduler.add_job(func=my_job, trigger='interval', seconds=2, id='my custom task')
scheduler.start()
... ich habe genau geschaut was das scheduler.add_job macht.
Im Endeffekt macht es genau das gleiche, startet am Ende, wie ich oben den Thread/Process. Also würde mir solch ein Tool in diesem Fall nicht weiterhelfen.


Bis heute und ich habe viel dazu gelsen, konnte ich keine Beispiellösung dazu finden.
Ist Python dafür wirklich vielleicht die Falsche Sprache? Wie gesagt, es geht nicht um wirkliches paralleles ausführen.

Prinzipielles an dieser Geschichte. Gehört der Aufruf eines Thread/multiprocessing evtl. an eine andere Stelle, als dies jetzt oben steht? (z.b. doch vor dem vor dem create_app(), ... ?)

Und bevor jemand nun Apscheduler, Celery etc. voreilig in den Raum wirft.
Es macht einen unterschied wie die Applikation am Ende betrieben wird. Denn in der Entwicklungsumgebung, haben alle Möglichkeiten welche ich probiert haben, ohne Probleme Funktioniert.
Erst mit nginx/uwsgi auf dem Produktions-System kommt es zu diesen Fehler(n).
Und diese Tools würden am Ende den gleichen Aufruf machen, nur einfacher mit mehr Funktionen, aber den gleichen Fehler provozieren.


Es liegt an dem "forken" (im uwsgi teil). Die Dokumentation habe ich gelesen, nur verstehen tue ich es nicht. Kann mir jemand wirklich nur "einfach und kurz" erklären, was dies genau macht, was passiert da?
Dann verstehe ich auch vielleicht den Fehler/ das Problem besser, da es damit zusammenhängt.

Mache ich nämlich ein "lazy-app=true" so läuft es, wenn auch nicht wie gewünscht nur 1x, sondern eben abhängig wieviele worker aktiv sind.

Würdet Ihr dies überhaupt eher als ein "Problem" seitens uWSGI einordnen?

Oder es ist wirklich nur eine Kleinigkeit und es fehlt nur an 1-2 Zeilen?

Viele Grüße
Chris
Benutzeravatar
noisefloor
User
Beiträge: 3856
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

also was WSGI Applikationsserver wie uwsgi, Gunicorn etc machen ist: liefern deine WSGI-kompatible Applikations (wie z.B. mit Flask erstellt) aus. Das ganz läuft über den Einstiegspunkt, den dein Webframework bereit stellt. Alles, was in `if __name__ == '__main__:'`steht spielt dann keine Rolle, weil der Teil gar nicht ausgeführt werden sollte.

WSGI-Applikationsserver kann man so konfigurieren, dass sie mit X Workern laufen und, je nach Server, ob die Worker in Threads, in Prozessen, synchron oder asyncron laufen. Worker sind vereinfacht gesagt dazu da, dass mehrere Anfragen parallel abgearbeitet werden können.
`app.run()` macht im Prinzip auch nichts anderes, als einen einfachen Server zu starten, der halt immer nur einen Request synchron abarbeiten kann.

Deine Ausführungen mit den Threads etc. haben ich überhaupt nicht verstanden. Scheinbar ist es aber so, dass irgendwie irgendwas irgendwo blockiert und es zu Timeouts kommt.

Allgemein: wenn man zwei Prozesse oder Threads miteinander kommunizieren lassen will / muss, nimmt man entweder eine Queue (darüber kann man auch Daten hin- und her schieben) oder ein Event. Das kann Python mit Bordmitteln. Die Queue oder das Event übergibt man dann allen Threads / Prozessen, die darauf Zugriff haben sollen.

Du kannst mal testweise gunicorn als WSGI-Server nehmen. Der ist (wesentlich) einfacher zu konfigurieren als uwsgi. Wobei das IMHO eigentlich nicht das eigentliche Problem sein sollte.

Gruß, noisefloor
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo noisefloor,

danke für deine Rückmeldung.
das mit dem Einstiegspunkt dachte ich mir schon, darum ist auch der Prozess welchen ich testweise aus __main__ gestartet habe, nie aktiv geworden. Macht auch Sinn.

Ja x Worker, da kommt aber der Punkt welchen ich nicht verstehe.
Bei uWSGI-Appserver heißt es … „lazy-apps“.
Stelle ich dies auf „true“ läuft der/die Threads. Dann aber wird in „jedem“ Worker der Thread gestartetm, was aber bei nur einem benötigten Thread nicht zielführend ist.

Lass ich diese Option weg, so habe ich wie gewollt, den Thread nur 1x am Laufen. Dies hat aber nie zuverlässig funktioniert. Meist eben 504er Time-Out.
Und eben hier heißt es, der Main/Master Prozess wird geforkt.
Was dabei aber genau passiert, das erschließt sich mir nicht.

Mit der Funktion selbst, welcher aus dem Thread heraus aufgerufen wurde, gibt es keine Probleme.
Dieser läuft Lokal, wie auch auf dem Produktionssystem, auch wenn der Applikationsserver ein 504 Time Out meldet der Thread lief im 2min Intervall durch.
Der Aufruf dauert 1-2 Sek und die Schleife läuft alle 2min einmal durch.
Und da verstehe ich bis heute nicht, warum der Applikationsserver ein 504 liefert.

Zu dem Allgemein.
Dieser Prozesse/Threads muss nicht einmal miteinander kommunizieren, da diese vollständig autark läuft.
Der Thread sorgt nur für neue Daten (Aktualisierung der lokalen DB).
Über den Applikationsserver (Flask) greife ich dann auf diese Lokale DB zu, um daraus den Output zu generieren.

Ja, gunicorn könnte ich mal probieren. Dafür muss ich aber erst noch ein Testsystem aufsetzen.



Die Frage welche ich mir aber immer noch stelle, ist diese vorgehensweiße falsch. Macht man dies so, kann man dies so machen?
Lassen wir mal außen vor, was der Prozess macht.
Ein Frontend über (Flask, Django, Bottle, etc…) welches einen Applikationsserver (uWSGI, gunicorn, …) benötigt.
In dem soll es unabhängig der Konfiguration (Anzahl der Worker), nur "ein" Hintergrund Task laufen. (Backup, Update, …).

Ist da meine vorgehensweiße ok/richtig?
Hier alles in einem Code zusammengefasst. Nur falls ich hier schon einen Fehler mache.
Macht es einen unterschied, ob ich den Thread aus "create_app()" heraus aufrufe/starte, oder sollte dies "vor" dem create_app() passieren. Falls dies überhaupt einen unterschied macht.

Code: Alles auswählen

from flask import Flask
from threading import Thread

def db_ini():
    pass
    #... Hier Prüfe ich ob es schon eine lokale DB gibt, falls nicht lege ich diese hier an

db_ini()


SSH_LOCAL_PORT = 12345


def create_app():
    app_ini = Flask(__name__)
	
    # ***** Hier dachte ich mir, ist der richtige Platz zum starten des Hintergrund Thread/Prozess ****
    from app import watchdog
    t_watchdog = Thread(target=watchdog.monitoring, args=(SSH_LOCAL_PORT,), name="background_watchdog", daemon=True)
    t_watchdog.start()
	
    # Blueprint für das Frontend 
    from .status.routes import status_bp
    app_ini.register_blueprint(status_bp, url_prefix='/status')

    return app_ini

app = create_app()


if __name__ == "__main__":
    app.run()

PS. über das Wochenende hindurch hab ich es mit ...

Code: Alles auswählen

from app import watchdog
uwsgi.register_signal(99, "", watchdog.monitoring)
uwsgi.add_timer(99, 60)

...laufen lassen. Alles wunderbar. Der Backround Task startete jede Minute und das Frontend war immer zu erreichen.
Würde ich aber jetzt z.b. auf gunicorn wechseln, würde dieser Weg nicht mehr funktionieren.

Viele Grüße
Chris
Benutzeravatar
noisefloor
User
Beiträge: 3856
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

also wenn ich das richtige verstehe wäre die Antwort auf
Ist da meine vorgehensweiße ok/richtig?
nein.

Die Aktualisierung der DB kann doch komplett unabhängig von der Webapplikation laufen. Und selbst wenn das DB Update nur laufen soll, wenn die Webapplikation läuft, kannst du das doch in einen unabhängigen Prozess auslagern.

Wenn du einen Timeout bekommst heißt das, dass deine Flask-Applikation nicht innerhalb des Timeout-Fensters antwortet. Um den Punkt zu finden, wo es hängt, müsste du halt mal an sinnvollen Stellen Logging-Informationen in ein Log schreiben.

Gruß, noisefloor
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo noisefloor,

ok. Darum aber meinte ich auch, nicht betrachten das dieser Teil "eigentlich" unabhängig laufen kann.

Mit ...

Code: Alles auswählen

einen unabhängigen Prozess auslagern.
... meine ich zu verstehen. Für dieses Programm ein eigenen "systemd" einrichten?

Genau, dies möchte ich, wenn es denn nicht wirkich nötig ist vermeiden.

Nehmen wir an, dieser Task steht im Zusammenhang mit der Webapplikation, löscht Cache/Sitzungen nach Intervall xx etc...
Ich versuche erst einmal zu verstehen, ob dieser Code vom Aufbau überhaupt richtig ist.

Viele Grüße
Chris
Benutzeravatar
noisefloor
User
Beiträge: 3856
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,
Für dieses Programm ein eigenen "systemd" einrichten?

Genau, dies möchte ich, wenn es denn nicht wirkich nötig ist vermeiden.
Weil...? Also warum willst du keine eigene systemd Service Unit dafür erstellen. Die Mechanismen von systemd nutzen ist mit Sicherheit stabiler und besser getestet, also wenn man das selber in Python programmiert.
Nehmen wir an, dieser Task steht im Zusammenhang mit der Webapplikation, löscht Cache/Sitzungen nach Intervall xx etc...
Kann ja sein - aber was macht der Task bei _dir_? Hypothetische Probleme brauchen wir hier nicht diskutieren. Soweit ich deinen 1. Post verstanden hatte, ziehst du alle X Minuten Daten über eine SSH Verbindung von einem anderen DB-Server und speicherst die irgendwie irgendwo lokal.

Und selbst wenn es so wäre, dass das im direkten Zusammenhang mit deiner Web-App steht: systemd Units können Befehle vor dem eigentliche Befehl ausführen (`ExecStartPre`) und können anderen Units starten (`Requires=`).
Ich versuche erst einmal zu verstehen, ob dieser Code vom Aufbau überhaupt richtig ist.
Dazu müsstest du halt mal den kompletten, realen Code zeigen (natürlich ohne Passwörter, Zugangsdaten etc.)

Gruß, noisefloor
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hi,

Achtung, es wird wieder "Hypothetische".
Nein jetzt wirklich im Ernst, genau in diesem einen Fall ist die Funktion/Programm losgelöst, ja.
Aber sehr oft steht der Background Task direkt mit dem Programm im Zusammengang.

Also nein, es ist nicht nur ein Hypothetische Problem.
Weil...? Also warum willst du keine eigene systemd Service Unit dafür erstellen. Die Mechanismen von systemd nutzen ist mit Sicherheit stabiler und besser getestet, also wenn man das selber in Python programmiert.
"Ich" bin es gewohnt. Installiert/startet man ein Programm, dass damit auch alles läuft.
So bin ich es gewohnt und so macht es eigentlich auch jedes normale Programm.

Bezüglich dem Service. Ich habe ja schon eines welches die Flask Applikation startet/aufruft ...

Code: Alles auswählen

[Unit]
Description=auto mail
After=network.target

[Service]
User=srv
Group=www
Restart=on-failure
RestartSec=5

WorkingDirectory=/home/srv/apps/auto_mail
Environment="PATH=/home/srv/apps/auto_mail/env_auto_mail/bin"
ExecStart=/home/srv/apps/auto_mail/env_auto_mail/bin/uwsgi --ini auto_mail.ini

[Install]
WantedBy=multi-user.target


Und selbst wenn es so wäre, dass das im direkten Zusammenhang mit deiner Web-App steht: systemd Units können Befehle vor dem eigentlichen Befehl ausführen (`ExecStartPre`) und können anderen Units starten (`Requires=`).
Ok, auf diese Idee bin ich nicht gekommen. Sry.
Genau testen werde ich dies auf jeden Fall die tage noch.

Nur um es richtig zu verstehen.
Ich müsste folgende Zeile nach dem Environment ergänzen, damit hätte ich mein "alles unter einem Hut" Wunsch?
ExecStartPre=...bin/python3 /home/srv/...watchdog.py
... und dies könnte es schon gewesen sein?

Natürlich müsste ich mein watchdog.py wieder als Schleife umbauen.
Dann würde erst die watchdog, gefolgt von der uwsgi Anwendung starten?

Falls ich damit grob richtig liegen, ist das eine Lösung.



Dazu müsstest du halt mal den kompletten, realen Code zeigen (natürlich ohne Passwörter, Zugangsdaten etc.)
Auch wenn es jetzt schon 5 Tage, ohne auch nur einen Fehler(meldung) durchgelaufen ist, so habe ich mir das Thema Thread gerade nochmals angesehen und versucht diesen einzukreisen.
Dem Fehler bin ich auf der Spur, habe einiges Probiert, aber eine Lösung habe ich "noch" nicht.
Dran bleibe ich auf jeden Fall...

Code: Alles auswählen


import pandas as pd
from time import sleep
from datetime import datetime, timedelta
from app import error_msg_mail
from app import SSH_LOCAL_PORT
from .database import URL, create_engine, SessionLocal


def check_db_connection(local_port):
    url_db = URL.create(
        drivername="postgresql+psycopg2", # ......
    )
   engine = create_engine(url_db, echo=False, connect_args={'connect_timeout': 10})  #, pool=None, poolclass=NullPool, echo_pool="debug"
    try:
        with engine .connect() as connection:	
            connection.close()
        engine .dispose()
        # conn = engine .connect()
        # conn.close()
        return True, None
        
    except Exception as e:
        return False, repr(e)


def monitoring():

    loop_time = 60
    while True:
        try:
            """
            Der erste test war ein ...
            print(f'START: {datetime.now()}')
            sleep(2)  # Simmuliere durch Pause die Berechnung
            print(f'ENDE: {datetime.now()}')
            sleep(loop_time)
            ... direkt in der while Schleife und dies hat sich ohne Probleme, auch mehrmals starten lassen... Kein Time-out Problem mehr.

            ... so wollte ich nach und nach schauen, wann es zu dem Fehler kommt.
            Und der kam schon gleich zu Anfang.
            """
			
            print(f'START: {datetime.now()}')
            
            # die Funktion "check_db_connection" erzeugt den Time-Out.
            connection_check, network_err_msg = check_db_connection(SSH_LOCAL_PORT)  # Prüfe auf DB Verbindung, incl. 10Sek. TimeOut

            if connection_check:  # erst weiter machen wenn DB-Verbindung steht
                sleep(2)  # Simmuliere durch Pause die Berechnung
            else:
		pass  # send error mail, etc...
		print(network_err_msg)

            print(f'ENDE: {datetime.now()}')  # DEBUG Start/Stop
            sleep(loop_time)  # Pause für 1 Minute

        except Exception as e:
            print('ERROR')
            sleep(loop_time)

Den Fehler mache natürlich ich, liegen tut dieser aber bei "create_engine". Man sieht die tests mit ... #, pool=None, poolclass=NullPool, echo_pool="debug"
Mit dieser Funktion wollte ich nur prüfen, ob die DB erreichbar ist. Damit prüfe ich gleichzeitig auch den SSH Tunnel auf Funktion.
Erst wenn dies erfolgreich ist, arbeitet das Programm weiter.

Und genau hier kommt es zu dem Problem, das Monitoring selbst läuft immer normal durch, aber das Frontend bleibt meist bei dem 504er Time-Out stehen.
Auch hier wüsste ich eine alternative (ungetestet).
Statt an dieser Stelle mit "create_engine" zu arbeiten könnte ich auch für den Verbindungstest mit "psycopg2.connect" arbeiten.

Lasse ich diese "Überprüfung" weg, so läuft es auch mit dem Thread.

Nehmen wir nun an, diese kleine Umstellung wäre schon die Lösung. Was würde dann gegen, ein im Thread gestartetes watchdog, sprechen?

Viele Grüße
Chris
Benutzeravatar
noisefloor
User
Beiträge: 3856
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,
So bin ich es gewohnt und so macht es eigentlich auch jedes normale Programm.
Ja - aber das ich doch hier irrelevant. Weil du ein Skript doch _nicht_ als "Programm" laufen lassen willst, sondern als Dienst im Hintergrund.
ExecStartPre=...bin/python3 /home/srv/...watchdog.py
Sollten gehen, ja. Wobei wenn das Skript periodisch ausgeführt werden soll und die Periode irgendwo bei 1 Minute plus liegt, dann macht IMHO eine periodische Ausführung per systemd Timer Unit mehr Sinn.

Den relevanten Code zum Frontend hast du immer noch nicht gezeigt. Solange wirst du auch mit deinem 504 Problem alleine bleiben.

Und nie nackte `try...except` verwenden, weil du damit alle Fehler wegbügelst, inkl. Syntaxfehlern und sonstigen Programmierfehlern.

Gruß, noisefloor
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Guten Abend,

Den relevanten Code zum Frontend hast du immer noch nicht gezeigt. Solange wirst du auch mit deinem 504 Problem alleine bleiben.
Hier nun der Frontend Teil.
Aber an dem liegt es nicht. Denn wenn der vorherige Teil "prüfung auf DB Verbindung" wegfällt. Läuft alles andere, bis auf den Hintergrund Task.
Und habe ich die funktion "db-check" aktiv, kommt es nicht mal bis hier her. Nicht mal print('DEBUG: TEST FRONTEND') wird ausgegeben.

Wie schon zu anfang vermutet, hat dies irgendwas mit dem fork Prozess zu tun.
Da ich bis heute nicht verstehe was dieser genau macht, kann ich auch nicht sagen warum es zu dem Fehler kommt, immerhin heruasfinden, an welcher Stelle/wo es zu diesem Fehler kommt.

Code: Alles auswählen

import pandas as pd
from datetime import datetime, timedelta
from flask import Blueprint, render_template

from app import check_db_connection  # Die Funktion habe ich in der __init__ ausgelagert
from app import SSH_LOCAL_PORT  # Der Port aus der __init__
from app.status import db_query  # Hier wieder die DB-Querys

status_bp = Blueprint('status_bp', __name__, template_folder='templates', static_folder='static', static_url_path='/static')


@status_bp.route('/')
def status_site():
    print('DEBUG: TEST FRONTEND')
    # Prüfe, ob eine Verbindung zur DB möglich ist.
    if check_db_connection(SSH_LOCAL_PORT):
        db_connection_work = True
    else:
        db_connection_work = False

    # für Ausgabe
    final_output = {
        'settings': {},
        'data': {}
    }

    from app import models
    from app import SessionLocal

    with SessionLocal() as session:
		all_settings = session.query(models.Settings)
		if all_settings .count() > 0:
			# Ab hier wird die DB Abgefragt, was auf der Webseite ausgegeben werden soll und fülle damit mein "final_output" für die Ausgabe.
			# ...
			if db_connection_work:
			    # Dann rufe ich Daten aus der externen DB mit ab.
			else:
			    # Sonst arbeite ich nur, mit den vorgehaltenen Lokalen Daten.
		

    return render_template("status.html", db_connection_work=db_connection_work, final_output=final_output)
Sollten gehen, ja. Wobei wenn das Skript periodisch ausgeführt werden soll und die Periode irgendwo bei 1 Minute plus liegt, dann macht IMHO eine periodische Ausführung per systemd Timer Unit mehr Sinn.
Naja. Durch die Schleife hat es meiner meinung nach, aber ich habe öffters gesehen das ich damit falsch liegen, den gleichen Effekt.


Und nie nackte `try...except` verwenden, weil du damit alle Fehler wegbügelst, inkl. Syntaxfehlern und sonstigen Programmierfehlern.
Dies ist mir bewusst und nur für den ersten Monat zum Debuggen.
Ob evtl. doch etwas später noch Fehler vorhanden sind, welche ich so nicht bedacht habe. Diese Meldungen werden mir auch gleich per Mail zugeschickt.
Dieses direkte try...except, würde auch später komplett rausfliegen.

Viele Grüße
Chris
Benutzeravatar
noisefloor
User
Beiträge: 3856
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

na ja, wenn die Abfrage der DB hängt läufst du in einen Timeout. Da würde ich mal zum Debugging ansetzen.
Dies ist mir bewusst und nur für den ersten Monat zum Debuggen.
Gerade dann ist ein nacktes try... except noch falscher als falsch, weil du Fehler wegbügelst statt das Programm crashen zu lassen und hoffentlich hilfreiche Infos aus dem Stacktrace zu bekommen.

Gruß, noisefloor
Benutzeravatar
__blackjack__
User
Beiträge: 13117
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@chris_adnap: Gerade bei Webanwendungen bin ich es gewohnt das man *nicht* *ein* Programm installiert und startet. Webserver, Datenbank, und Anwendungsserver sind in der Regel schon mal drei verschiedene Dienste, die am Ende zusammen die Anwendung ausmachen. Da können dann durchaus noch Cronjobs, die ab uns zu mal gestartet werden, zum Beispiel zum Aufräumen von Caches, dazu kommen. Und/oder eine Taskqueue für Aufgaben, die länger dauern als ein Anfrage/Antwort-Zyklus an den Webserver/die Webanwendung.

Beim `monitoring()`-Modul wird einiges importiert was nicht verwendet wird. Aus `datetime` sollte da nichts verwendet werden, weil man sich kein `logging` mit `print()` selber bastelt.

`close()` auf einer `Connection` die mit einem ``with`` verwaltet wird, ist überflüssig.

Wenn in jedem Zweig am Anfang oder am Ende das gleiche gemacht wird, dann gehört das vor, beziehungsweise nach diesem Konstrukt *einmal* hingeschrieben.

Kommentare sollen dem Leser einen Mehrwert über den Code geben. Faustregel: Kommentare beschreiben nicht *was* der Code macht, denn das steht da bereits als Code, sondern warum er das macht. Sofern das nicht offensichtlich ist. Offensichtlich ist in aller Regel auch was in der Dokumentation von Python und den verwendeten Bibliotheken steht.

Besonders sind Kommetare die leicht falsch werden können. Wenn man bei ``sleep(loop_time)`` kommentiert das pausiert für eine Minute, `loop_time` allerdings viel weiter oben definiert wird, kann der Kommentar ganz leicht “kaputt” gehen.

Ungetestet:

Code: Alles auswählen

from logging import getLogger
from time import sleep

from app import SSH_LOCAL_PORT

from .database import URL, create_engine

LOG = getLogger(__name__)


def check_db_connection(local_port):
    url_db = URL.create(
        drivername="postgresql+psycopg2",  # ...
    )
    engine = create_engine(
        url_db, echo=False, connect_args={"connect_timeout": 10}
    )  # , pool=None, poolclass=NullPool, echo_pool="debug"
    with engine.connect():
        pass
    engine.dispose()


def monitoring():
    loop_time = 60
    while True:
        LOG.debug("START")
        
        try:
            check_db_connection(SSH_LOCAL_PORT)
            sleep(2)  # Simuliere durch Pause die Berechnung.
        except:
            LOG.exception("Fehler in Hauptschleife")
        
        LOG.debug("ENDE")
        sleep(loop_time)
Im zweiten Quelltext ist `check_db_connection()` hoffentlich nicht die Funktion aus dem anderen Quelltext? Denn dann würde *immer* der ``if``-Fall eintreten, denn nicht-leere Tupel sind immer ”wahr”.

Die Reihenfolge in der Funktion sieht auch komisch aus: Prüfen ob DB-Verbindung geht, dann erstellen einer Session und DB-Abfrage, und danach dann erst etwas bedingt ausführen je nach dem ob die Verbindung zur DB geht die gerade abgefragt *wurde*. Oder eben auch nicht, denn vielleicht ist es ja genau *das* was da dann hängt‽
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo __blackjack__,

danke auch für deinen Beitrag.
Ja, ok. Webserver, AppServer, DB etc... zähle ich nicht zu dem eigen geschriebenen Script :)

Vielleicht muss ich mich da etwas von "meiner" Idee verabschieden...
Aber wird wirklich, bei größeren Projekten, so viel händisch angelegt. Also hier mal 3 Dienste, da mal 2 Cron Jobs, etc.

Beim `monitoring()`-Modul wird einiges importiert was nicht verwendet wird. Aus `datetime` sollte da nichts verwendet werden, weil man sich kein `logging` mit `print()` selber bastelt.
Nein, das ist alle schon in Benutzung. Dies kommt dann im sleep() Teil zur Anwendung. Den Teil habe ich aber weggelassen.

`close()` auf einer `Connection` die mit einem ``with`` verwaltet wird, ist überflüssig.
Das ist mir bekannt. An der stelle hab ich heute einiges rumprobiert, da hab ich einfach das .close() dort stehen gelassen. Aber weder close, noch dispose hatten den gewünschten Effekt.

Wenn in jedem Zweig am Anfang oder am Ende das gleiche gemacht wird, dann gehört das vor, beziehungsweise nach diesem Konstrukt *einmal* hingeschrieben.
Kommentare sollen dem Leser einen Mehrwert über den Code geben. Faustregel: Kommentare beschreiben nicht *was* der Code macht, denn das steht da bereits als Code, sondern warum er das macht. Sofern das nicht offensichtlich ist. Offensichtlich ist in aller Regel auch was in der Dokumentation von Python und den verwendeten Bibliotheken steht.

Besonders sind Kommetare die leicht falsch werden können. Wenn man bei ``sleep(loop_time)`` kommentiert das pausiert für eine Minute, `loop_time` allerdings viel weiter oben definiert wird, kann der Kommentar ganz leicht “kaputt” gehen.
Diese jetzigen Kommentare habe ich nur für hier mit eingebunden. Dachte könnte hilfreich sein, damit man schneller sieht was was macht.
Z.b. sollte "# Simuliere durch Pause die Berechnung." heißen, nicht was Sleep macht, sondern das an dieser Stelle die DB Abgefragt und Ausgewertet werden. Und da dieses ca. 1-2 Sekunden dauert, an dieser Stelle die Simulierte Pause.
Im zweiten Quelltext ist `check_db_connection()` hoffentlich nicht die Funktion aus dem anderen Quelltext? Denn dann würde *immer* der ``if``-Fall eintreten, denn nicht-leere Tupel sind immer ”wahr”.
Nein, sollte/ist zweimal das gleiche.
Diese Aufsplittung ist nur gekommen, da ich anfangs den Verdacht hatte, dass dadurch das Problem kommen könnten. Darum habe ich c&p den Code kopiert. An dem lag es aber nicht.
Die Reihenfolge in der Funktion sieht auch komisch aus: Prüfen ob DB-Verbindung geht, dann erstellen einer Session und DB-Abfrage, und danach dann erst etwas bedingt ausführen je nach dem ob die Verbindung zur DB geht die gerade abgefragt *wurde*. Oder eben auch nicht, denn vielleicht ist es ja genau *das* was da dann hängt‽
Ja, stimmt. Hat aber schon seine Richtigkeit. Es sind zwei DB.
Einmal die externe welche ich Abfrage (für diesen den Check) und dann aktuell eine lokale sqlite, in welcher ich das Ergebnis abspeichere. Für diese Lokale nutze ich die Session.


Nicht das sich zu viel Mühe gemacht wird.
Der Fehler ist im Monitoring Script, welches im Hintergrund läuft zu Suchen.
Ich habe genau den Inhalt wie im letzten Post genutzt, also so wie er wirklich zu sehen ist.

Erst mit einem reinem

Code: Alles auswählen

while True:
    print('start')
    sleep(2)
    print('ende')
    sleep(60)
Lief genau wie gewünscht. Thread im __ini__ 1x gestartet und Applikationsserver hat ausgeliefert. Gleich 2-5x den Dienst komplett gestoppt und neu gestartet. Jedes Mal, kein Problem.

....
zweiter Test. schon das reicht, das es zum 504 Time-Out kommt

Code: Alles auswählen

while True:
    print('start')
    connection_check, network_err_msg = check_db_connection(SSH_LOCAL_PORT)
    if connection_check:
        sleep(2)
    else:
        pass  # send error mail, etc...
    sleep(60)
Schlussfolgerung. Hier liegt das Problem.
Lass ich nähmlich diesen Check weg. Läuft alles.

Code: Alles auswählen

def check_db_connection(local_port):
    url_db = URL.create(
        drivername="postgresql+psycopg2", # ......
    )
   engine = create_engine(url_db, echo=False, connect_args={'connect_timeout': 10})  #, pool=None, poolclass=NullPool, echo_pool="debug"
    try:
        with engine .connect() as connection:	
            connection.close()
        engine .dispose()
        # conn = engine .connect()
        # conn.close()
        return True, None
        
    except Exception as e:
        return False, repr(e)
Lass ich uWSGI mit der Option "lazy-app" laufen. Dann läuft alles, incl. dem DB-Check.
Nur wird dieser Background Task "pro" Worker ausgeführt.

Ohne "lazy-app" Startet der Thread nur 1x, aber mehr als dass er diesen Thread startet (und dieser wird auch im Intervall ausgeführt), kommt er nicht.

Da ich aber die Funktion "check_db_connection" nur zu einer Überprüfung benutze, so stelle ich es morgen mal auf ein "psycopg2.connect()" um.
Evtl. macht der "pool" welchen sqlalchemy nutzt hier mir ein Strich durch die Rechnung.
Diesen hatte ich versucht mit close.() und/oder dispose() oder mit poolclass=NullPool, etc.. zu deaktivieren. Hatte aber alles nicht den Effekt.

Ich werde nach der Umstellung berichten, ob dies einen/den Effekt gebracht hat.

Viele Grüße
Chris
chris_adnap
User
Beiträge: 27
Registriert: Freitag 23. September 2022, 09:36

Hallo,

und wie versprochen, ich berichte...

Code: Alles auswählen

__init__.py
Hier hat sich nicht viel verändert... Thread starte ich aus create_app() heraus.

from flask import Flask
from . import models


SSH_LOCAL_PORT = 12345


def create_app():
    app_ini = Flask(__name__)

    from threading import Thread
    from app import watchdog
    t_auto_mail = Thread(target=watchdog.monitoring, args=(SSH_LOCAL_PORT,), name="background_watchdog", daemon=True)
    t_auto_mail.start()

    from .status.routes import status_bp
    app_ini.register_blueprint(status_bp, url_prefix='/status')

    return app_ini


app = create_app()

Code: Alles auswählen

watchdog.py

from time import sleep
from datetime import datetime, timedelta
from app import SSH_LOCAL_PORT
from app.database import URL, create_engine
import psycopg2


def check_db_connection(local_port):
    # ********** NEU, damit funktioniert es **********
    conn = None
    try:
        conn = psycopg2.connect(host="127.0.0.1", port=local_port, connect_timeout=10, ....)
        cur = conn.cursor()
        cur.execute('SELECT version()')
        db_version = cur.fetchone()
        print(db_version)
        cur.close()

    except (Exception, psycopg2.DatabaseError) as error:
        return False, repr(error)

    finally:
        if conn is not None:
            conn.close()
            return True, None
        else:
            return False, 'ERROR'

    """
    # ALT. ********** Die benutzung von "create_engine" führt, in meinem Fall, hier zu dem 504 Time-Out. **********
    url_db = URL.create(drivername="postgresql+psycopg2" ....)
    engine = create_engine(url_db, echo=False, pool_size=5, pool_timeout=5, connect_args={'connect_timeout': 10})
    try:
        engine .connect()
        return True, None
    except Exception as e:
        return False, repr(e)
    """


def monitoring(signum):
    while True:
        try:
            network_connection_check, network_err_msg = check_db_connection(SSH_LOCAL_PORT)
            if network_connection_check:
                pass # WEITER....				
            
            else:
                pass # z.b. error mail ...
			
            sleep(60)
        except Exception as e:
            pass  # Debug Error Mail.
            sleep(60)
Nach Nutzung von "psycopg2.connect()" statt "create_engine()" zum testen der DB-Verbindung, startet nun watchdog als Thread (was aber schon immer ging) UND die Webaplikation lässt sich, ganz normal aufrufen.
Ich lasse es heute mal so durchlaufen, um zu sehen ob es nach einem Worker reload auch noch richtig Funktioniert.
Dann habe ich jetzt einige Möglichkeiten zur Umsetzung gefunden.


Ich habe dabei einiges, was es noch so für Möglichkeiten gibt... ExecStartPre, systemd Timer Unit, ..., mitgenommen.
Bisher hab ich das "Prinzip" über uwsgi irgendwie liebgewonnen. (hier als alternative wäre es das Prinzip als Timer)

Als "ExecStartPre" müsste das Script in einer Schleife laufen, dafür hätte ich wiederum alles in einer .service.

Bei einem "Timer Unit" ohne Schleife, da das Script vom Timer jedesmal neu aufgerufen wird. Soweit ich gerade gelesen habe, kann ich Timer nicht in die vorhandene .service "direkt" integrieren, oder?
Entschlackt aber dafür den Code, da Schleife und Sleep(s) entfallen.


Bei einer Umsetzung mit "ExecStartPre". Gerade habe ich die Option "Restart=on-failure" aktiv.
Würde es dann beides überwachen? Sollte also der Hintergrundtask einen Fehler verursachen, die uwsgi Applikation aber nicht, würde in diesem Fall der Service auch neu gestartet?



Eine letzte Frage, welche ich schon zu Anfang gestellt habe.
Der Aufruf von Thread, gerne an meinem aktuellen __init__ Beispiel.
Macht es einen Unterschied von wo aus Thread aufgerufen/gestartet wird? An Position 1,2,3.... ?

Code: Alles auswählen

__init__.py
from flask import Flask
from . import models
from threading import Thread
from app import watchdog


SSH_LOCAL_PORT = 12345

# START HIER (1) ?

def create_app():
    # START HIER (2) ?
    app_ini = Flask(__name__)
    
    # START HIER (3) ?
    
    t_auto_mail = Thread(target=watchdog.monitoring, args=(SSH_LOCAL_PORT,), name="background_watchdog", daemon=True)
    t_auto_mail.start()

    from .status.routes import status_bp
    app_ini.register_blueprint(status_bp, url_prefix='/status')

    return app_ini

# START HIER (4) ?
app = create_app()

# START HIER (5) ? # Bis hier her würde es glaube ich nicht mal kommen :)
Vielen Dank aber bis hier her für die Unterstützung.

Viele Grüße
Chris
Antworten