Thread Verwaltung

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
Lungentorpedo
User
Beiträge: 12
Registriert: Donnerstag 10. April 2014, 22:44

hi,
gleich zu meinem problem.
Ich möchte ein Programm schreiben welches in der Lage ist threads zu starten und diese fals sie abstürzen neuzustarten. Desweiteren soll der user in der Lage sein über eien Tastaureingabe das Programm anzuweisen bestimmte threads zu beenden und nicht mehr zu starten und wieder zu ändern

Der Teil des Programms der die Threads startet und neustarted funktioniert schon.
Bloß weiß ich nicht wie ich einen thread nach benutzereingabe beenden kann.

Code: Alles auswählen

class management (object):
    def __init__(self):
        self._threadNames = {"server0" : ["serverTCP", "0"], "server1" : ["serverTime" ,"0"], "server2": ["serverM", "0"]}


    def main (self):
        while(1):

            for i in range(len(self._threadNames)):

                exist = 0
                for t in threading.enumerate():
                    if self._threadNames["server"+str(i)][0] == t.name:
                        exist = 1
                    time.sleep(1.0)

                if exist == 0 and self._threadNames["server"+str(i)][1] == "0":

                    thd = Thread(target = getattr(main, self._threadNames["server"+str(i)][0]), name = self._threadNames["server"+str(i)][0])
                    thd.start()

                if exist == 1 and self._threadNames["server"+str(i)][1] == "1":
                    print("stop")
                    thd = Thread(target = getattr(main, self._threadNames["server"+str(i)][0]), name = self._threadNames["server"+str(i)][0])
                    thd._stop()
                    th = threading.enumerate()
                    print(th)

                time.sleep(2.0)

    def serverTCP (self):
        while(1):
            print("Ich laufe serverTCP")
            time.sleep(5.0)

    def serverTime (self):
        while(1):
            print("Ich laufe serverTime")
            time.sleep(5.0)

    def serverM (self):
        #while(1):
            print(self._threadNames)
            print("Schreibe server0, server1, server2 um die Threads zu beenden oder zu starten")
            eingabe = input("Eingabe >>>")
            time.sleep(5.0)
            if eingabe == "server0":
                if self._threadNames["server0"][1] == "0":
                    self._threadNames["server0"][1] = 1
                else:
                    self._threadNames["server0"][1] = 0
            if eingabe == "server1":
                if self._threadNames["server1"][1] == "0":
                    self._threadNames["server1"][1] = 1
                else:
                    self._threadNames["server1"][1] = 0
            if eingabe == "server2":
                if self._threadNames["server2"][1] == "0":
                    self._threadNames["server2"][1] = 1
                else:
                    self._threadNames["server2"][1] = 0
            else:
                print("False input")


main = management()
main.main()

Weiß da einer vieleicht eine lösung oder hat einen ganz anderen ansatz wie ?


Mit freundlichen Grüßen

Fabi
BlackJack

@Lungentorpedo: Der Code in den Threads muss so geschrieben sein das man ihn von aussen bitten kann sich selbst zu beenden. Alles andere ist unschön bis gefährlich.

Man könnte ansonsten noch Prozesse verwenden statt Threads, weil man die tatsächlich hart von aussen abschiessen kann. Mit dem `multiprocessing`-Modul hat man grundsätzlich auch die Thread-API. Kommunikation zwischen den Prozessen ist natürlich geringfügig aufwändiger als wenn man sich den gleichen Adressraum teilt. Das kann ein Nachteil sein, oder aber auch ein Vorteil.
Lungentorpedo
User
Beiträge: 12
Registriert: Donnerstag 10. April 2014, 22:44

Danke für die schnelle Antwort werde das mal einfügen und schauen wies läuft
Gruß

Fabi
Sirius3
User
Beiträge: 17753
Registriert: Sonntag 21. Oktober 2012, 17:20

@Lungentorpedo: für Wahrheitswerte gibt es True und False, Du benutzt wahlweise 0, 1 oder "0" und "1", was erstens nicht richtig und zweitens fehlerhaft ist. Wie Du mit Wörterbüchern umgehst, ist auch nicht so, wie das eigentlich gemeint ist. Hier mal der überarbeitete Code:

