In einem eigentlich gut laufendem threading-Beispiel wird eine callback Funktion nicht aufgerufen. Weiß jemand Rat?

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

Hallo in die Runde,

ich habe ein Programm, das fünf Threads startet. Jeder einzelne Thread gibt „Hello World!“ auf der Konsole aus. Jeder Thread wartet via
time.sleep()
eine zufällige Zeit. Am Ende eines Threads sollte eine Callback-Funktion aufgerufen werden. Diese gibt „Goodby folks“ auf der Konsole aus.

Um zu zeigen, dass das Threading funktioniert, werden die Laufzeiten jedes einzelnen Threads gemessen. Zum Vergleich wird auch die Gesamtlaufzeit gemessen des Programms.

Die Gesamtlaufzeit entspricht der Laufzeit des am längsten laufenden Threads. Aber die Ausgabe „Auf Wiedersehen Leute“ fehlt alle fünf Male. Das heißt, aus einem mir unbekanntem Grund, wird die Callback-Funktion nicht aufgerufen bzw. erreicht.

Hat einer von Euch einen Tip für mich, wo hier mein Denkfehler liegt?

Die Ausgabe sieht so aus, leider eben ohne 'Goodby folks':

Code: Alles auswählen

saying Hello World! took 0.126 s
saying Hello World! took 0.439 s
saying Hello World! took 0.645 s
saying Hello World! took 0.994 s
saying Hello World! took 1.469 s

total runtime is 1.479 s
Das dazugehörige Programm ist dieses hier:

Code: Alles auswählen

import random
import sys
import time
from typing import Callable

from PyQt5.QtCore import QCoreApplication, QThread, QObject, pyqtSignal


class MeasureRuntime:
    def __enter__(self):
        self.__start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__stop = time.perf_counter()
        self.elapsed = self.__stop - self.__start
        return True


class MyQtThreads(QObject):

    def __enter__(self):
        self.__running_threads: list = []
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__waitForThreadsListEmpty()
        return True

    def __waitForThreadsListEmpty(self):
        while self.__running_threads:
            for thread in self.__running_threads:
                if thread.isFinished():
                    self.__running_threads.pop(self.__running_threads.index(thread))

    def doInThread(self, fn, *args, callback: Callable = None, **kwargs) -> int:
        thread = MyQtThread(fn, *args, **kwargs)
        if callback:
            thread.SIG_FINISHED.connect(callback)
        self.__running_threads.append(thread)
        thread.start()
        return id(thread)


class MyQtThread(QThread):
    SIG_FINISHED = pyqtSignal(object)

    def __init__(self, fn, *args, **kwargs):
        super().__init__()

        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    def run(self):
        ret = self.fn(*self.args, **self.kwargs)
        result_info = {"thread_id": id(self), "args": ret}
        self.SIG_FINISHED.emit(result_info)


def say_hello():
    with MeasureRuntime() as rt:
        random.seed()
        time.sleep(random.random()*3)
        print(f'saying Hello World! took ', end='')
    print(f'{rt.elapsed:.3f} s')


def say_adieu(args):
    print('Goodby folks')


class Main(QThread):

    def __init__(self):
        super().__init__()

    def run(self):

        print()
        with MeasureRuntime() as rt:

            with MyQtThreads() as threads:
                for _ in range(5):
                    threads.doInThread(say_hello, callback=say_adieu)

        print(f'\ntotal runtime is {rt.elapsed:.3f} s')


if __name__ == '__main__':
    qcoreapp = QCoreApplication(sys.argv)
    thread = Main()
    thread.finished.connect(qcoreapp.exit)
    thread.start()
    qcoreapp.exec()
Benutzeravatar
__blackjack__
User
Beiträge: 13116
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Die doppelten führenden Unterstriche bedeuten nicht „private“, das sollte nur *ein* führender Unterstrich sein.

Alle Attribute sollten nach Ablauf der `__init__()`-Methode existieren und nicht in anderen Methoden zur Laufzeit hinzu kommen.

`MeasureRuntime._stop` muss kein Attribut sein.

`My` ist in 99,999% der Fälle ein völlig unnötiger und nichtssagender Namenspräfix. So auch hier.

`random.seed()` ohne Argument macht keinen Sinn. Unter bestimmten Umständen kann das sogar für weniger zufällige Werte sorgen.

Eine Methode die nur die Methode auf der Basisklasse aufruft, mit genau den gleichen Argumenten, kann man sich sparen, denn das passiert ja auch so schon.

In `say_hello()` gibt es ein f-Zeichenkettenliteral wo kein Wert hinein formatiert wird.

