Durchlaufzeit von Funktionen rausfinden / Datum angeben

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
BlackJack

@Serpens66: Ich würde sagen das 1000 Threads schon zu viel sind, 10000 sind eindeutig jenseits von dem wofür man Threads gebrauchen kann. Ich weiss das ich vor Jahren mal versucht hatte so viele Threads wie möglich zu starten und war irgendwo in den Bereich von ±800 gekommen bis mir das mit Hinweis auf zu viele Threads abgebrochen wurde.

Ich wüsste auch nicht wofür man wirklich *so* viele Sachen *echt* parallel bräuchte, denn ich sehe nicht wo das Vorteile bringt. Weder bei CPU noch bei I/O. Je mehr desto besser stimmt hier ja nicht, denn die Parallelität hat mit den Resourcen auch ihre Grenzen. Man hat normalerweise keine 1000 Prozessoren/Kerne oder so viele Massenspeicher das man die I/O-Last von 1000 Threads darauf verteilen könnte ohne das I/O der Flaschenhals wird und es dann eher langsamer wird als schneller.

Bei der Doku würde ich ja fast schon wieder den alten Hasen gegenüber dem Anfänger raushängen lassen, der sich einfach nicht genug mit dem Thema beschäftigen möchte. ;-) So ziemlich gleich am Anfang der Seite, ich muss dafür noch nicht einmal runters scrollen, ist `threading.enumerate()` dessen Dokumentation mit dem Satz „Return a list of all Thread objects currently alive.“ beginnt. Wobei `Thread` ein Link auf die Doku von `threading.Thread` ist, also genau die Objekte die Du haben möchtest. Nicht enthalten in der Liste sind „[…] terminated threads and threads that have not yet been started.“

Also so etwas in dieser Richtung (ungetestet):

Code: Alles auswählen

    LOG.debug('Active threads with `aufgabe`: %s', len(threads))
    for thread in threading.enumerate():
        if hasattr(thread, 'aufgabe'):
            LOG.debug(
                '%r\n  %r\n  %r', thread, thread.aufgabe, thread.arguments
            )
Mit einem entsprechenden `Logger`-Objekt vom `logging`-Modul und dem Level `logging.DEBUG` kann man damit eine Ausgabe bekommen bei der auch gleich ein aktueller Zeitstempel mit ausgegeben wird. Sorry, wieder eine neue Baustelle, aber wie gesagt, Logging will man bei Prozessen die mehrere Stunden laufen, eigentlich immer haben. Kannst natürlich auch ``print`` verwenden.

`Futures.result()` löst dann eine Ausnahme aus wenn die Funktion dazu eine Ausnahme ausgelöst hat. Und zwar genau diese Ausnahme:

Code: Alles auswählen

In [40]: e = concurrent.futures.ThreadPoolExecutor(10)

In [41]: def f():
    ...:     assert False
    ...: 

In [42]: r = e.submit(f)

In [43]: r.result()
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-43-6401ee0d034b> in <module>()
----> 1 r.result()

/usr/local/lib/python2.7/dist-packages/concurrent/futures/_base.pyc in result(self, timeout)                                                                    
    398                 raise CancelledError()
    399             elif self._state == FINISHED:
--> 400                 return self.__get_result()
    401 
    402             self._condition.wait(timeout)

/usr/local/lib/python2.7/dist-packages/concurrent/futures/_base.pyc in __get_result(self)
    357     def __get_result(self):
    358         if self._exception:
--> 359             reraise(self._exception, self._traceback)
    360         else:
    361             return self._result

/usr/local/lib/python2.7/dist-packages/concurrent/futures/_compat.pyc in reraise(exc, traceback)                                                                
    105     def reraise(exc, traceback):
    106         locals_ = {'exc_type': type(exc), 'exc_value': exc, 'traceback': traceback}
--> 107         exec('raise exc_type, exc_value, traceback', {}, locals_)
    108 else:
    109     def reraise(exc, traceback):

/usr/local/lib/python2.7/dist-packages/concurrent/futures/thread.pyc in run(self)                                                                               
     59 
     60         try:
---> 61             result = self.fn(*self.args, **self.kwargs)
     62         except BaseException:
     63             e, tb = sys.exc_info()[1:]

<ipython-input-41-cfcdc8007ab4> in f()
      1 def f():