Code: Alles auswählen

class Management(object):
    def __init__(self):
        self._threadNames = {
            "server0": ["serverTCP", False],
            "server1": ["serverTime", False],
            "server2": ["serverM", False],
        }

    def main(self):
        while True:
            for name, running in self._threadNames.values():
                exist = any(name == t.name for t in threading.enumerate())
                time.sleep(1.0)
 
                if not exist and not running:
                    thd = Thread(target=getattr(self, name), name=name)
                    thd.start()
 
                if exist and running:
                    print("stop")
                    thd = Thread(target=getattr(self, name), name=name)
                    thd._stop()
                    th = threading.enumerate()
                    print(th)
                time.sleep(2.0)
 
    def serverTCP(self):
        while True:
            print("Ich laufe serverTCP")
            time.sleep(5.0)
 
    def serverTime(self):
        while True:
            print("Ich laufe serverTime")
            time.sleep(5.0)
 
    def serverM(self):
        #while True:
            print(self._threadNames)
            print("Schreibe {} um die Threads zu beenden oder zu starten".format(', '.join(self._threadNames)))
            eingabe = input("Eingabe >>>")
            time.sleep(5.0)
            try:
                data = self._threadNames[eingabe]
            except KeyError:
                print("False input")
            else:
                data[1] = not data[1]
  
main = Management()
main.main()
Ansonsten versteh ich noch nicht ganz, was Du mit Deinen Threads so anstellst. Besser Du speicherst explizit Deine Threads, statt sie über enumerate abzufragen.
Benutzeravatar
noisefloor
User
Beiträge: 3856
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,
Ich möchte ein Programm schreiben welches in der Lage ist threads zu starten und diese fals sie abstürzen neuzustarten.
Gibt's doch schon, nennt sich Erlang *SCNR*

Gruß, noisefloor
Lungentorpedo
User
Beiträge: 12
Registriert: Donnerstag 10. April 2014, 22:44

Hi, danke für den überarbeiteten Code sieht sehr gut aus.
Da muss ich noch viel lernen das ich das auch selber so schreiben kann.

Hab aber immer noch ein problem udn zwar ist eier der Threads ein TCP socket Server der auf eingehende verbindungen wartet.
Das heißt der thread steht so lange still bis bis ein verbundender socket daten bereit stellt.
Nun weiß ich net wie ich den Thread von innen heraus beenden soll, da dieser still steht und nicht immer wieder eine schleife durchläuft.

Code: Alles auswählen

lesen, schreiben, oob = select.select([server] + clients, [], [])
Der "select" Befehl wartet so lange bis daten bereitstehen.
kann ich da ein timeout einstellen, sodass ich nach einer geringen wartezeit, der select befehl abbricht und der weitere Code ausgeführt wird ?

Noch mal den ganzenn code:

Code: Alles auswählen

    def serverTcp (self):
        clients = []
        print "ich bin das Aquarium Programm!"
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.bind(("192.168.2.100", 50001))

        server.listen(1)
        try:
            while True:
                lesen, schreiben, oob = select.select([server] + clients,
                                              [], [])

                for sock in lesen:
                    if sock is server:
                        client, addr = server.accept()
                        clients.append(client)
                        print "+++ Client %s verbunden" % addr[0]
                    else:
                        daten = sock.recv(1024)
                        daten = pickle.loads(daten)
                        ip = sock.getpeername()[0]
                        if "Close" in daten:
                            print "+++ Verbindung zu %s beendet" % ip
                            sock.close()
                            clients.remove(sock)
                            server.close()
                            #sys.exit()
                        else:
                            control = aquaServer.modules(daten)
                            sock.send(pickle.dumps(control))

        finally:
            for c in clients:
                c.close()
                server.close()
Gruß

Fabi
BlackJack

@Lungentorpedo: `select()` hat ein Argument mit dem Namen `timeout` — rate mal wofür das da ist. ;-)

Alternativ könnte der ”Threadmanager” sich selbst als Client mit dem Server verbinden und Du könntest ein Kommando zum beenden des Servers mit in das Protokoll einbauen. Damit das nicht jeder Client kann, könnte man dabei einen zufällig generierten Wert übermitteln den beide kennen (laufen ja im selben Prozess).

