Schleife Parallelisieren ohne später Daten zusammenzufügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
BlackJack

Donnerstag 17. April 2014, 16:03

@Nras: Asynchron heisst die Funktion wird gleichzeitig zur aktuellen ausgeführt. Das heisst der `apply_async()`-Aufruf wartet nicht bis `process_file_id()` ausgeführt wurde, sondern kehrt *sofort* zurück. Das bedeutet auch das die Schleife schon längst zuende sein kann/wird, bevor alle Aufrufe von `process_file_id()` durchgelaufen sind oder überhaupt gestartet wurden. Wir reden ja von diesem Quelltext, und warum da nie ein `ValueError` kommt, auch wenn der von `process_file_id()` ausgelöst wird:

Code: Alles auswählen

def mainfun():
    file_ids = ['blabla1', 'blablub19',' blablala22']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool()
    for file_id in file_ids:
        try:
            pool.apply_async(process_file_id, (file_id, a, b, c, d))
            print file_id + ' done'
        except ValueError:
            print file_id + ' failed'
Es wird `pool.apply_async()` aufgerufen und dann sofort ohne zu warten das ``print`` in der nächsten Zeile aufgerufen. Die `process_file_id()` ist zu dem Zeitpunkt entweder noch gar nicht aufgerufen worden, sie fangt gerade an ausgeführt zu werden. Die Schleife ``for``-Schleife wird ziemlich schnell durchlaufen und zwar unabhängig von der Zeit die ein `process_file_id()`-Aufruf insgesamt benötigt, denn entweder wird einem freien Worker-Prozess gesagt „führ die Funktion mal aus”, oder der Aufruf wird in eine Warteschlange gesteckt falls als Worker-Prozesse gerade schon beschäftigt sind. Das ``except`` bezieht sich nur auf den ``try``-Block, also nur für den Zeitraum wo auch wirklich diese beiden Zeilen ausgeführt werden, in *diesem* Prozess.

Die ``for``-Schleife wird auch beendet sein bevor alle `process_file_id()`-Aufrufe abgearbeitet worden sind, sofern die nicht wirklich sehr kurz sind. Deshalb würde ich ja bei dieser Variante den `pool.join()` machen nach der Schleife um sicherzustellen, dass die `mainfun()` erst zum Aufrufer zurückkehrt wenn alle IDs abgearbeitet wurden.

Beim `join()` muss man vorher `close()` oder `terminate()` aufgerufen haben, das habe ich übersehen. Bei meiner letzten Variante wo ich die Ergebnisse vom `apply_async()` aufgehoben habe, braucht man das `join()` auch nicht weil ich ja auf jedem Ergebnis einmal `wait()` aufgerufen habe, also sicher sein kann, dass die am Ende auch alle zuende abgearbeitet wurden.

Wenn man die Berechnungen abbrechen möchte, kann man `imap_unordered()` verwenden, was automatisch abbricht wenn man versucht ein Ergebnis abzufragen bei dem eine Ausnahme aufgetreten ist. Ein Problem könnte sein, dass man im Hauptprozess an der Stelle nicht mehr herausfinden kann welche `file_id` von dem Fehler betroffen war. Ich habe das mal in der `process_file_id()` ausgeben lassen:

Code: Alles auswählen

#!/usr/bin/env python
import random
import time
from multiprocessing import Pool


class NoDataError(ValueError):
    pass


def get_data(file_id, a, b):
    if file_id not in ['a', 'c', 'd']:
        raise NoDataError
    return 1


def store_result(result, file_id):
    if file_id == 'c':  # <--- some unforseen Error
        raise NameError
    print 'file_id: ' + file_id + ' -- stored'


def do_something(data, c, d):
    # while True:  # <--- testing number of active cpus
        # pass
    return data


def process_file_id((file_id, a, b, c, d)):
    try:
        try:
            data = get_data(file_id, a, b)
        except NoDataError:
            print 'file_id: ' + file_id + ' -- raised NoDataError'  # log
        else:
            result = do_something(data, c, d)
            time.sleep(random.random() * 5)
            store_result(result, file_id)
    except:
        print 'file_id:', file_id, 'failed'
        raise


def main():
    file_ids = ['a', 'b', 'c', 'd']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool(3)
    for _ in pool.imap_unordered(
        process_file_id, ((file_id, a, b, c, d) for file_id in file_ids)
    ):
        pass


if __name__ == '__main__':
    main()
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Dienstag 22. April 2014, 08:29

Hallo,

vielen Dank, ich denke, ich habe das nun verstanden.

Viele Grüße,
Nras.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Freitag 25. April 2014, 11:48

Hallo,

ich will noch eine kurze Zusammenfassung als Rückmeldung geben, falls jemand mal so ein ähnliches Problem hat und diese Seite hier findet:

Es gab noch kleinere Hürden und letztenendlich habe ich mich für ``map`` und nicht ``imap_unordered`` entschieden, da mir dies simpler erscheint und mir der Unterschied zwischen den beiden nicht klar war. Zudem bricht ``map`` auch bei nicht abgefangenen Fehlern/Ausnahmen sofort ab - das ist das, was ich gerne hätte (mag sein, dass ``imap_unordered`` das auch macht).
''map'' akzeptiert nur ein Argument, (das ``imap_unordered`` wohl auch, daher noch eine Wrapper-Funktion.
Die parallelisierte Funktion muss im selben Modul liegen, in dem die Parallelisierung stattfindet.
Da die Datenbankzugriffe (v.a. die ersten n Stück bei n CPUs) nahezu gleichzeitig stattfinden, gab es beim mysql noch das Problem, dass sie versuchen, dieselbe Verbindung zu nutzen, was in mysql-Fehlern endete. Lösung: zwingen, eine neue Verbindung aufzubauen: http://stackoverflow.com/questions/8242 ... onnections Also ein connection.close() zu Anfang jeder Datenbankabfrage.

Der lauffähige Code sieht nun so aus (Kommentare Zeile 23 und 24 entfernen für NameError und dadurch Abbruch)

Code: Alles auswählen

import random
import time
from multiprocessing import Pool

from django.db import connection

class NoDataError(ValueError):
    pass


def do_work(args):
    connection.close()  # <-- makes sure that a new db connection is used
    file_id, a, b, c, d = args
    return process_file_id(file_id, a, b, c, d)


def get_data(file_id, a, b):
    if not file_id in ['a', 'c', 'd']:
        raise NoDataError
    return 1

def store_result(result, file_id):
    # if file_id == 'd':  # <--- some unforseen Error (uncomment for testing)
    #     raise NameError
    print 'file_id: ' + file_id + ' -- stored'

def do_something(data, c, d, file_id):
    print 'doing something', file_id
    time.sleep(random.random()*5)
    return data

def _process_file_id(file_id, a, b, c, d):
    try:
        data = get_data(file_id, a, b)
    except NoDataError as e:
        print 'file_id: ' + file_id + ' -- raised ' + str(type(e))  # log
        return
    if True:
        result = do_something(data, c, d, file_id)
        store_result(result, file_id)

def process_file_id(args):
    connection.close()
    file_id, a, b, c, d = args
    return _process_file_id(file_id, a, b, c, d)

def main():
    file_ids = ['a', 'b', 'c', 'd']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool(3)
    pool.map(process_file_id, [(file_id, a, b, c, d) for file_id in file_ids])
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Also alles in Allem noch mal Danke für die Hilfe.
Viele Grüße,
Nras.
Antworten