----> 2     assert False

AssertionError: 
Und da ändert auch `wait()` nichts dran:

Code: Alles auswählen

In [44]: r = e.submit(f)

In [45]: concurrent.futures.wait([r])
Out[45]: DoneAndNotDoneFutures(done=set([<Future at 0xb418acc state=finished raised AssertionError>]), not_done=set([]))

In [46]: _.done.pop().result()
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-46-d26524687e7c> in <module>()
----> 1 _.done.pop().result()

/usr/local/lib/python2.7/dist-packages/concurrent/futures/_base.pyc in result(self, timeout)                                                                    
    398                 raise CancelledError()
    399             elif self._state == FINISHED:
--> 400                 return self.__get_result()
    401 
    402             self._condition.wait(timeout)

/usr/local/lib/python2.7/dist-packages/concurrent/futures/_base.pyc in __get_result(self)
    357     def __get_result(self):
    358         if self._exception:
--> 359             reraise(self._exception, self._traceback)
    360         else:
    361             return self._result

/usr/local/lib/python2.7/dist-packages/concurrent/futures/_compat.pyc in reraise(exc, traceback)
    105     def reraise(exc, traceback):
    106         locals_ = {'exc_type': type(exc), 'exc_value': exc, 'traceback': traceback}
--> 107         exec('raise exc_type, exc_value, traceback', {}, locals_)
    108 else:
    109     def reraise(exc, traceback):

/usr/local/lib/python2.7/dist-packages/concurrent/futures/thread.pyc in run(self)
     59 
     60         try:
---> 61             result = self.fn(*self.args, **self.kwargs)
     62         except BaseException:
     63             e, tb = sys.exc_info()[1:]

<ipython-input-41-cfcdc8007ab4> in f()
      1 def f():
----> 2     assert False

AssertionError:
Benutzeravatar
snafu
User
Beiträge: 6744
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Serpens66 hat geschrieben:Daher würde ich gerne deinen Vorschlag durchführen und gucken, welche Threads das sind. Doch wie mache ich das? Ich konnte in der Doku nichts finden, wie ich mir die aufgabe der noch aktiven Threads ausgeben lassen könnte https://docs.python.org/3.4/library/threading.html
Direkt als vierter Aufruf wird dort threading.enumerate() beschrieben. Entweder du hast es übersehen oder nicht verstanden. Jedenfalls tut die Funktion genau das, was du willst.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

enumerate habe ich sehr wohl gesehen, bin ja nicht blind =P

Aber wie ich davon nun auf die aufgerufene Funktion==aufgabe==target komme, steht dort nirgends. Wenn man das Thread object hat, steht dort nur, dass man thread.ident oder ähnliches aufrufen kann. thread.target hat nicht funktioniert, weshalb ich nicht weiter wusste.

Mit Logging habe ich mich deshalb nicht beschäftigt, weil das für mich damals als Anfänger viel zu viel neues war. Ich habe mir stattdessen mein eigenes logging geschrieben, von dem ich weiß wie es funktioniert und es reicht aus. Soll heißen es ist einfach nur eine Funktion, der ich ähnlich wie print() einen Text übergebe, der dann in eine txt Datei geschrieben wird. Dh. in dieser txt Datei steht dann alles was ich selbst berücksichtigt habe.
Logging würde doch vermutlich nicht anders funktionieren, oder? Also wenn ich nicht selbst den command "LOG.debug" oderso schreibe, dann wird das auch nicht geloggt. Somit sollte meine eigene Funktion doch erstmal ausreichen, oder nicht?
@Serpens66: Ich würde sagen das 1000 Threads schon zu viel sind, 10000 sind eindeutig jenseits von dem wofür man Threads gebrauchen kann. Ich weiss das ich vor Jahren mal versucht hatte so viele Threads wie möglich zu starten und war irgendwo in den Bereich von ±800 gekommen bis mir das mit Hinweis auf zu viele Threads abgebrochen wurde.
Ich wüsste auch nicht wofür man wirklich *so* viele Sachen *echt* parallel bräuchte, denn ich sehe nicht wo das Vorteile bringt. Weder bei CPU noch bei I/O. Je mehr desto besser stimmt hier ja nicht, denn die Parallelität hat mit den Resourcen auch ihre Grenzen. Man hat normalerweise keine 1000 Prozessoren/Kerne oder so viele Massenspeicher das man die I/O-Last von 1000 Threads darauf verteilen könnte ohne das I/O der Flaschenhals wird und es dann eher langsamer wird als schneller.
Ich wüsste nicht, wo ich behauptet habe, dass ich 1000 Threads oderso gar mehr brauche? :D
Brauchen tue ich immer nur ~10 bis 100 aufeinmal (um genau so viele API Calls zeitgleich zu machen und auf Antwort zu warten). Danach sollen die Threads ja eigentlich verschwinden, doch das tun sie manchmal nicht, wodurch sie sich auf über 1000 summieren und irgendwann das Programm wegen zuvielen Threads beendet wird, weshalb ich hier ja um Rat frage :D

