Suche Alternative zu ThreadPoolExecutor um Threads bzw. Prozesse zu beenden

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Sirius3
User
Beiträge: 18278
Registriert: Sonntag 21. Oktober 2012, 17:20

Vielleicht ist meine Antwort untergegangen. Bei Dir wird die komplette Instanz, die an `self` gebunden ist, in den Subprocess übertragen. Da ist alles mögliche dabei, wohl auch ein _queue.SimpleQueue-Objekt.

Deshalb habe ich die Funktion so umgeschrieben, dass sie kein `self` mehr braucht, und damit hast Du hoffentlich kein Problem mehr (es sei denn der Source-provider hat auch irgendwelche seltsame Objekte).
kasi45
User
Beiträge: 12
Registriert: Montag 11. August 2025, 18:44

@Sirius3

ohne 'self' - das hab ich auch getestet. einmal nur die Funktion _load_movie_data() außerhalb der klasse und auch beide Funktionen in einen eigenen file gepackt.
in keinem Fall kam es dazu das pydevd reagiert hat
def _load_movie_data(queue):
import pydevd
pydevd.settrace('localhost', port=12345, stdoutToServer=True, stderrToServer=True)
print(queue)
Sirius3
User
Beiträge: 18278
Registriert: Sonntag 21. Oktober 2012, 17:20

Woher kommt die Ausgabe, die Du zeigst?

Was hast Du ausprobiert? Welcher Code? Was passiert und wie weicht das von Deiner Erwartung ab?
kasi45
User
Beiträge: 12
Registriert: Montag 11. August 2025, 18:44

Hallo,
Ich habe erst mal versucht mich mit ein paar Grundlagen zu beschäftigen und der folgende Test Code funzt auch erst mal.
Ich habe mir drei Prozesse erstellt mit unterschiedlichen Laufzeiten (sleep)
Zum Test werden zwei Prozesse in einer vorbestimmten Zeit beendet und der dritte "moviestar" gewollt nicht.
Dank beep() konnte ich nachverfolgen wie die Prozesse laufen.
TODO:
- Innerhalb der Schleife benötige ich wiederholt eine Liste aller aktiven Prozesse
- nach dem Ende der Schleife sollen die noch aktiven Prozesse beendet werden. (im Beispiel 'moviestar')
- Ein return an die aufrufende Funktion run() mit allen Ergebnissen aus den Prozessen die regulär in der Zeit (schleife) beendet wurden (im Beispiel 'filmpalast' und 'filmpro')

Code: Alles auswählen

import winsound
import multiprocessing
from time import sleep

def beep():
    frequency = 2500  # Set Frequency To 2500 Hertz
    duration = 500  # Set Duration To 1000 ms == 1 second
    winsound.Beep(frequency, duration)


def get_stream_sources(params):
    title = params.get('title')
    originaltitle = params.get('originaltitle')
    titles = {title,originaltitle}
    year = int(params.get('year')) if 'year' in params else None
    imdb = params.get('imdb') if 'year' in params else None
    season = int(params.get('season')) if 'season' in params else None
    episode = int(params.get('episode')) if 'episode' in params else None

    # -- zum test --
    modules_scraper = [('filmpalast',5), ('filmpro',10), ('moviestar',30)]    # {name,sleep}
    # ----

    try:
        processes = [
            multiprocessing.Process(
                target=load_stream_source,
                name=modul_name,
                # args=[titles, year, season, episode, imdb, modul_name, modul_source]
                args=[modul_name, modul_source]
            )
            for modul_name, modul_source in modules_scraper
        ]
        for process in processes:
            process.start()

        # Todo
        out = 3
        for i in range(0, 4 * out): # nach 12 sek wird die schleife beendet
            sleep(2)
            # hier benötige ich eine Liste aller noch aktiven processes
            for p in multiprocessing.active_children():
                print('aktiv: '+p.name )

        # Todo
        # hier alle noch laufende processes beenden (process moviestar ist noch nicht beendet)
        for p in multiprocessing.active_children():
            print('kill: ' + p.name)

        # Todo
        # hier eine liste der Rückgabe aller processes
        print('return Liste')

    except Exception as e:
        print('Exception: ' + str(e))