Dein Code für TCP ist fehlerhaft. Ein `recv(n)` liefert einen Ausschnitt aus dem Datenstrom der 1 bis maximal n Bytes lang ist. Also im Extremfall muss Dein Code auch damit klarkommen können wenn immer nur 1 Byte pro Aufruf kommt. Du musst also so lange lesen bis eine Nachricht komplett ist, bevor Du sie weiterverarbeitest. Da kann man selber Code schreiben, was bei Pickle-Daten als ”Protokol” nicht so einfach ist, oder man lässt sich vom Socket ein Dateiobjekt geben und verwendet das dann mit `pickle.load()`. Wobei diese ”naive” Lösung an der Stelle dann eine einfache aber sehr effektive DOS-Attacke auf Deinen Server zulässt die man mit dem Lesen der gepickleten Daten in einem eigenen Thread umgehen könnte.

Der ``if 'Close' in daten:``-Test ist IMHO ungünstig. Damit darf es dann niemals legitime Daten geben die eine solche Zeichenkette enthalten aber kein Beenden der Verbindung bedeuten. Das würde ich eindeutiger regeln. Zum Beispiel mit einem eigenen Datentyp der den Abbruch signalisiert.

Edit: Ich bin übrigens wie Sirius3 der Meinung das Dein Code für die Verwaltung der Threads komisch ist. Das man Threads Namen geben kann habe ich noch nie irgendwo gebraucht. Das kann zu Debugging- oder Logging-Zwecken ganz nett sein, aber anhand dieser Namen irgendwelche Entscheidungen zu treffen halte ich für keine gute Idee. Man kann direkt die Thread-Objekte hinterlegen oder `None` wenn noch kein Thread existiert und ob ein Thread läuft kann man ihn direkt fragen, die haben nämlich eine `is_alive()`-Methode.
Lungentorpedo
User
Beiträge: 12
Registriert: Donnerstag 10. April 2014, 22:44

hi, Ich glaube in dem script von sirius geht der "thd._stop()" befehl net .
hab jetzt aber auch möglichenkeiten gefunden das der thread sich von innen herraus beenden kann.
wäre aber trotzdem schön zu wissen wieso der "thd._stop()" befehl net geht !

Das mit den namen für die threads passt schon so

Das mit dem Timeout hab ich 3 minuten nach meinem post dann auch geschnallt.

Trotzdem danke

Gruß

Fabi

Noch mal eine neue erkenntniss standt 20:43

thd object has no atribute _stop()

hmm
BlackJack

@Lungentorpedo: Eine `_stop()`-Methode gibt es nicht. Die müsstest Du dann implementieren, was natürlich nur geht wenn Deine Threads von `Thread` erben und nicht mit dem `target`-Argument Funktionen/Methoden gestartet werden. Wichtig ist, dass man wie auch immer man das löst einen Thread in der Verwaltung erst als beendet betrachtet wenn er das auch tatsächlich ist, also nicht schon nachdem man den Wunsch übermittelt hat der Thread möge sich beenden. Sonst kann es Probleme geben. Zum Beispiel gibt es ja so Zwischenzustände wie: der Benutzer hat einem Thread gesagt er soll sich beenden, und sagt gleich darauf es soll ein neuer für diese Aufgabe gestartet werden. Solange bis der erste sich dann tatsächlich beendet hat, laufen nun zwei parallel. Beziehungsweise im Fall des TCP-Servers schlägt der Start fehl solange das Server-Socket noch vom ersten Thread belegt ist.

Was meinst Du mit „passt schon so“? Das ist ein hässlicher Hack Threads per `threading.enumerate()` und Namensvergleiche zu identifizieren statt sie in einer Datenstruktur zu halten. Dazu ist der Name nicht gedacht.
Lungentorpedo
User
Beiträge: 12
Registriert: Donnerstag 10. April 2014, 22:44

warum schreibt er den den:

Code: Alles auswählen

if exist and running:
                    print("stop")
                    thd = Thread(target=getattr(self, name), name=name)
                    thd._stop()
