Code: Alles auswählen
def mainfun():
file_ids = ['blabla1', 'blablub19',' blablala22']
a, b, c, d = 1, 'blub', 3, 4
pool = Pool()
for file_id in file_ids:
try:
pool.apply_async(process_file_id, (file_id, a, b, c, d))
print file_id + ' done'
except ValueError:
print file_id + ' failed'
Die ``for``-Schleife wird auch beendet sein bevor alle `process_file_id()`-Aufrufe abgearbeitet worden sind, sofern die nicht wirklich sehr kurz sind. Deshalb würde ich ja bei dieser Variante den `pool.join()` machen nach der Schleife um sicherzustellen, dass die `mainfun()` erst zum Aufrufer zurückkehrt wenn alle IDs abgearbeitet wurden.
Beim `join()` muss man vorher `close()` oder `terminate()` aufgerufen haben, das habe ich übersehen. Bei meiner letzten Variante wo ich die Ergebnisse vom `apply_async()` aufgehoben habe, braucht man das `join()` auch nicht weil ich ja auf jedem Ergebnis einmal `wait()` aufgerufen habe, also sicher sein kann, dass die am Ende auch alle zuende abgearbeitet wurden.
Wenn man die Berechnungen abbrechen möchte, kann man `imap_unordered()` verwenden, was automatisch abbricht wenn man versucht ein Ergebnis abzufragen bei dem eine Ausnahme aufgetreten ist. Ein Problem könnte sein, dass man im Hauptprozess an der Stelle nicht mehr herausfinden kann welche `file_id` von dem Fehler betroffen war. Ich habe das mal in der `process_file_id()` ausgeben lassen:
Code: Alles auswählen
#!/usr/bin/env python
import random
import time
from multiprocessing import Pool
class NoDataError(ValueError):
pass
def get_data(file_id, a, b):
if file_id not in ['a', 'c', 'd']:
raise NoDataError
return 1
def store_result(result, file_id):
if file_id == 'c': # <--- some unforseen Error
raise NameError
print 'file_id: ' + file_id + ' -- stored'
def do_something(data, c, d):
# while True: # <--- testing number of active cpus
# pass
return data
def process_file_id((file_id, a, b, c, d)):
try:
try:
data = get_data(file_id, a, b)
except NoDataError:
print 'file_id: ' + file_id + ' -- raised NoDataError' # log
else:
result = do_something(data, c, d)
time.sleep(random.random() * 5)
store_result(result, file_id)
except:
print 'file_id:', file_id, 'failed'
raise
def main():
file_ids = ['a', 'b', 'c', 'd']
a, b, c, d = 1, 'blub', 3, 4
pool = Pool(3)
for _ in pool.imap_unordered(
process_file_id, ((file_id, a, b, c, d) for file_id in file_ids)
):
pass
if __name__ == '__main__':
main()