Vielleicht ist meine Antwort untergegangen. Bei Dir wird die komplette Instanz, die an `self` gebunden ist, in den Subprocess übertragen. Da ist alles mögliche dabei, wohl auch ein _queue.SimpleQueue-Objekt.
Deshalb habe ich die Funktion so umgeschrieben, dass sie kein `self` mehr braucht, und damit hast Du hoffentlich kein Problem mehr (es sei denn der Source-provider hat auch irgendwelche seltsame Objekte).
Suche Alternative zu ThreadPoolExecutor um Threads bzw. Prozesse zu beenden
@Sirius3
ohne 'self' - das hab ich auch getestet. einmal nur die Funktion _load_movie_data() außerhalb der klasse und auch beide Funktionen in einen eigenen file gepackt.
in keinem Fall kam es dazu das pydevd reagiert hat
ohne 'self' - das hab ich auch getestet. einmal nur die Funktion _load_movie_data() außerhalb der klasse und auch beide Funktionen in einen eigenen file gepackt.
in keinem Fall kam es dazu das pydevd reagiert hat
def _load_movie_data(queue):
import pydevd
pydevd.settrace('localhost', port=12345, stdoutToServer=True, stderrToServer=True)
print(queue)
Hallo,
Ich habe erst mal versucht mich mit ein paar Grundlagen zu beschäftigen und der folgende Test Code funzt auch erst mal.
Ich habe mir drei Prozesse erstellt mit unterschiedlichen Laufzeiten (sleep)
Zum Test werden zwei Prozesse in einer vorbestimmten Zeit beendet und der dritte "moviestar" gewollt nicht.
Dank beep() konnte ich nachverfolgen wie die Prozesse laufen.
TODO:
- Innerhalb der Schleife benötige ich wiederholt eine Liste aller aktiven Prozesse
- nach dem Ende der Schleife sollen die noch aktiven Prozesse beendet werden. (im Beispiel 'moviestar')
- Ein return an die aufrufende Funktion run() mit allen Ergebnissen aus den Prozessen die regulär in der Zeit (schleife) beendet wurden (im Beispiel 'filmpalast' und 'filmpro')
Im voraus schon mal vielen Dank für Hilfe und Anregungen zur Lösung
Gruß kasi45
Ich habe erst mal versucht mich mit ein paar Grundlagen zu beschäftigen und der folgende Test Code funzt auch erst mal.
Ich habe mir drei Prozesse erstellt mit unterschiedlichen Laufzeiten (sleep)
Zum Test werden zwei Prozesse in einer vorbestimmten Zeit beendet und der dritte "moviestar" gewollt nicht.
Dank beep() konnte ich nachverfolgen wie die Prozesse laufen.
TODO:
- Innerhalb der Schleife benötige ich wiederholt eine Liste aller aktiven Prozesse
- nach dem Ende der Schleife sollen die noch aktiven Prozesse beendet werden. (im Beispiel 'moviestar')
- Ein return an die aufrufende Funktion run() mit allen Ergebnissen aus den Prozessen die regulär in der Zeit (schleife) beendet wurden (im Beispiel 'filmpalast' und 'filmpro')
Code: Alles auswählen
import winsound
import multiprocessing
from time import sleep
def beep():
frequency = 2500 # Set Frequency To 2500 Hertz
duration = 500 # Set Duration To 1000 ms == 1 second
winsound.Beep(frequency, duration)
def get_stream_sources(params):
title = params.get('title')
originaltitle = params.get('originaltitle')
titles = {title,originaltitle}
year = int(params.get('year')) if 'year' in params else None
imdb = params.get('imdb') if 'year' in params else None
season = int(params.get('season')) if 'season' in params else None
episode = int(params.get('episode')) if 'episode' in params else None
# -- zum test --
modules_scraper = [('filmpalast',5), ('filmpro',10), ('moviestar',30)] # {name,sleep}
# ----
try:
processes = [
multiprocessing.Process(
target=load_stream_source,
name=modul_name,
# args=[titles, year, season, episode, imdb, modul_name, modul_source]
args=[modul_name, modul_source]
)
for modul_name, modul_source in modules_scraper
]
for process in processes:
process.start()
# Todo
out = 3
for i in range(0, 4 * out): # nach 12 sek wird die schleife beendet
sleep(2)
# hier benötige ich eine Liste aller noch aktiven processes
for p in multiprocessing.active_children():
print('aktiv: '+p.name )
# Todo
# hier alle noch laufende processes beenden (process moviestar ist noch nicht beendet)
for p in multiprocessing.active_children():
print('kill: ' + p.name)
# Todo
# hier eine liste der Rückgabe aller processes
print('return Liste')
except Exception as e:
print('Exception: ' + str(e))
def load_stream_source(modul_name, modul_source): # modul_source zum test wert für sleep
try:
beep()
sleep(modul_source)
beep()
return modul_name
except Exception as e:
print('Exception: ' + str(e))
return str(e)
def run(params):
items = get_stream_sources(params)
print('items: '+ str(items))
if __name__ == '__main__':
params = {
'action': 'playExtern',
'title': 'Wilhelm Tell',
'originaltitle': 'William Tell',
'year': '2025', 'imdb': 'tt22478818',
'tmdb': '1195631',
'id': '1195631',
'name': 'Wilhelm Tell (2025)',
'season': '0',
'episode': '0'
}
run(params)
Gruß kasi45
Hallo,
die aktiven Prozesse hast du doch mit `active_children` und anstatt `name` aufzurufen kannst du doch sicherlich mit `terminate` den Prozess dann auch beenden, nach deiner Schleife. Wenn du übrigens 12 Schleifendurchgänge hast und in der Schleife 2 Sekunden wartest, dann geht die Schleife so ca. 24 Sekunden und keine 12. Wenn man für eine gewisse Zeit etwas machen will bzw. die Zeit messen, dann bietet sich `time.monotonic` an.
Grüße
Dennis
die aktiven Prozesse hast du doch mit `active_children` und anstatt `name` aufzurufen kannst du doch sicherlich mit `terminate` den Prozess dann auch beenden, nach deiner Schleife. Wenn du übrigens 12 Schleifendurchgänge hast und in der Schleife 2 Sekunden wartest, dann geht die Schleife so ca. 24 Sekunden und keine 12. Wenn man für eine gewisse Zeit etwas machen will bzw. die Zeit messen, dann bietet sich `time.monotonic` an.
Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
@Dennis89
Danke!
Sooo einfach, wenn man es weiss
Jetzt bleibt für mich aber noch die große Frage wie ich an die "Ergebnisse" der Prozesse ran komme.
Danke!
Code: Alles auswählen
for p in multiprocessing.active_children():
p.terminate()
print('kill: ' + p.name)

