@Nras: Asynchron heisst die Funktion wird gleichzeitig zur aktuellen ausgeführt. Das heisst der `apply_async()`-Aufruf wartet nicht bis `process_file_id()` ausgeführt wurde, sondern kehrt *sofort* zurück. Das bedeutet auch das die Schleife schon längst zuende sein kann/wird, bevor alle Aufrufe von `process_file_id()` durchgelaufen sind oder überhaupt gestartet wurden. Wir reden ja von diesem Quelltext, und warum da nie ein `ValueError` kommt, auch wenn der von `process_file_id()` ausgelöst wird:
Code: Alles auswählen
def mainfun():
file_ids = ['blabla1', 'blablub19',' blablala22']
a, b, c, d = 1, 'blub', 3, 4
pool = Pool()
for file_id in file_ids:
try:
pool.apply_async(process_file_id, (file_id, a, b, c, d))
print file_id + ' done'
except ValueError:
print file_id + ' failed'
Es wird `pool.apply_async()` aufgerufen und dann sofort ohne zu warten das ``print`` in der nächsten Zeile aufgerufen. Die `process_file_id()` ist zu dem Zeitpunkt entweder noch gar nicht aufgerufen worden, sie fangt gerade an ausgeführt zu werden. Die Schleife ``for``-Schleife wird ziemlich schnell durchlaufen und zwar unabhängig von der Zeit die ein `process_file_id()`-Aufruf insgesamt benötigt, denn entweder wird einem freien Worker-Prozess gesagt „führ die Funktion mal aus”, oder der Aufruf wird in eine Warteschlange gesteckt falls als Worker-Prozesse gerade schon beschäftigt sind. Das ``except`` bezieht sich nur auf den ``try``-Block, also nur für den Zeitraum wo auch wirklich diese beiden Zeilen ausgeführt werden, in *diesem* Prozess.
Die ``for``-Schleife wird auch beendet sein bevor alle `process_file_id()`-Aufrufe abgearbeitet worden sind, sofern die nicht wirklich sehr kurz sind. Deshalb würde ich ja bei dieser Variante den `pool.join()` machen nach der Schleife um sicherzustellen, dass die `mainfun()` erst zum Aufrufer zurückkehrt wenn alle IDs abgearbeitet wurden.
Beim `join()` muss man vorher `close()` oder `terminate()` aufgerufen haben, das habe ich übersehen. Bei meiner letzten Variante wo ich die Ergebnisse vom `apply_async()` aufgehoben habe, braucht man das `join()` auch nicht weil ich ja auf jedem Ergebnis einmal `wait()` aufgerufen habe, also sicher sein kann, dass die am Ende auch alle zuende abgearbeitet wurden.
Wenn man die Berechnungen abbrechen möchte, kann man `imap_unordered()` verwenden, was automatisch abbricht wenn man versucht ein Ergebnis abzufragen bei dem eine Ausnahme aufgetreten ist. Ein Problem könnte sein, dass man im Hauptprozess an der Stelle nicht mehr herausfinden kann welche `file_id` von dem Fehler betroffen war. Ich habe das mal in der `process_file_id()` ausgeben lassen:
Code: Alles auswählen
#!/usr/bin/env python
import random
import time
from multiprocessing import Pool
class NoDataError(ValueError):
pass
def get_data(file_id, a, b):
if file_id not in ['a', 'c', 'd']:
raise NoDataError
return 1
def store_result(result, file_id):
if file_id == 'c': # <--- some unforseen Error
raise NameError
print 'file_id: ' + file_id + ' -- stored'
def do_something(data, c, d):
# while True: # <--- testing number of active cpus
# pass
return data
def process_file_id((file_id, a, b, c, d)):
try:
try:
data = get_data(file_id, a, b)
except NoDataError:
print 'file_id: ' + file_id + ' -- raised NoDataError' # log
else:
result = do_something(data, c, d)
time.sleep(random.random() * 5)
store_result(result, file_id)
except:
print 'file_id:', file_id, 'failed'
raise
def main():
file_ids = ['a', 'b', 'c', 'd']
a, b, c, d = 1, 'blub', 3, 4
pool = Pool(3)
for _ in pool.imap_unordered(
process_file_id, ((file_id, a, b, c, d) for file_id in file_ids)
):
pass
if __name__ == '__main__':
main()