Nun zum wichtigen Teil:
Danke für deinen Code. Meine Frage sollte dann wohl durch "hasattr()" beantwortet sein. Werde es gleich mal ausprobieren... (ok sehe gerade es ist nur ein check, ob aufgabe enthalten ist)
Du schreibst ja direkt "aufgabe". Also nehme ich an, dass man damit nach der Variablen namens "aufgabe" sucht?
Ich verwende jetzt ja stattdessen concurrent.futures. Dort habe ich keine Kontrolle darüber wie ich irgendwas benenne.
Wie heißt das nun? Habe im thread.py skript von concurrent.futures mal geschaut, dort wird "fn" und "args" verwendet, aber auch thread.fn gibts leider nicht..
(habs mal für meinen Thread Code getestet und da funktioniert es, das ist schonmal super :) Muss ich nur noch rausfinden, wie concurrent.futures die funktion nennt...)

(Die Sache mit dem future.result() muss ich mir nochmal in Ruhe angucken, sieht kompliziert aus)
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

ah, ich hatte noch nicht realisiert, dass concurrent.futures nur eine Hilfe bei der Anweundung von threading ist. Dh. wenn ich was über die Thread objekte wissen will, muss ich natürlich im threading modul nachschauen.
Es heißt also "_target" :)

ok...
das ergebnis über thread, target und args sieht in meinem kurzen Test zb so aus:
THREAD:<Thread(Thread-1, started daemon 7132)>
TARGET: <function _worker at 0x02BA8078>
ARGS: (<weakref at 0x028B0600; to 'ThreadPoolExecutor' at 0x0284F930>, <queue.Queue object at 0x0284F8B0>)

Das ist ja erstmal nicht sonderlich hilfreich... ich muss also einmal die argumente entschlüsseln, bzw muss auf den Namen der Funktion kommen.
Nach meiner google suche wird fn.__name__ und fn.func_name vorgeschlagen, doch beides funktioniert nicht (thread._target.func_name)...
mal weiter suchen..

edit:
okay, nur bei manchen Threads funktioniert __name__ nicht (weil _target None ist, weils der mainthread ist). Bei den anderen wird "_worker" ausgegeben... das hilft ja jetzt nicht so viel =/
hmm ne, jetzt komm ich nicht mehr weiter. Kenne mich noch zu wenig aus, um im concurrent.futures.thread script nun zu erkennen, wie ich von _worker auf die fn Funktion komme..
BlackJack

@Serpens66: Ich ging bei `aufgabe` noch von Deiner eigenen Thread-Klasse aus, wo Du das Attribut `aufgabe` eingeführt hattest. Wenn `_target` funtkioniert ist das okay, aber eher nur in Code um Fehler zu finden, nicht unbedingt in produktiv einegesetztem Code. Bei Python 2.7 haben `threading.Thread`-Objekte das Attribut zum Beispiel nicht und der Unterstrich signalisiert das es ein Implementierungsdetail ist.

Ein fertiges Logging-Modul ist halt der Standardweg und es kann auch deutlich mehr als `print()` oder etwas schnell selbst gebasteltes, und damit meine ich nicht das es mehr kann als man braucht — das vielleicht auch — sondern wirklich nützliche Sachen. wie das Formatieren mit Zeitstempeln und Logleveln, das filtern nach Logleveln, und zum Beispiel auch das loggen von Ausnahmen inklusive Traceback. Und wenn eine Anwendung grösser wird, kann man sich einen Logger pro Programmbereich erstellen und/oder eine Hierarchie aufbauen und Loglevel für die einzelnen Bereiche/Zweige einzeln konfigurieren. Bei einem grösseren Programm möchte man beispielsweise oft nicht die Debug-Ausgaben aus dem gesamten Programm sehen, sondern nur von den Bereichen wo man gerade einen Fehler sucht.

