Exceptin _wait_for_tstate_lock in wenn ich Multiprocessing.Queue mit großen Daten nutze

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
MoonKid
User
Beiträge: 106
Registriert: Mittwoch 10. Dezember 2014, 16:24

X-Post: https://stackoverflow.com/q/56321756/4865723

Ich bekomme Exceptions in threading._wait_for_tstate_lock wenn relativ große Datenmengen mit Hilfe von multiprocessing.Queue zwischen einem Process und einem Thread austausche.

Hier ist der vollständige Beispielcode

Code: Alles auswählen

#!/usr/bin/env python3

import multiprocessing
import threading
import time
import gi
gi.require_version('Gtk', '3.0')
from gi.repository import Gtk
from gi.repository import GLib


class MyThread (threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback

    def run(self):
        print('Running MyThread...')
        self.result = []

        queue = multiprocessing.Queue()
        process = MyProcess(queue)
        process.start()
        process.join()

        while not queue.empty():
            process_result = queue.get()
            self.result.append(process_result)
        print('MyThread stoppd.')
        GLib.idle_add(self._callback)


class MyProcess (multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running MyProcess...')
        for i in range(3):
            self.queue.put((i, 'x'*102048))
        print('MyProcess stoppd.')

class MyWindow (Gtk.Window):
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('MyWindow::do_start()')
        # The process need to be started from a separate thread
        # to prevent the main thread (which is the gui main loop)
        # from freezing while waiting for the process result.
        self.thread = MyThread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for r in result:
            print('{} {}...'.format(r[0], r[1][:10]))

if __name__ == '__main__':
    win = MyWindow()
    win.show_all()
    Gtk.main()
Und das ist der vollständige Output / Error:

Code: Alles auswählen

MyWindow::do_start()
Running MyThread...
Running MyProcess...
MyProcess stoppd.
^CProcess MyProcess-1:
Exception ignored in: <module 'threading' from '/usr/lib/python3.5/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.5/threading.py", line 1288, in _shutdown
    t.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/process.py", line 252, in _bootstrap
    util._exit_function()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 314, in _exit_function
    _run_finalizers()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 254, in _run_finalizers
    finalizer()
  File "/usr/lib/python3.5/multiprocessing/util.py", line 186, in __call__
    res = self._callback(*self._args, **self._kwargs)
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 198, in _finalize_join
    thread.join()
  File "/usr/lib/python3.5/threading.py", line 1054, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.5/threading.py", line 1070, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
Benutzeravatar
__blackjack__
User
Beiträge: 14044
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@MoonKid: Schaust Du hier in der Dokumentation: https://docs.python.org/3.5/library/mul ... and-queues

In der zweiten Warnung steht das es zu einem Deadlock kommen kann wenn man ein `join()` auf einem Prozess macht *bevor* alle Daten aus der Queue verarbeitet sind. Und genau das machst Du. Du startest den `MyProcess` und gleich in der nächsten Zeile wartest Du darauf das der zuende ist – aber erst *danach* verarbeitest Du die Daten aus der Queue. Das ist einfach die falsche Reihenfolge.

Lösung: Einen Wert bestimmen den der Prozess in die Queue packt um das Ende der Daten zu signalisieren und beim Thread nach dem Start und vor dem `join()` so lange Daten aus der Queue lesen bis dieses Signal kommt:

Code: Alles auswählen

#!/usr/bin/env python3
import multiprocessing
import threading

import gi
gi.require_version('Gtk', '3.0')
from gi.repository import GLib
from gi.repository import Gtk


class Thread(threading.Thread):
    """This thread just starts the process."""
    def __init__(self, callback):
        threading.Thread.__init__(self)
        self._callback = callback
        self.result = []

    def run(self):
        print('Running Thread...')
        queue = multiprocessing.Queue()
        process = Process(queue)
        process.start()
        while True:
            process_result = queue.get()
            if process_result is None:
                break
            self.result.append(process_result)
        process.join()
        print('Thread stoppd.')
        GLib.idle_add(self._callback)


class Process(multiprocessing.Process):
    
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def run(self):
        print('Running Process...')
        for i in range(3):
            self.queue.put((i, 'x' * 102048))
        self.queue.put(None)
        print('Process stoppd.')


class Window(Gtk.Window):
    
    def __init__(self):
        Gtk.Window.__init__(self)
        self.connect('destroy', Gtk.main_quit)
        self.thread = None
        GLib.timeout_add(2000, self.do_start)

    def do_start(self):
        print('Window::do_start()')
        # 
        # The process need to be started from a separate thread to prevent the
        # main thread (which is the gui main loop) from freezing while waiting
        # for the process result.
        # 
        self.thread = Thread(self.callback_thread_finished)
        self.thread.start()

    def callback_thread_finished(self):
        result = self.thread.result
        for i, data in result:
            print('{} {}...'.format(i, data[:10]))


def main():
    win = Window()
    win.show_all()
    Gtk.main()
    

if __name__ == '__main__':
    main()
„A life is like a garden. Perfect moments can be had, but not preserved, except in memory. LLAP” — Leonard Nimoy's last tweet.
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Noch eine Anmerkung: so vorzugehen ist unüblich. Statt immer einen Prozess zu spawnen und dann zu beenden solltest du zu beginn deines Programmes einen oder mehrere Worker erzeugen, und die dann arbeiten lassen bei bedarf. Wenn du zusätzlich noch die in der Dokumentation immer wieder betonten Manager benutzt, dann ist das alles einfacher und es kommt nicht zu solchen Problemen.

Mir ist auch so, als ob unter bestimmten OS das multiprocessing so früh wie möglich aufgesetzt werden muss, und du darum Probleme bekommst wenn du das später on demand machst. Das ist aber jetzt nicht verifiziert.
Benutzeravatar
__blackjack__
User
Beiträge: 14044
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Wenn man das mit einem Pool von Workern machen will, sollte man auch mal in `concurrent.futures` schauen, da gibt es schon Unterstützung für Thread- und Prozess-Pools.
„A life is like a garden. Perfect moments can be had, but not preserved, except in memory. LLAP” — Leonard Nimoy's last tweet.
MoonKid
User
Beiträge: 106
Registriert: Mittwoch 10. Dezember 2014, 16:24

__deets__ hat geschrieben: Montag 27. Mai 2019, 13:02 Noch eine Anmerkung: so vorzugehen ist unüblich. Statt immer einen Prozess zu spawnen und dann zu beenden solltest du zu beginn deines Programmes einen oder mehrere Worker erzeugen...
Danke für den Hinweis. Das kann ich gut nachvollziehen.

Im konkreten Fall reicht IMO aber ein Prozess. Die Anwendung lädt an dem Punkt Favicons für Newsfeeds nach, die in einem TreeCtrl dargestellt werden. Das ist nicht wichtig (weil als erstes default icons angezeigt werden) und muss auch nicht schnell sein - es darf nur die Anwendung selbst nicht belasten. Die Favicons werden (wen vorhanden) lokal von der Festplatte geladen oder remote gesucht und lokal gespeichert. Dafür reicht ein IMO separater Prozess, der die IO-Operationen (Datei öffnen, downloaden, lokal speichern) auch noch asynchron durchführt.

Mit Pools bin ich einigermaßen vertraut. Die nutze ich wiederum, um neue Einträge für die Fees herunterzuladen. Konkret nutze ich eine Kombination aus asyncio, einem Thread und einem Process-Pool. Klappt erstaunlich gut. Was prozuderal mehrere Minuten dauert, erledigt er so in wenigen Sekunden - hauptsächlich a.G. von asyncio.
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Bei solchen IO-lastigen Aufgaben würde ich nen Thread nehmen & gut ist.
MoonKid
User
Beiträge: 106
Registriert: Mittwoch 10. Dezember 2014, 16:24

__deets__ hat geschrieben: Dienstag 28. Mai 2019, 08:12 Bei solchen IO-lastigen Aufgaben würde ich nen Thread nehmen & gut ist.
Möglich, aber ineffizient.

Heute haben doch selbst billige Maschienen mehrere Cores. Der Prozess macht ja auch nicht nur IO, sondern parsed teilweise auch XML/HTML - ist also auch CPU relevant.
Und bei der vielen Datei-Fummelei sehe ich die Gefahr, das ein Thread den Haupt-GUI-Thread negativ beeinflusst. Den Threads laufen ja nie paralell, sondern wechseln sich nur in ihrer Ausführung ab.
Es ist als würde man einen Prosche nur im 2. Gang fahren. Die Cores sind da, also bitte auch bei jeder Gelegenheit nutzen. Der höhere Verwaltungsaufwand (eigene Python-Interpreter-Instanz incl. Speichermanagment, etc) durch die Nutzung eines Prozesses ist in meinem Fall IMO gerechtfertigt.

Der Unterschied ist sicher marginal und nicht überlebenswichtig. In meinem Fall fand ich es einfach eine gute Gelgenheit mich etwas intensiver mit Prozessen zu beschäftigen, etwas zu lernen und später auf meine Software (multiprozessor-tauglich) draufschreiben zu können. ;)
Zuletzt geändert von MoonKid am Dienstag 28. Mai 2019, 11:37, insgesamt 1-mal geändert.
Antworten