``self._running_threads.pop(self._running_threads.index(thread))`` ist umständlich für ``self._running_threads.remove(thread)``. Wobei die Schleife mit dem Busy-Waiting sinnlos Rechenzeit verbrät. Das würde man mit der `wait()`-Methode lösen.

"args" in `result_info` sollte wohl eher "result" heissen.

Damit Signale verarbeitet werden muss die EventLoop für den empfangenden Thread laufen oder zumindest mal alle anstehenden Signale abgearbeitet werden.

Code: Alles auswählen

import random
import sys
import time

from PyQt5.QtCore import QCoreApplication, QObject, QThread, pyqtSignal


class MeasureRuntime:
    def __init__(self):
        self._start = None
        self.elapsed = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        assert self._start is not None
        self.elapsed = time.perf_counter() - self._start
        return True


class QtThread(QThread):
    onResult = pyqtSignal(object)

    def __init__(self, function, *args, **kwargs):
        super().__init__()

        self.function = function
        self.args = args
        self.kwargs = kwargs

    def run(self):
        self.onResult.emit(
            {
                "thread_id": id(self),
                "result": self.function(*self.args, **self.kwargs),
            }
        )


class QtThreads(QObject):
    def __init__(self):
        super().__init__()
        self._threads = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        for thread in self._threads:
            thread.quit()
            thread.wait()
        return True

    def doInThread(self, fn, *args, callback, **kwargs):
        thread = QtThread(fn, *args, **kwargs)
        if callback:
            thread.onResult.connect(callback)
        self._threads.append(thread)
        thread.start()


def say_hello():
    with MeasureRuntime() as rt:
        time.sleep(random.random() * 3)
        print("saying Hello World! took ", end="")
    print(f"{rt.elapsed:.3f} s")


def say_adieu(result):
    print(f"Goodby folks ({result})")


class Main(QThread):
    def run(self):
        print()
        with MeasureRuntime() as rt:
            with QtThreads() as threads:
                for _ in range(5):
                    threads.doInThread(say_hello, callback=say_adieu)

        print(f"\ntotal runtime is {rt.elapsed:.3f} s")
        QCoreApplication.instance().processEvents()


def main():
    app = QCoreApplication(sys.argv)
    thread = Main()
    thread.finished.connect(app.exit)
    thread.start()
    app.exec()


if __name__ == "__main__":
    main()
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

Vielen Dank für Deine Mühe! Aus Deinen Anmerkungen kann ich viel mitnehmen, Danke!

Der Schlüssel zum Erfolg in Deinem Code ist die Zeile
QCoreApplication.instance().processEvents()
Warum ist das notwendig? Und, ist das nicht eine schlechte Praxis?
Ich frage, weil die einzelnen Threads könnten ja auch mit Signalen um sich hauen. Ist es dann auch nötig, processEvent() geschickt einzusetzen, damit die Signalverarbeitung am Laufen bleibt oder überhaupt erst in Laufen kommt?

Ich freue mich auf Deine Anmerkungen. Vielen Dank auf alle Fälle!!!!

VG Holger
__deets__
User
Beiträge: 14544
Registriert: Mittwoch 14. Oktober 2015, 14:29

Die schlechte Praxis faengt viel frueher an. Man ueberlaedt in QThreads nicht die run-Methode. Stattdessen baut man ein Worker-Objekt, dessen Thread-Ownership bei dem neuen Thread liegt. Dann haengt man zB das started-Signal oder ein beliebiges Anwendungssignal an einen slot des Workers, und startet so die eigentliche Aufgabe. Vorteile

- der Thread behaelt seinen eigenen Event-Loop.
- Qt fummelt durch die ownership selbststaendig raus, dass Signal/Slot-Verbindungen queued sein muessen, und macht die dadurch Thread-Safe.

Siehe https://doc.qt.io/qt-6/qthread.html

Als ein Seiteneffekt kann man sich auch solche Tricksereien wie processEvents sparen.
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

Das werde ich mal lernen und probieren. Vielen Dank!!!!
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

Dein Vorschlag bezieht sich auf die Konstruktion Worker(QObject) mit moveToThread() usw.?
__deets__
User
Beiträge: 14544
Registriert: Mittwoch 14. Oktober 2015, 14:29