def load_stream_source(modul_name, modul_source): # modul_source zum test wert für sleep
    try:
        beep()
        sleep(modul_source)
        beep()
        return modul_name
    except Exception as e:
        print('Exception: ' + str(e))
        return str(e)

def run(params):
    items = get_stream_sources(params)
    print('items: '+  str(items))


if __name__ == '__main__':
    params = {
        'action': 'playExtern',
        'title': 'Wilhelm Tell',
        'originaltitle': 'William Tell',
        'year': '2025', 'imdb': 'tt22478818',
        'tmdb': '1195631',
        'id': '1195631',
        'name': 'Wilhelm Tell (2025)',
        'season': '0',
        'episode': '0'
    }
    run(params)
Im voraus schon mal vielen Dank für Hilfe und Anregungen zur Lösung
Gruß kasi45
Benutzeravatar
Dennis89
User
Beiträge: 1562
Registriert: Freitag 11. Dezember 2020, 15:13

Hallo,

die aktiven Prozesse hast du doch mit `active_children` und anstatt `name` aufzurufen kannst du doch sicherlich mit `terminate` den Prozess dann auch beenden, nach deiner Schleife. Wenn du übrigens 12 Schleifendurchgänge hast und in der Schleife 2 Sekunden wartest, dann geht die Schleife so ca. 24 Sekunden und keine 12. Wenn man für eine gewisse Zeit etwas machen will bzw. die Zeit messen, dann bietet sich `time.monotonic` an.

Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
kasi45
User
Beiträge: 12
Registriert: Montag 11. August 2025, 18:44

@Dennis89

Danke!

Code: Alles auswählen

        for p in multiprocessing.active_children():
            p.terminate()
            print('kill: ' + p.name)
Sooo einfach, wenn man es weiss :D

Jetzt bleibt für mich aber noch die große Frage wie ich an die "Ergebnisse" der Prozesse ran komme.
kasi45
User
Beiträge: 12
Registriert: Montag 11. August 2025, 18:44

Es hat lange gedauert - aber hier ist das funktionierende Grundgerüst vom Test Code mit allem was ich benötige :D

Code: Alles auswählen

import multiprocessing
from time import sleep

is_beep = True # beep on/off

try:
    import winsound
    def beep():
        frequency = 2500  # Set Frequency To 2500 Hertz
        duration = 500  # Set Duration To 1000 ms == 1 second
        winsound.Beep(frequency, duration)
except:
    is_beep = False


class sources:
    def __init__(self):
        self.sources = []


    def get_stream_sources(self, params):
        title = params.get('title')
        originaltitle = params.get('originaltitle')
        titles = {title,originaltitle}
        year = int(params.get('year')) if 'year' in params else None
        imdb = params.get('imdb') if 'year' in params else None
        season = int(params.get('season')) if 'season' in params else None
        episode = int(params.get('episode')) if 'episode' in params else None

        # -- zum test --
        modules_scraper = [('filmpalast',5), ('filmpro',10), ('moviestar',30)]    # {name,sleep}
        # ----

        manager = multiprocessing.Manager()
        return_dict = manager.dict()
        try:
            processes = [
                multiprocessing.Process(
                    target=self.load_stream_source,
                    name=modul_name,
                    args=[modul_name, sleep_sec, return_dict]
                )
                for modul_name, sleep_sec in modules_scraper
            ]
            for process in processes:
                process.start()

            for i in range(0, 12): # 12*2 sleep - nach 24 sek wird die schleife beendet
                sleep(2)    # weniger print()

                active_children = [
                    p.name  for p in multiprocessing.active_children() if p.name != 'SyncManager-1'
                ]
                # die Werte von 'active_children' und 'return_dict.values()' stehen zur verfügung
                print('active_children: '+str(active_children))
                print('return_dict_values: ' + str(return_dict.values()))

            self.sources =  return_dict.values()

            # beendet Prozesse die länger brauchen als die Zeit Schleife (24 sek) - 'moviestar' braucht 30 sekunden
            for p in multiprocessing.active_children():
                p.terminate()
                print('kill: ' + p.name)

            return self.sources
        except Exception as e:
            print('Exception: ' + str(e))


    def load_stream_source(self, modul_name, sleep_sec, return_dict): # modul_source zum test wert für sleep
        try:
            if is_beep: beep()
            sleep(sleep_sec)
            if is_beep: beep()
            return_dict[modul_name]=modul_name
        except Exception as e:
            print('Exception: ' + str(e))


    def run(self,params):
        items = self.get_stream_sources(params)
        print('items: '+  str(items))