Wenn das so garnicht ausführbar ist ?

Gruß

fabi
BlackJack

@Lungentorpedo: Was hätte er denn sonst schreiben sollen? Über den Inhalt Deiner Threadfunktionen war bis dahin ja noch nicht viel bekannt, und wie Du letztendlich die Kommunikation löst auch nicht.

Ich verstehe auch noch nicht so ganz was `main()` und `serverM()` sollen. Eigentlich sollte `serverM()` doch reichen und abgestürzte Threads würde ich nicht durch regelmässiges prüfen aller Threads herausfinden sondern einen Wrapper schreiben der mitbekommt wenn ein Thread beendet ist und prüft ob das regulär war, oder ob der neu gestartet werden muss.
Lungentorpedo
User
Beiträge: 12
Registriert: Donnerstag 10. April 2014, 22:44

Hab halt mit wrappern noch nichts gemacht werd mir das mal anschauen.
Habs halt erstmal mit meinen mir bekannten Mitteln versucht umzusetzten.

Würdest mir das mit den wrappern den erklären oder hast ein example auf deutsch ?
Würde mir das verstehen leichter machen

Mit freundlichen Grüßen

Fabi
BlackJack

@Lungentorpedo: Als „wrapper“ bezeichnet man Funktionen oder Objekte in die man andere Funktionsaufrufe oder Objekte ”verpackt”. Das ist eine ganz allgemeine Bezeichnung. Also nichts was Du nicht schon kennst, nehme ich jetzt mal an. Du musst halt dafür sorgen dass Du einen ”Absturz”, was nichts weiter ist als eine Ausnahme in der `run()`-Methode eines `Thread`-Exemplars mit der Dein Code für die `run()`-Methode nicht gerechnet hat. Also könnte man eine von `Thread` abgeleitete Klasse schreiben die neben `target` & Co zum Beispiel auch eine Rückruffunktion übergeben bekommt die sie aufruft wenn eine Ausnahme beim Aufrufen der `run()`-Methode der Basisklasse auftritt.
Sirius3
User
Beiträge: 17753
Registriert: Sonntag 21. Oktober 2012, 17:20

@Lungentorpedo: ich habe Deinen Code, mit all seinen Unschönheiten, nur 1:1 in eine lesbare Form gebracht, um zu erkennen, wo Dein Problem ist.
Am einfachsten ist es doch, die Thread-Funktionen selbst absturzsicher zu machen:

Code: Alles auswählen

    def serverTime(self):
        while True:
            try:
                print("Ich laufe serverTime")
                time.sleep(5.0)
            except Exception:
                logging.exception("Unexpected error. Continuing with loop.")
BlackJack

Und wenn man das mehrfach machen muss, könnte man einen Decorator dafür schreiben. :-)
BlackJack

@Lungentorpedo: Ich schaue mir gerade noch mal deinen TCP-Server an. Der ist echt richtig kaputt, denn nicht nur `recv()` sondern auch `send()` wird falsch verwendet. `send()` sendet *nicht* garantiert alle Daten die man übergibt. Die Methode gibt eine Zahl zurück die sagt wieviele Bytes von dem Argument versendet wurden und das müssen nicht alle sein! Man muss das also so lange wiederholt aufrufen bis alle Daten tatsächlich gesendet wurden, oder `sendall()` verwenden. Bei der Methode hat man dann aber wieder das Problem das die blockieren kann. Es macht wenig Sinn sich auf der einen Seite mit `select()` herum zu schlagen und auf der anderen Seite blockierende Socket-Methoden zu verwenden.

Da man bei nicht-blockierenden Aufrufen das Protokoll zumindest in so weit kennen muss um beim Empfangen entscheiden zu können wann eine Nachricht komplett angekommen ist, und ich das, beziehungsweise *die* Pickle-Formate (Plural) eher als „black box“ betrachten würde, da das tatsächliche Binärformat IMHO ein Implementierungsdetail bleiben sollte, kann man das Empfangen mit `recv()` gar nicht machen. Also muss man eine Datei aus dem Socket-Objekt machen und hat damit blockierende Leseaufrufe. Und damit ist dann `select()` raus, weil die Kombination sinnfrei ist.