Auf der einen Seite kratzt das nur an den vielfältigen Möglichkeiten die das Modul noch bietet, auf der anderen Seite kann man für den Anfang auch einfach `logging.basicConfig()` ganz am Anfang des Programms aufrufen und erst einmal die Funktionen auf Modulebene (`logging.debug()` & Co) verwenden, was wesentlich weniger Aufwand ist als sich Logging mit der gleichen Funktionalität selbst zu schreiben.

Aus der Frage ob man den Pool am besten gleich mit 10.000 Workern erstellen sollte und das es so um die 1.000 im Betrieb sind, hatte ich halt geschlossen 1.000 wäre das was Du hast/willst und 9.000 sind dann als Raum für die, die nicht sterben wollen. :-D

So kompliziert finde ich `Futures.result()` eigentlich nicht‽ Das Ziel ist das man eine Funktion asynchron aufrufen kann ohne das man dabei etwas an API verliert. Eine Funktion kann nun mal ein Ergebnis liefern oder aber eine Ausnahme auslösen. Also wenn man die Funktion `do_something` hat, dann sollte gelten…

Code: Alles auswählen

    # irgenwelcher code
    result = do_something()
    # irgenwelcher code
…ist mehr oder weniger das gleiche wie…

Code: Alles auswählen

    # irgenwelcher code
    future = executor.submit(do_something)
    result = future.result()
    # irgenwelcher code

Und das schliesst natürlich auch mit ein, dass `do_something()` eine Ausnahme auslösen könnte und das ”irgendwelcher code” auch ein ``try``/``except`` sein könnte wie hier:

Code: Alles auswählen

  try:
      result = do_something()
  except SomeException:
      # handle exception
Also sollte sich das hier genau so verhalten:

Code: Alles auswählen

  try:
      future = executor.submit(do_something)
      result = future.result()
  except SomeException:
      # handle exception
Das `wait()` brauchst Du übrigens nicht, denn `result()` blockiert so lange bis das Ergebnis tatsächlich da ist. Oder eine Ausnahme. Du berücksichtigst den Rückgabewert von `wait()` ja auch gar nicht.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

danke :)

ich hatte logging zu anfang mal probiert, aber da ich noch nichts mit den logleveln anfangen konnte, hat der mir dann jeden mist geloggt den ich nicht brauchte :D Deswegen entstand dann die eigene funktion, welches ebenfalls uhrzeit mit loggt und 3 unterschiedliche "Level" hat, also wichtige, praktische und unwichtigere Dinge. Irgendwann werd ich sicherlich auch auf logging umschwenken, aber aktuell ist meine TODO Liste so unendlich lang, dass ich mich jetzt schon ärgere, dass ich mich nun schon 2 Tage mit diesem doofen Thread Problem rumschlagen muss :D

Ja durch dein Beispiel mit dem result ist es mir nun auch klar geworden :)
Wenn man das mit dem ursprünglichen Thread Code vergleicht... Dort sollte der Fehler dann in der aufgerufenen Funktion passieren.... und die ergebnisliste parallel() enthält dann einfach einen None Eintrag.
So hatte ich mir das auch bei result() vorgestellt.

Wenn ich also am Ende dieselbe Liste haben will, muss ich es wohl so machen:

Code: Alles auswählen

            results = []
            for future in futures:
                try:
                    results.append(future.result())
                except Exception as err:
                    results.append(None)
                    print("Fehler usw {}\n{}".format(err,traceback...))