if __name__ == '__main__':
    params = {
        'action': 'playExtern',
        'title': 'Wilhelm Tell',
        'originaltitle': 'William Tell',
        'year': '2025', 'imdb': 'tt22478818',
        'tmdb': '1195631',
        'id': '1195631',
        'name': 'Wilhelm Tell (2025)',
        'season': '0',
        'episode': '0'
    }
    sources().run(params)
Grüße kasi45
Benutzeravatar
__blackjack__
User
Beiträge: 14077
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@kasi45: Achtung: Das meiste in diesem Beitrag bezieht sich auf Deinen vorletzten Beitrag.

Also erst einmal sollte man es sich nicht so einfach machen und nur die Prozesse betrachten, die auch wirklich zu dieser Aufgabe gehören. Und da ist ja bereits eine Liste die `processes` heisst. Daraus die noch aktiven heraus zu filtern ist einfach, denn man kann Prozesse ja ”fragen” ob sie fertig sind. Ein Grund dafür sieht man im letzten Beitrag wo Du einen Prozess anhand des Namens herausfilterst. Der muss aber gar nicht so heissen, und es könnte auch noch andere geben. Das ist alles sehr wackelig sich da auf solche Implementierungsdetails zu verlassen.

Aber bevor man anfängt sich da was zu basteln was die Prozesse bündelt und selbst Schleifen zu schreiben um Prozesse abzufragen, zu beenden, Ergebnisse zu übermitteln und so weiter, sollte man vielleicht einfach einen `multiprocessing.Pool` nehmen, wo sehr vieles davon schon umgesetzt ist.

`get_stream_sources()` macht nichts sinnvolles mit dem Argument, also kann das komplett von `run()` an weg. An der Stelle die Anmerkung, dass man Daten validieren und normalisieren sollte sobald sie das Programm betreten. In der `get_stream_sources()`-Funktion sollten sehr wahrscheinlich schon alle Schlüssel vorhanden sein, und die Werte den richtigen Datentyp haben. Eventuell ist auch ein Wörterbuch an der Stelle nicht so das wahre, denn Wörterbücher die einen festen Satz an Zeichenketten als Schlüssel haben, sind eigentlich eher Objekte.

Zeichenketten und Werte mit `str()` und ``+`` zusammenbasteln ist eher BASIC als Python. Python hat eine `format()`-Methode auf Zeichenketten und f-Zeichenkettenliterale. Aber bei `print()` kann man auch einfach mehrere Argumente übergeben. Wenn das keine Zeichenketten sind, dann wandelt `print()` die selbst in solche um.

Mal ein minimales Beispiel wie man einen `Pool` mit einem Prozess pro Auftrag erstellt, die Aufträge absetzt, und den nach 12 Sekunden beendet. Die Ergebnisse werden per Rückruffunktion in einer Liste gesammelt:

Code: Alles auswählen

import multiprocessing
from time import sleep


# modul_source zum test wert für sleep
def load_stream_source(modul_name, modul_source):
    sleep(modul_source)
    return modul_name


def get_stream_sources():
    # -- zum test --
    # (name, sleep)
    modules_scraper = [("filmpalast", 5), ("filmpro", 10), ("moviestar", 30)]
    # ----
    results = []
    with multiprocessing.Pool(len(modules_scraper)) as pool:
        for arguments in modules_scraper:
            pool.apply_async(
                load_stream_source, arguments, callback=results.append
            )
        sleep(12)

    return results


def run():
    print("items:", get_stream_sources())


if __name__ == "__main__":
    run()
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
Antworten