Wenn man `select()` verwendet, dann muss man auch nicht-blockierende Aufrufe verwenden, nicht nur das Lesen sondern auch das schreiben damit abwickeln, und dementsprechend Lese- und Schreibpuffer für jeden Client haben. Und natürlich ein Protokoll verwenden wo man beim lesen entscheiden kann wann eine Nachricht komplett ist, damit sie verarbeitet werden kann. Mit einem Thread pro Client und dann blockierenden Aufrufen, also am besten die Datei-API, ist die Socket-Programmierung wirklich *deutlich* einfacher. Es gibt einen Grund warum das heute kaum jemand mehr selber macht sondern ein Rahmenwerk verwendet.
Lungentorpedo
User
Beiträge: 12
Registriert: Donnerstag 10. April 2014, 22:44

Hab das hier bei Google gefunden.

Code: Alles auswählen

def wrap(pre, post):
       def decorate(func):
           def call(*args, **kwargs):
               pre(func, *args, **kwargs)
               result = func(*args, **kwargs)
               post(func, *args, **kwargs)
              return result
           return call
       return decorate


def trace_in(func, *args, **kwargs):
   print "Entering function",  func.__name__

def trace_out(func, *args, **kwargs):
   print "Leaving function", func.__name__

@wrap(trace_in, trace_out)
def calc(x, y):
   return x + y
hab das auch mal ausgeführt. Führt halt eine Aktion vor und nach dem auffruf der Methode "calc" aus.
Allerdings versteh ich net alle Codeschnipsel
Die Def Wrap Methode versteh ich net, auch net die übergebenen Variablen.
Kann mir das vlt. einer erklären was das programm da macht ?


Gruß

Fabi

*Edit den Socket server hab ich von Galileo Computing einfach kopiert!
das ist nicht mein eigener code also wie soll ich wissen das dass so falsch ist!
BlackJack

@Lungentorpedo: Die Einrücktiefe von jeder Zeile entspricht nicht der Konvention und Zeile 7 ist sogar fehlerhaft eingerückt.

Den Abschnitt Keyword Arguments aus dem Python-Tutorial kennst Du bereits? Der und die beiden folgenden Abschnitte sollten die Argumente mit den Sternchen erklären.

Die Decorator-Syntax ist ja nur syntaktischer Zucker für folgendes:

Code: Alles auswählen

@expression
def function():
    pass

# <=>

def function():
    pass

function = (expression)(function)
Hinter dem ``@`` muss also ein beliebiger Audruck stehen der als Ergebnis etwas Aufrufbares hat das eine Funktion als Argument erwartet und dann eine Funktion zurück gibt. Oft steht hinter dem @ nur ein Funktionsname. In diesem Fall ist es ein Funktionsaufruf. Das heisst es wird eine Funktion aufgerufen die eine Funktion zurückgeben muss, welche eine Funktion als Argument erwartet und eine Funktion zurück gibt. Und diese Funktion die von `wrap()` zurückgegben wird, wird auf die definierte `function` angewendet und der Name wird dann an das Ergebnis dieses Aufrufs gebunden.

Zu allem Überfluss bekommt `wrap()` auch noch zwei Funktionen als Argumente. Nach diesem Beispiel sollte man also auf jeden Fall verinnerlicht haben das Funktionen in Python Werte erster Klasse sind, man die also als Argumente übergeben kann und auch Funktionen schreiben kann die neue Funktionen als Rückgabwert liefern. :-)

Welches Buch war das denn?
BlackJack

@Lungentorpedo: Also ich habe gerade mal in das OpenBook in der Version zu Python 2.x geschaut und da ist der Server nur für den Empfang programmiert, allerdings auch mit der fehlerhaften Verwendung von `recv()`. Das kommt halt dabei heraus wenn zwei Studenten die wenig Erfahrung/Ahnung von Python zu scheinen haben, ein Buch schreiben das so viele Themen durchhechelt. Socketprogrammierung ist nicht so einfach, da sollte man vielleicht besser Quellen lesen die sich nur auf dieses Thema spezialisieren. Das Buch an sich ist nicht so der Hit.
Antworten