Die eigentliche Frage hast du aber leider nicht beantwortet =(
Ziel ist es jetzt ja, die Threads, die eigentlich geschlossen sein sollten, zu identifizeren, indem ich den Namen der Funktion erfahre, die sie aufrufen. Dabei könnte ich feststellen, dass es zb immer nur bei API-Call a passiert, aber nie bei b. Und könnte dort dann genauer nachforschen.
Mit dem concurrent.futures Weg habe ich nun aber nur eine _worker Funktion und weiß nicht, wie ich auf die Funktion komme, welche ich bei pool.submit() übergeben habe...Kennst du da den Weg?
BlackJack

@Serpens66: Die Worker funktionieren ja ein bisschen anders. Es wird nicht pro Aufruf ein Worker gestartet sondern die werden beim ersten mal wenn sie gebraucht werden gestartet und beenden sich nach dem abarbeiten nicht, sondern warten auf die nächste Aufgabe. Threads starten und beenden kostet auch Zeit und Ressourcen. Wenn Du die Threads im `Executor` explizit beenden willst, musst Du `shutdown()` aufrufen, oder ihn mit ``with`` verwenden.

Das würde ich per logging verfolgen. Also protokollieren welche Funktionen asynchron gestartet wurden und nicht beendet. Wobei das nicht passieren dürfte wenn Du von jedem `Future` das Ergebnis abrufst.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

BlackJack hat geschrieben:@Serpens66: Die Worker funktionieren ja ein bisschen anders. Es wird nicht pro Aufruf ein Worker gestartet sondern die werden beim ersten mal wenn sie gebraucht werden gestartet und beenden sich nach dem abarbeiten nicht, sondern warten auf die nächste Aufgabe. Threads starten und beenden kostet auch Zeit und Ressourcen. Wenn Du die Threads im `Executor` explizit beenden willst, musst Du `shutdown()` aufrufen, oder ihn mit ``with`` verwenden.

Das würde ich per logging verfolgen. Also protokollieren welche Funktionen asynchron gestartet wurden und nicht beendet. Wobei das nicht passieren dürfte wenn Du von jedem `Future` das Ergebnis abrufst.
Hmmm... eine sinnvolle Änderung wäre also, nicht jedesmal
pool = concurrent.futures.ThreadPoolExecutor(x)
zu überschreiben, sondern nur einen Pool zu haben, welcher offen gehalten wird und immer wieder verwendet wird. So kann es dann eigentlich garnicht passieren, dass es immer mehr active threads werden.. (weil garkeine neuen geöffnet, sondern die alten weiterverwendet werden)

also bisher war der aufbau stark vereinfacht:

Code: Alles auswählen

while True:
    pool = concurrent.futures.ThreadPoolExecutor(10)
    # mache mit pool 10 api anfragen
    # rufe sie mit result() ab und werte die ergebnisse aus
    pool = concurrent.futures.ThreadPoolExecutor(10)
    # mache andere api anfragen und werte ergebnisse aus
    # sleep(x) und danach fängt die schleife von vorne an
Dh. ich hab tatsächlich ständig neue threads (mit dem thread code), bzw neue pools geöffnet.
Sollte ich also nun außerhalb der while Schleife einmal self.pool definieren und diesen dann immer wieder verwenden? Ja ich denke schon ^^

Doch werden dabei die worker, also die Threads wirklich offen gehalten? Auch wenn ich alle results() abgefragt habe? Da widersprichst du dir glaub ich, weil du erst schreibst, dass die worker offen gehalten werden und auf die nächste aufgabe warten, und dann schreibst du dass threads beendet werden, wenn ich alle results abgerufen habe. Oder darf ich worker nicht mit thread gleichsetzen? Also startet jeder Worker für eine neue Aufgabe auch immer einen neuen Thread?
Benutzeravatar
snafu
User
Beiträge: 6744
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Ja, du erstellst den Pool einmalig und kannst ihn dann überall in deinem Programm verwenden. Um das Erstellen und Töten von Threads kümmert sich dieser Pool bzw seine Worker dann schon selbst.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

vielen Dank.

Also der Code sieht nun so aus:

Code: Alles auswählen

def main(a="",b=""):    
    Pool = concurrent.futures.ThreadPoolExecutor(100)
    while True:
        # mache mit pool 10-100 api anfragen und rufe sie mit result() ab und werte die ergebnisse aus
        futures = []
        for count in range(10):
            futures.append(Pool.submit(apicall,count))
        results = []
        for future in futures:
            try:
                results.append(future.result(180))
            except Exception as err:
                results.append(None)
                print("Fehler usw {}\n{}".format(err,traceback...))
        # ...
        # ...
        # ...
        # mache andere api anfragen und werte ergebnisse aus
        futures = []
        for count in range(50):
            futures.append(Pool.submit(andererapicall,count))
        results = []
        for future in futures:
            try:
                results.append(future.result(180))
            except Exception as err:
                results.append(None)
                print("Fehler usw {}\n{}".format(err,traceback...))
        # ...
        # ...
        # ...
        # mache iwelche Berechnungen, dessen ergebnis nicht interessiert (ist eigentlich in einer Klasse, dh Ergebnisse werden in self. Variablen gespeichert)
        futures = []
        for count in range(10):
            futures.append(Pool.submit(andererapicall,count))
        try:
            concurrent.futures.wait(futures,180) # ergebnisse interessieren nicht
        except Exception as err:
            print("Fehler usw {}\n{}".format(err,traceback...))
        # ...
        # ...
        # ...
        sleep(x) # und danach fängt die schleife von vorne an

if __name__ == "__main__":
    run = []
    run.append(main,(1,2))
    run.append(main,(3,4))
    run.append(main,(5,6))
    run.append(main,(7,8))
    parallel(run)
Das parallel() ist das parallel was zu meinem alten Thread Code gehört. die main() funktion soll gleichzeitig in 4 verschiedenen einstellungen laufen. Dabei geht es nicht darum 4 mal so schnell zu sein, sondern eben darum, dass es gleichzeitg gemacht wird.
Eventuell könnte ich diesen parellel() Teil auch noch mit concurrent ersetzen, aber ich glaub zur Lösung des Problems ist das erstmal nicht notwendig.

Die Zahl der API anfragen ist nicht immer 10, sondern kann bis maximal 100 sein, deswegen 100 worker.
sleep(x) ist so ausgelegt, dass ein Rundendurchlauf mindestens 5 sekunden dauert.
Wenn nun eine der laufenden main() Funktionen threading.active_count() ausgibt, dann gibt es logischerweise alle threads aus, die gerade im gesamten skript existieren.
Deswegen ist diese Anzahl nach ~5 Durchläufen der while Schleife bei ca. 430 und bleibt dann recht stabil bei diesem Wert jede Runde.

Es gibt bisher keine Unterbrechungen, dh. worker!=thread. Nur weil ein thread noch offen ist, blockiert er also nicht die worker bzw den fortlauf des skriptes, sofern die funktion die bearbeitet wurde, beendet ist.
Auch gibt es keinerlei Fehlermeldungen von result(timeout) oder von wait(fs,timeout).

Doch wie zuvor, als ich noch kein concurrent verwendet habe, steigt die Anzahl der Threads langsam aber stetig an.

Nach 200 Minuten Laufzeit, sind es jetzt 505 active threads.
Und ich bin mir sicher es werden nun immer mehr, bis über 1000 und es dann irgedwann wieder wegen zuvielen threads geschlossen wird, obwohl diese threads ja schon lange nichts mehr machen und auch kein worker sie mehr "betreut"...

edit:
Achja
. Ich habe den Code zum printen der Namen der activen Threads auch drin, sodass er alle Threads printet, die durch meinen ThreadCode also "parallel()" aufgerufen wurden. Dabei sind es auch jetzt nach den 200Minuten immernoch, wie gewünscht, die 4 Threads die ich zu anfang gestartet habe. Das bedeutet, die neu hinzugekommenen Threads, sind durch concurrent entstanden. -> Es ist wohl kein Fehler der im Thread Code liegt.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

also um es nochmal deutlicher zu machen:
Es ist wohl kein Fehler im Thread Code, aber natürlich genauso sicher kein Fehler im concurrent Code.

Sondern es wird irgendetwas anderes sein. Aber ich wüsste nicht was.
Ich würde nun von euch gerne wissen, was dieses Fehlverhalten von Threads verursachen kann, obwohl die Funktionen beendet werden, die worker fertig sind und auch Funktionen keinen Fehler raisen.

Ich nehme mal an, dass es da nicht viele Ursachen geben kann, bzw ihr vllt überhaupt keine kennt? Deswegen hoffe ich, dass ihr mir mit dem Code-aufbau den ich gezeigt habe, helfen könnt, ohne mehr von meinem Programm wissen zu müssen. (beispiel "auto geht nicht" -> hunderte Ursachen möglich -> ihr braucht mehr infos. Aber hier vermutlich nur sehr wenige Ursachen möglich, weshalb ihr Ursachen aufzählen könntet, ohne genaueres zu wissen? )

Ich hatte selbst auch schonmal so einen Aufbau mit irgendwelchen Pseudofunktionen gebaut, weil ich gehofft hatte ihn euch als vollständigen Code zu präsentieren, bei dem dasselbe Phänomen auftritt. Aber auch nach viele Stunden, wurden die Threads dort nicht mehr.
BlackJack

@Serpens66: Interessantes Phänomen, aber Du hast da IMHO sowieso einen Fehler drin: Du wartest 180 Sekunden auf ein Ergebnis, und ignorierst es wenn es bis dahin nicht fertig ist. Der Thread läuft dann aber trotzdem weiter und eventuell hängt der ja auch in einer Endlosschleife oder in einem API-Aufruf. Und der Worker steht dann im weiteren nicht mehr für die Berechnung für anderes zur Verfügung, denn es gibt ja insgesamt eine feste Anzahl von Workern im Pool.
BlackJack

@Serpens66: Du könntest Teile von Deinem Code durch Dummy-Funktionen ersetzen und schauen ab wo das Problem nicht mehr auftritt, wenn es nur mit Dummy-Funktionen nicht nachstellbar ist.

Hast Du irgendwo `__del__()`-Methoden implementiert?

Vielleicht kann das hier helfen den `Thread`-Objekten auf die Spur zu kommen: http://mg.pov.lt/objgraph/
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

BlackJack hat geschrieben:@Serpens66: Interessantes Phänomen, aber Du hast da IMHO sowieso einen Fehler drin: Du wartest 180 Sekunden auf ein Ergebnis, und ignorierst es wenn es bis dahin nicht fertig ist. Der Thread läuft dann aber trotzdem weiter und eventuell hängt der ja auch in einer Endlosschleife oder in einem API-Aufruf. Und der Worker steht dann im weiteren nicht mehr für die Berechnung für anderes zur Verfügung, denn es gibt ja insgesamt eine feste Anzahl von Workern im Pool.
ja schon, aber dann sollte doch ein Timeout error geraised werden, der geloggt wird

(und ich dachte, dass damit dann auch die aufgabe==worker beendet wird.. wird es nicht? dann schau ich mal in die doku, wie ich das abbreche.. cancel() kann aber fehlschlagen...?!)

Und wie gesagt, gab es bisher keine Fehlermeldung.
Und als ich zuvor noch parallel() dafür verwendet habe, gab es so einen timeout nicht. Wenn da aber irgendeine Funktion oder API Call endlos lief, dann ging das gesamte Skript (bzw die gesamte main Funktion) nicht weiter, weil halt auf Ende gewartet wurde (hatte dazu auch mal einen Foren-Thread gemacht, wo ich gefragt hab, wieso mein Programm irgendwann "freezed" das war wegen einem API Call bei dem ich den timeout nicht definiert hatte und endlos auf Antwort gewartet wurde)

Demnach sollte das nichts mit dem Phänomen zu tun haben (ist aber ein richtiger Hinweis, sollte der timeout doch mal ausgelöst werden)
@Serpens66: Du könntest Teile von Deinem Code durch Dummy-Funktionen ersetzen und schauen ab wo das Problem nicht mehr auftritt, wenn es nur mit Dummy-Funktionen nicht nachstellbar ist.

Hast Du irgendwo `__del__()`-Methoden implementiert?

Vielleicht kann das hier helfen den `Thread`-Objekten auf die Spur zu kommen: http://mg.pov.lt/objgraph/
Ja das ist eine gute Idee... dann ersetz ich im ersten Schritt ausnahmslos alle von concurrent aufgerufenen funktionen durch
def dummy():
return
Den sleep(x) lasse ich auch weg. Dann sollte das skript ja superschnell sein, womit es auch schneller mehr threads werden sollten, falls das Phänomen dann noch da sein sollte.
Falls Threads normal beendet werden, aktivier ich dann nach und nach die Funktionen.

Mein Code ist immernoch eher Anfänger-niveau. Dh. ich verwende weder dinge wie @property oder __xy__ funktionen, abgesehen von __init__(). Das sind so Dinge mit denen ich mich noch nicht beschäftigt habe. Ich weiß also nicht, welche Funktion eine __del__() Methode hat. Deshalb benutz ich sie auch (noch) nicht. (property hab ich schon gelernt, verwende ich aber nicht, aber ich meinte jetzt allgemein Dinge mit @)

Falls ich mit den dummy Funktionen nicht weiter komme, schaue ich mir deinen Link noch an, danke :)