Jupp. Das ist das kanonische Vorgehen, wie es heute angewandt werden sollte.
Benutzeravatar
__blackjack__
User
Beiträge: 13116
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@holger47110815: Was ist denn das eigentliche Ziel hier? Einfach ein bisschen mit Threads spielen um zu lernen? Denn für produktiven Einsatz würde ich nicht so etwas wie `concurrent.futures.ThreadPoolExecutor` selbst nachbauen wollen. Das gibt's ja schon fertig.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Benutzeravatar
__blackjack__
User
Beiträge: 13116
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@holger47110815: Den `Main`-Thread könnte man sich mit einem `QTimer` sparen, dann wird die Ereignisverarbeitung durch das `app.exec()` schon erledigt. Das Ende der ganzen Threads könnte man sich dann mit einem Signal melden lassen um die Anwendung zu beenden.

Code: Alles auswählen

import random
import sys
import time
from functools import partial
from threading import Lock

from PyQt5.QtCore import QCoreApplication, QObject, QThread, QTimer, pyqtSignal


class MeasureRuntime:
    def __init__(self):
        self._start = None
        self.elapsed = None

    def __enter__(self):
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        assert self._start is not None
        self.elapsed = time.perf_counter() - self._start
        return True


class QtThread(QThread):
    onResult = pyqtSignal(object)

    def __init__(self, function, *args, **kwargs):
        super().__init__()
        self.function = function
        self.args = args
        self.kwargs = kwargs

    def run(self):
        self.onResult.emit(
            {
                "thread_id": id(self),
                "result": self.function(*self.args, **self.kwargs),
            }
        )


class QtThreads(QObject):
    finished = pyqtSignal()
    
    def __init__(self):
        super().__init__()
        self._lock = Lock()
        self._exited = False
        self._threads = set()

    def __enter__(self):
        self._exited = False
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        with self._lock:
            self._exited = True
            if not self._threads:
                self.finished.emit()

        return True

    def _on_finished(self, thread):
        with self._lock:
            assert self._threads
            self._threads.remove(thread)
            if self._exited and not self._threads:
                self.finished.emit()

    def doInThread(self, fn, *args, callback, **kwargs):
        if self._exited:
            raise ValueError("already exited")
        
        thread = QtThread(fn, *args, **kwargs)
        if callback:
            thread.onResult.connect(callback)

        with self._lock:
            self._threads.add(thread)
            thread.finished.connect(partial(self._on_finished, thread))
        
        thread.start()


def say_hello():
    with MeasureRuntime() as rt:
        time.sleep(random.random() * 3)
        print("saying Hello World! took ", end="")
    print(f"{rt.elapsed:.3f} s")


def say_adieu(result):
    print(f"Goodby folks ({result})")


def start(on_finished):
    print()
    with MeasureRuntime() as rt:
        with QtThreads() as threads:
            threads.finished.connect(on_finished)
            for _ in range(5):
                threads.doInThread(say_hello, callback=say_adieu)

    print(f"\ntotal runtime is {rt.elapsed:.3f} s")


def main():
    app = QCoreApplication(sys.argv)
    QTimer.singleShot(0, partial(start, app.exit))
    app.exec()


if __name__ == "__main__":
    main()
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

@__blackjack__: Da ist bestimmt auch Spielen dabei. Aber drauf gekommen bin ich tatsächlich durch folgenden Wunsch.

Da gibt's eine Funktion mit einem Suchparameter. Die Funktion öffnet eine bestimmte website und führt dort eine Suche nach diesem Parameter aus und gibt ein dict mit den gefundenen Einträgen zurück. Nachdem ich die Vokale a, e, i, o und u suchen möchte, habe ich die Funktion eben fünf Mal hintereinander aufgerufen.
Nachdem ich kurz zuvor gelernt hatte, wie man with-Blöcke selbst programmiert, wollte ich diese schöne Syntax für meine Suche hernehmen und das Ganze parallelisieren. Das hat auch alles gut funktioniert, war nur nicht schön geschrieben. Beim schöner machen stieß ich dann aber auf das hier beschriebene Problem.
__blackjack__ hat geschrieben: Montag 18. März 2024, 11:42 @holger47110815: Was ist denn das eigentliche Ziel hier? Einfach ein bisschen mit Threads spielen um zu lernen? Denn für produktiven Einsatz würde ich nicht so etwas wie `concurrent.futures.ThreadPoolExecutor` selbst nachbauen wollen. Das gibt's ja schon fertig.
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

Das mit dem QTimer ist eine schöne Idee!!!!! :-) Danke!!!!!
__blackjack__ hat geschrieben: Montag 18. März 2024, 12:19 @holger47110815: Den `Main`-Thread könnte man sich mit einem `QTimer` sparen, dann wird die Ereignisverarbeitung durch das `app.exec()` schon erledigt. Das Ende der ganzen Threads könnte man sich dann mit einem Signal melden lassen um die Anwendung zu beenden.
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