Jetzt bleibt für mich aber noch die große Frage wie ich an die "Ergebnisse" der Prozesse ran komme.
Es hat lange gedauert - aber hier ist das funktionierende Grundgerüst vom Test Code mit allem was ich benötige
Grüße kasi45

Code: Alles auswählen
import multiprocessing
from time import sleep
is_beep = True # beep on/off
try:
import winsound
def beep():
frequency = 2500 # Set Frequency To 2500 Hertz
duration = 500 # Set Duration To 1000 ms == 1 second
winsound.Beep(frequency, duration)
except:
is_beep = False
class sources:
def __init__(self):
self.sources = []
def get_stream_sources(self, params):
title = params.get('title')
originaltitle = params.get('originaltitle')
titles = {title,originaltitle}
year = int(params.get('year')) if 'year' in params else None
imdb = params.get('imdb') if 'year' in params else None
season = int(params.get('season')) if 'season' in params else None
episode = int(params.get('episode')) if 'episode' in params else None
# -- zum test --
modules_scraper = [('filmpalast',5), ('filmpro',10), ('moviestar',30)] # {name,sleep}
# ----
manager = multiprocessing.Manager()
return_dict = manager.dict()
try:
processes = [
multiprocessing.Process(
target=self.load_stream_source,
name=modul_name,
args=[modul_name, sleep_sec, return_dict]
)
for modul_name, sleep_sec in modules_scraper
]
for process in processes:
process.start()
for i in range(0, 12): # 12*2 sleep - nach 24 sek wird die schleife beendet
sleep(2) # weniger print()
active_children = [
p.name for p in multiprocessing.active_children() if p.name != 'SyncManager-1'
]
# die Werte von 'active_children' und 'return_dict.values()' stehen zur verfügung
print('active_children: '+str(active_children))
print('return_dict_values: ' + str(return_dict.values()))
self.sources = return_dict.values()
# beendet Prozesse die länger brauchen als die Zeit Schleife (24 sek) - 'moviestar' braucht 30 sekunden
for p in multiprocessing.active_children():
p.terminate()
print('kill: ' + p.name)
return self.sources
except Exception as e:
print('Exception: ' + str(e))
def load_stream_source(self, modul_name, sleep_sec, return_dict): # modul_source zum test wert für sleep
try:
if is_beep: beep()
sleep(sleep_sec)
if is_beep: beep()
return_dict[modul_name]=modul_name
except Exception as e:
print('Exception: ' + str(e))
def run(self,params):
items = self.get_stream_sources(params)
print('items: '+ str(items))
if __name__ == '__main__':
params = {
'action': 'playExtern',
'title': 'Wilhelm Tell',
'originaltitle': 'William Tell',
'year': '2025', 'imdb': 'tt22478818',
'tmdb': '1195631',
'id': '1195631',
'name': 'Wilhelm Tell (2025)',
'season': '0',
'episode': '0'
}
sources().run(params)
- __blackjack__
- User
- Beiträge: 14076
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
@kasi45: Achtung: Das meiste in diesem Beitrag bezieht sich auf Deinen vorletzten Beitrag.
Also erst einmal sollte man es sich nicht so einfach machen und nur die Prozesse betrachten, die auch wirklich zu dieser Aufgabe gehören. Und da ist ja bereits eine Liste die `processes` heisst. Daraus die noch aktiven heraus zu filtern ist einfach, denn man kann Prozesse ja ”fragen” ob sie fertig sind. Ein Grund dafür sieht man im letzten Beitrag wo Du einen Prozess anhand des Namens herausfilterst. Der muss aber gar nicht so heissen, und es könnte auch noch andere geben. Das ist alles sehr wackelig sich da auf solche Implementierungsdetails zu verlassen.
Aber bevor man anfängt sich da was zu basteln was die Prozesse bündelt und selbst Schleifen zu schreiben um Prozesse abzufragen, zu beenden, Ergebnisse zu übermitteln und so weiter, sollte man vielleicht einfach einen `multiprocessing.Pool` nehmen, wo sehr vieles davon schon umgesetzt ist.
`get_stream_sources()` macht nichts sinnvolles mit dem Argument, also kann das komplett von `run()` an weg. An der Stelle die Anmerkung, dass man Daten validieren und normalisieren sollte sobald sie das Programm betreten. In der `get_stream_sources()`-Funktion sollten sehr wahrscheinlich schon alle Schlüssel vorhanden sein, und die Werte den richtigen Datentyp haben. Eventuell ist auch ein Wörterbuch an der Stelle nicht so das wahre, denn Wörterbücher die einen festen Satz an Zeichenketten als Schlüssel haben, sind eigentlich eher Objekte.
Zeichenketten und Werte mit `str()` und ``+`` zusammenbasteln ist eher BASIC als Python. Python hat eine `format()`-Methode auf Zeichenketten und f-Zeichenkettenliterale. Aber bei `print()` kann man auch einfach mehrere Argumente übergeben. Wenn das keine Zeichenketten sind, dann wandelt `print()` die selbst in solche um.
Mal ein minimales Beispiel wie man einen `Pool` mit einem Prozess pro Auftrag erstellt, die Aufträge absetzt, und den nach 12 Sekunden beendet. Die Ergebnisse werden per Rückruffunktion in einer Liste gesammelt:
Also erst einmal sollte man es sich nicht so einfach machen und nur die Prozesse betrachten, die auch wirklich zu dieser Aufgabe gehören. Und da ist ja bereits eine Liste die `processes` heisst. Daraus die noch aktiven heraus zu filtern ist einfach, denn man kann Prozesse ja ”fragen” ob sie fertig sind. Ein Grund dafür sieht man im letzten Beitrag wo Du einen Prozess anhand des Namens herausfilterst. Der muss aber gar nicht so heissen, und es könnte auch noch andere geben. Das ist alles sehr wackelig sich da auf solche Implementierungsdetails zu verlassen.
Aber bevor man anfängt sich da was zu basteln was die Prozesse bündelt und selbst Schleifen zu schreiben um Prozesse abzufragen, zu beenden, Ergebnisse zu übermitteln und so weiter, sollte man vielleicht einfach einen `multiprocessing.Pool` nehmen, wo sehr vieles davon schon umgesetzt ist.
`get_stream_sources()` macht nichts sinnvolles mit dem Argument, also kann das komplett von `run()` an weg. An der Stelle die Anmerkung, dass man Daten validieren und normalisieren sollte sobald sie das Programm betreten. In der `get_stream_sources()`-Funktion sollten sehr wahrscheinlich schon alle Schlüssel vorhanden sein, und die Werte den richtigen Datentyp haben. Eventuell ist auch ein Wörterbuch an der Stelle nicht so das wahre, denn Wörterbücher die einen festen Satz an Zeichenketten als Schlüssel haben, sind eigentlich eher Objekte.
Zeichenketten und Werte mit `str()` und ``+`` zusammenbasteln ist eher BASIC als Python. Python hat eine `format()`-Methode auf Zeichenketten und f-Zeichenkettenliterale. Aber bei `print()` kann man auch einfach mehrere Argumente übergeben. Wenn das keine Zeichenketten sind, dann wandelt `print()` die selbst in solche um.
Mal ein minimales Beispiel wie man einen `Pool` mit einem Prozess pro Auftrag erstellt, die Aufträge absetzt, und den nach 12 Sekunden beendet. Die Ergebnisse werden per Rückruffunktion in einer Liste gesammelt:
Code: Alles auswählen
import multiprocessing
from time import sleep
# modul_source zum test wert für sleep
def load_stream_source(modul_name, modul_source):
sleep(modul_source)
return modul_name
def get_stream_sources():
# -- zum test --
# (name, sleep)
modules_scraper = [("filmpalast", 5), ("filmpro", 10), ("moviestar", 30)]
# ----
results = []
with multiprocessing.Pool(len(modules_scraper)) as pool:
for arguments in modules_scraper:
pool.apply_async(
load_stream_source, arguments, callback=results.append
)
sleep(12)
return results
def run():
print("items:", get_stream_sources())
if __name__ == "__main__":
run()
“Vir, intelligence has nothing to do with politics!” — Londo Mollari