Ich bastel dann jetzt mal mit den dummy funktionen...
BlackJack

@Serpens66: Man kann Threads nicht beenden. Also enden auch Worker erst wenn die Funktion die dort ausgeführt wird, tatsächlich endet.

Die Funktion von `__del__()` sollte Dir dann auch nicht so wichtig sein, denn ein möglicher Nebeneffekt kann sein, dass die Objekte die das implementieren nicht mehr von der Speicherbereinigung erfasst werden.

Viel Glück beim suchen. :-)
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

ich hab doch nochmal vollständig auf meinen Thread Code gewechselt, damit ich die Threads mit enumerate gut unterscheiden kann.

Dabei habe ich nun rausgefunden, dass das Problem keiner meiner Threads ist, sondern von Modulen die ich verwende.
Möglicherweise websocket module... ab und zu startet mein Skript die websocket verbindung neu.. eventuell ohne die alte vorher korrekt zu beenden...

diese Liste wird immer länger:

[<_MainThread(MainThread, started 140676421236480)>, <Timer(Thread-126367, started daemon 140675051898624)>, <Timer(Thread-126418, started daemon 140675060291328)>, <Timer(Thread-126419, started daemon 140675068684032)>, <Timer(Thread-125569, started daemon 140675043505920)>, <Timer(Thread-126366, started daemon 140675077076736)>, <Timer(Thread-126417, started daemon 140676041516800)>, <Connection(Thread-28, started daemon 140675521693440)>, <Connection(Thread-7, started daemon 140676058564352)>, <Timer(Thread-125565, started daemon 140675001542400)>, <Timer(Thread-126409, started daemon 140675538478848)>, <Timer(Thread-125571, started daemon 140675504908032)>, <Timer(Thread-126246, started daemon 140674892437248)>, <Timer(Thread-126245, started daemon 140674867259136)>, <Connection(Thread-58, started daemon 140675093862144)>, <Timer(Thread-126420, started daemon 140674934400768)>, <Timer(Thread-125573, started daemon 140674959578880)>, <Timer(Thread-125567, started daemon 140675085469440)>, <Connection(Thread-58630, started daemon 140674917615360)>, <Connection(Thread-5, started daemon 140676075349760)>, <Timer(Thread-126410, started daemon 140674993149696)>, <Timer(Thread-125564, started daemon 140675009935104)>, <Connection(Thread-37054, started daemon 140675018327808)>, <Connection(Thread-27, started daemon 140675530086144)>, <Connection(Thread-29, started daemon 140675513300736)>]