__deets__ hat geschrieben: Montag 18. März 2024, 10:42 Jupp. Das ist das kanonische Vorgehen, wie es heute angewandt werden sollte.
Hi __deets__,

jetzt hab ich eine Version, die versucht, Deinen Vorschlag zu erfüllen. Aber meine Version kommt nicht ohne QCoreApplication.processEvents() an einer Stelle aus. Wahrscheinlich stelle ich mich einfach etwas an. Ehrlich gesagt, raucht mir grad der Kopf und ich schwimme etwas auf. Vielleicht hast Du die Muße, Dir meine Zeilen anzuschauen. Über Deine Antwort dazu würde ich mich sehr freuen.

Danke schonmal ;-)

Code: Alles auswählen

import random
import sys
import time
from functools import partial
from typing import Callable

from PyQt5.QtCore import QCoreApplication, QThread, QObject, pyqtSignal, QTimer


class MeasureRuntime:
    def __enter__(self):
        self.__start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.elapsed = time.perf_counter() - self.__start
        return True


class QtThreads(QObject):

    def __enter__(self):
        print('start collecting tasks...')
        self.__running_threads: dict[QThread, Worker] = {}
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.__start_threads()
        self.__waitForThreadsDictEmpty()
        return True

    def __start_threads(self):
        print(f'start threads')
        for thread in self.__running_threads.keys():
            thread.start()

    def __waitForThreadsDictEmpty(self):
        while self.__running_threads:
            QCoreApplication.processEvents()
        print('all threads done')

    def __thread_finished(self, thread):
        self.__running_threads.pop(thread)
        thread.deleteLater()

    def doInThread(self, fn, *args, callback: Callable = None, **kwargs) -> int:
        print(f'add task with {args=} ', end='')
        thread = QThread()
        worker = Worker(fn, *args, **kwargs)
        if callback:
            worker.onResult.connect(callback)
        worker.moveToThread(thread)
        thread.started.connect(worker.run)
        thread.finished.connect(partial(self.__thread_finished, thread))
        self.__running_threads[thread] = worker
        print(f' {id(thread)=}')
        return id(thread)


class Worker(QObject):
    onResult = pyqtSignal(object)

    def __init__(self, function, *args, **kwargs):
        super().__init__()

        self.function = function
        self.args = args
        self.kwargs = kwargs

    def run(self):
        result = self.function(*self.args, **self.kwargs)
        self.onResult.emit({'thread_id': id(QThread.currentThread()), 'args': result})
        QThread.currentThread().quit()


def search(vocal: str) -> dict[str, list]:
    with MeasureRuntime() as rt:
        result = [vocal for x in range(3) if True]
        random.seed()
        time.sleep(random.random() * 3)
    print(f'searching for vocal {vocal} took {rt.elapsed:.3f} s {result=}', end='')
    return {vocal: result}


def main(exit):
    result = {}

    def concat(args):
        nonlocal result
        result.update(args['args'])
        print(f' {args["thread_id"]=}')

    print()
    with MeasureRuntime() as rt:
        print('start search for vocals')
        with QtThreads() as threads:
            for vocal in ('a', 'e', 'i', 'o', 'u'):
                threads.doInThread(search, vocal, callback=concat)
        print('search for vocals done!')
    print(f'\ntotal runtime is {rt.elapsed:.3f} s for {result=}')
    exit()


if __name__ == '__main__':
    qcoreapp = QCoreApplication(sys.argv)
    QTimer.singleShot(0, partial(main, qcoreapp.exit))
    qcoreapp.exec()

Die Ausgabe sieht in etwa so aus:

Code: Alles auswählen

start search for vocals
start collecting tasks...
add task with args=('a',)  id(thread)=2159449500416
add task with args=('e',)  id(thread)=2159449500704
add task with args=('i',)  id(thread)=2159449500992
add task with args=('o',)  id(thread)=2159449501280
add task with args=('u',)  id(thread)=2159449501568
start threads
searching for vocal i took 0.462 s result=['i', 'i', 'i'] args["thread_id"]=2159449500992
searching for vocal a took 1.008 s result=['a', 'a', 'a'] args["thread_id"]=2159449500416
searching for vocal o took 1.251 s result=['o', 'o', 'o'] args["thread_id"]=2159449501280
searching for vocal u took 1.500 s result=['u', 'u', 'u'] args["thread_id"]=2159449501568
searching for vocal e took 2.517 s result=['e', 'e', 'e'] args["thread_id"]=2159449500704
all threads done
search for vocals done!

total runtime is 2.517 s for result={'i': ['i', 'i', 'i'], 'a': ['a', 'a', 'a'], 'o': ['o', 'o', 'o'], 'u': ['u', 'u', 'u'], 'e': ['e', 'e', 'e']}

Process finished with exit code 0
Benutzeravatar
__blackjack__
User
Beiträge: 13116
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@holger47110815: Schau noch mal in meinen Code, ich habe da ja nicht nur QTimer benutzt, sondern auch QtThreads so umgebaut, dass das ein Signal sendet wenn der Kontext verlassen wurde und alle Threads abgearbeitet wurden. Ausgabe davon ist:

Code: Alles auswählen

total runtime is 0.001 s
saying Hello World! took 0.324 s
Goodby folks ({'thread_id': 140093534267296, 'result': None})
saying Hello World! took 0.422 s
Goodby folks ({'thread_id': 140093534267728, 'result': None})
saying Hello World! took 1.482 s
Goodby folks ({'thread_id': 140093534267152, 'result': None})
saying Hello World! took 1.594 s
Goodby folks ({'thread_id': 140093534267584, 'result': None})
saying Hello World! took 2.060 s
Goodby folks ({'thread_id': 140093534267440, 'result': None})
Ja die „total runtime“ ist jetzt natürlich unsinnig, aber das kann man dann nicht mehr mit dem Kontextmanager messen. Die Stop-Zeit muss man ermitteln wenn das Signal empfangen wurde das alle Threads abgearbeitet sind. Man könnte natürlich auch in `QtThreads.__exit__()` auf eine `QWaitCondition` warten beziehungsweise etwas entsprechendes aus dem `threading`-Modul verwenden damit `__exit__()` blockiert bis alle Threads abgearbeitet sind, aber das geht IMHO so ein bisschen gegen Qt, das gerne asynchron mit Signalen arbeitet. Siehe auch den entsprechenden Dokumentationstext zu `QThread.wait()`.

Bei Dir ist jetzt auch komisch das die Threads erst gestartet werden wenn der Kontext verlassen wird, statt sofort. Das wäre mir zu magisch und überraschend.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

@__blackjack__:
__blackjack__ hat geschrieben: Montag 18. März 2024, 22:23
Bei Dir ist jetzt auch komisch das die Threads erst gestartet werden wenn der Kontext verlassen wird, statt sofort. Das wäre mir zu magisch und überraschend.
Du hast recht, das unterscheidet sich von meinem ersten Beitrag. Mir war es so sympatischer; ist aber eigentlich egal. Ziel der Übung ist folgendes. Es gibt eine Aufgabe die es erfordert, dieselbe Funktion mehrfach hintereinander mit unterschiedlichen Parametern aufzurufen. Die Ergebnisse der einzelnen Aufrufe werden zu einem Gesamtergebnis gesammelt. So kam ich auf die Idee, diese wiederholten Aufrufe zu parallelisieren. Der with-Block erstellt über eine Schleife für die unterschiedlichen Parameter entsprechend viele Threads mit jeweils einem Aufruf der Funktion mit einem Parameter. Der with-Block wird erst dann verlassen, wenn alle diese Threads abgearbeitet sind. Die Callback-Funktion sorgt für das Zusammenführen der Einzelergebnisse zum Gesamtergebnis.

Alles in Allem funktioniert das jetzt; auch auf verschiedenen Wegen. Durch die Diskussion mit Euch beiden habe ich wieder viel gelernt. Vielen Dank dafür!!

Gruß
Holger
Benutzeravatar
__blackjack__
User
Beiträge: 13116
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@holger47110815: Kontextmanager erwecken beim Leser die Erwartung das in `__exit__()` *Aufräumarbeiten* nach getaner Arbeit erledigt werden. Das dort die Arbeit erst gestartet/getan wird ist mir zu überraschend.

Ich hätte da auch eher `concurrent.futures` verwendet.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
holger47110815
User
Beiträge: 12
Registriert: Sonntag 17. März 2024, 19:24

__blackjack__ hat geschrieben: Dienstag 19. März 2024, 14:13 Ich hätte da auch eher `concurrent.futures` verwendet.
@__blackjack__: Das kannte ich bis dahin nicht. Werde ich mir mal ansehen.

Es gibt bestimmt viele, viele fertige Lösungen, um seine Aufgaben zu lösen. Man muß sie halt kennen. Aber so lernt man eine Menge dazu. Eben auch, daß es concurrent.futures gibt ;-)
Antworten