Nun versuche ich rauszufinden, welches modul genau...
wisst ihr wie ich das rausfinden kann, welches Modul bzw welche Klasse diese Threads erstellt?

(konnte noch keine direkte Verbindung von websockets und diesen neuen threads finden, aber suche weiter...)
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Ja Ursache gefunden!

Es war tatsächlich mein "Neustarten" der Pusher (~=websocket) Verbindung.
Ich hatte keine Ahnung wie das pusherclient modul diesen erneuten connect Befehl umsetzt. Ich ging davon aus, dass die alte Verbindung dann einfach überschrieben wird bzw zb mit pings oderso nicht mehr offen gehalten wird und letztlich iwann geschlossen wird.

Jetzt schließe ich immer erst die Verbindung, bevor ich sie neustarte (wobei das regelmäßige neustarten vermutlich nichtmal nötig ist... habe im modul gesehen, dass es bereits die Verbindung regelmäßig checkt und selbst reconnected wenn möglich).

Memory Verbrauch steigt immernoch leicht an, aber das ist nicht so tragisch. Und mir ist ja jetzt klar geworden, dass ich auch in importierten Modulen suchen muss, wenn mein eigener Code so direkt keine logische Erklärung zulässt, also könnte das mit dem memory zuwachs ähnlich gelagert sein.
Aber das braucht hier jetzt nicht behandelt werden.

Ich danke euch und vorallem BlackJack für die Ratschläge, die mich auf die Spur gebracht haben :)
Antworten