Schleife Parallelisieren ohne später Daten zusammenzufügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Hallo zusammen,

nachdem ich nun einige Zeit in Python investiert habe, komme ich schon gut zurecht und kriege fast alles hin, was ich möchte. Mein Programm sieht grob zusammengefasst so aus:

Code: Alles auswählen

file_ids = ['blabla1', 'blablub19',' blablala22']
a, b, c, d = 1, 'blub', 3, 4  # dummy parameter
for file_id in file_ids:
    data = get_data(file_id, a, b)   # load data from a db
    result = do_something(data, c, d)   # process data and geta result (numpy calculations)
    result.store() # store data in a db !
Die wichtigen Details sind mit drin:
Der Funktion get_data werden neben einem file identifier auch noch zwei weitereParameter (a, b) übergeben, sodass die richtigen Daten aus der Datenbank geladen werden können.
Der funktion do_something() werden neben den ausglesenen Daten noch weiterer Kram (c, d) übergeben, damit auch das richtig läuft.
Das alles funktioniert wunderbar nur gibt es noch einiges zu optimieren.

Während get_data() läuft, schlummern die CPUs gut rum, da hier "nur" aus einer Datenbank gelesen wird und somit auf die Daten gewartet wird. Während dann do_something() läuft, ist eine CPU auf 100% und die restlichen schlummern rum. in file_ids sind etwa 1000 Einträge.
Ich würde gerne diesen Prozess parallelisieren auf 3 CPUs, alle lokal auf meinem Rechner. Nun habe ich einige Sachen dazu gefunden, insbesondere das Modul "multiprocessing", davon dann "Pool". Die meisten Beispiele sind aber eher darauf ausgelegt, Sachen parallel zu berechnen und hinterher wieder zusammenzufügen. Den Part mit zusammenfügen brauche ich gar nicht, da alles direkt in eine Datenbank geschrieben wird.

Kann mir jemand dabei helfen, obigen Code zu parallelisieren, zunächst auf 3 CPUs?

Vielen Dank,
Nras,
Zuletzt geändert von Nras am Mittwoch 16. April 2014, 13:53, insgesamt 1-mal geändert.
anogayales
User
Beiträge: 456
Registriert: Mittwoch 15. April 2009, 14:11

Hallo Nras,

ich sehe da leider nicht wirklich viel Parallelisierungspotential. do_something(data, c, d) benötigt ja das Ergebnis von get_data und *muss* darauf warten. Man könnte höchste die Funktion get_data aufbohren und über das Producer & Consumer Problem das Ganze lösen. Dazu wissen wir aber zu wenig über die Funktion get_data. Ob sich der Aufwand bei 1000 Datensätzen lohnt ist auch sehr fragwürdig.

Grüße,
anogayales
Zuletzt geändert von anogayales am Mittwoch 16. April 2014, 13:58, insgesamt 2-mal geändert.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Hallo,

genau. Aber das Ergebnis von file_ids ist unabhängig vom Ergebnis von file_ids[i+1]. Das heißt die ganzen Berechnungen sollten für 3 file_ids parallel laufen können, nicht für eine einzelne file_id.
Also jede CPU soll sich immer die nächste abzuarbeitende file_id schnappen und get_data() und do_something() und .store() damit machen.

Viele Grüße,
Nras.
anogayales
User
Beiträge: 456
Registriert: Mittwoch 15. April 2009, 14:11

Okay das ist prinzipiell möglich. Weißt du ob deine DB konkurrierenden (multi threaded) Zugriff erlaubt?
BlackJack

@Nras: Macht es denn Sinn dann dreimal `getdata()` parallel laufen zu haben wenn das doch der Beschreibung nach der Flaschenhals ist? Das dauert dann im besten Fall nur dreimal so lange wie ein einzeln laufendes `getdata()` also parallel genau so lange wie sonst sequentiell, aber vielleicht auch *länger*. Man müsste also eher das holen der Datensätze sequentiell machen, und das Berechnen kann dann parallel zum holen des jeweils nächsten Datensatzes passieren. Ob man dann wirklich drei CPUs damit beschäftigen kann, hängt davon ab wie die Zeitverhältnisse zwischen Daten holen und Verarbeiten sind.

Man könnte also das holen der Daten parallel laufen lassen, und die Berechnung dann dem Pool übergeben, damit das so parallel wie möglich läuft. Beim Ergebnis zurückschreiben müsste man dann wieder schauen ob man das nicht besser wieder parallelisiert um nicht die Datenbankverbindung damit zu „verstopfen” und es am Ende langsamer zu machen.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Hallo,

ich bin totaler Datenbank-Laie. Ehrlich gesagt bin ich mir, nach kurzer Google-Befragung, nicht sicher, was das überhaupt ist und somit weiß ich auch nicht, ob die Datenbank das zulässt. Falls das so richtig wichtig ist, kann ich das in Erfahrung bringen. Ansonsten hilft es vielleicht schon, wenn du weißt, dass es eine mysql Datenbank ist und auch der Name Django des öfteren erwähnt wurde.
Die Datenbank, aus der ich die Daten lese ist auch eine andere als ich reinschreibe. Vielleicht löst das schon das Problem?

Viele Grüße,
Nras.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

@BlackJack
Also der get_data()-Part braucht deutlich kürzer als der do_something()-Part. Jedoch benutzen beide Teilparts jeweils nur einen Kern.
Ich denke, ich kann mich schlecht ausdrücken.

Ich möchte
CPU 1: macht data = get_data(file_ids[0], a, b) --> do_something(data, c, d) --> store.
gleichzeitig soll passieren:
CPU 2: macht data = get_data(file_ids[1], a, b) --> do_something(data, c, d) --> store.
CPU 3: macht data = get_data(file_ids[2], a, b) --> do_something(data, c, d) --> store.

Nun sind die 3 beschäftigt, eine davon wird als erstes ferig werden, beispielsweise CPU 2. Dann soll CPU 2 mit file_ids[3] weitermachen.

Das ist das, was ich mit "parallelisieren" meine.
BlackJack

@Nras: Dann erstelle einfach einen Pool und verwende entweder `apply_async()` um die Funktion die das alles macht mit jeder ID und a, b, c, und d aufzurufen. Oder erstelle eine entsprechende Liste mit Elementen die diese Daten enthalten und verwende `imap_unordered()` und iteriere über das Ergebnis. Was davon hängt so ein bisschen davon ab wie Du mit Fehlern umgehen willst. Bei `imap_unordered()` wird beim Iterieren eine Ausnahme ausgelöst wenn in der asynchron ausgeführten Funtkion eine Ausnahme ausgelöst wurde. Bei `apply_async()` passiert das beim Abfragen des Rückgabewertes beziehungsweise kann man `wait()` aufrufen und dann mit `successful()` abfragen ob es ein Problem gab.

Dann wäre da noch die Frage wie umfangreich a, b, c, und d sind. Also ob das ein Problem wird, die bei jedem asynchronen Aufruf zum Prozess zu übermitteln der diesen Aufruf abarbeitet.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

@BlackJack
@Nras: Dann erstelle einfach einen Pool und verwende entweder `apply_async()` um die Funktion die das alles macht mit jeder ID und a, b, c, und d aufzurufen. Oder erstelle eine entsprechende Liste mit Elementen die diese Daten enthalten und verwende `imap_unordered()` und iteriere über das Ergebnis
Ich glaube nach meiner Suche im Internet, dass ich genau das will, denn das mit multiprocessing und Pool hatte ich in meinem Eingangspost erwähnt. Bei genau dem Part würde ich gerne die Hilfe in Anspruch nehmen und fragen, wie das bei dem obigen mini-Beispiel auszusehen hätte.
Die Parameter a,b,c und d sind nicht groß (und in Wirklichkeit noch ein paar mehr). Aber es sind kurze Strings bzw datetime Objekte oder integers. Also alles keine Listen oder ganze Arrays.

Vielen Dank,
Nras.
BlackJack

Wenn Fehler/Ausnahmen bei der Verarbeitung einer ID völlig egal sind, oder die Behandlung komplett in der Verarbeitungsfunktion abgewickelt wird, dann könnte das so aussehen (natürlich ungetestet):

Code: Alles auswählen

def process_file_id(file_id, a, b, c, d):
    data = get_data(file_id, a, b)
    result = do_something(data, c, d)
    result.store()


def main():
    file_ids = ['blabla1', 'blablub19',' blablala22']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool()
    for file_id in file_ids:
        pool.apply_async(process_file_id, (file_id, a, b, c, d))
    pool.join()
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

@BlackJack
Danke, das habe ich gleich mal getestet (s.u. für lauffähiges Beispiel).
Wenn ich es richtig sehe, benötige ich aber das "pool.join()" in Zeile 13 gar nicht, weil aus meiner for-Schleife nichts zusammengebaut werden muss, sondern die einzelnen Sachen direkt weggeschrieben werden. Wäre das korrekt?

Zu den Fehlern:
Es gibt um das get_data(file_id, a, b) noch ein try/except folgender Bauweise, die ich bisher hier nicht geschrieben habe:

Code: Alles auswählen

file_ids = ['blabla1', 'blablub19',' blablala22']
a, b, c, d = 1, 'blub', 3, 4  # dummy parameter
for file_id in file_ids:
    try:
        data = get_data(file_id, a, b)   # load data from a db
    except NoDataException as e:
        # hier wird in ein log geschrieben, etc
        continue
    result = do_something(data, c, d)   # process data and compute the result (numpy calculations)
    result.store() # store data in db
Ansonsten werden alle Exceptions in den jeweiligen Funktionen abgewickelt.

Folgendes Minibeispiel mit Dummyimplementationen läuft schon mal durch, macht aber unerwartete Sachen:

Code: Alles auswählen

from multiprocessing import Pool

def get_data(file_id, a, b):
    if file_id == 'blablub19':
        raise ValueError
    return 1

def store_result(result):
    print 'result stored'

def do_something(data, c, d):
    return data

def process_file_id(file_id, a, b, c, d):
    data = get_data(file_id, a, b)
    result = do_something(data, c, d)
    store_result(result)

def mainfun():
    file_ids = ['blabla1', 'blablub19',' blablala22']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool()
    for file_id in file_ids:
        try:
            pool.apply_async(process_file_id, (file_id, a, b, c, d))
            print file_id + ' done'
        except ValueError:
            print file_id + ' failed'
            continue
Ein Aufruf davon in ipython liefert:

Code: Alles auswählen

mainfun()
blabla1 done
blablub19 done
result stored
 blablala22 done
result stored
Es wird zwei mal gespeichert (das ist ok), "blabla1 done" kommt direkt vor "blabla19 done", das deutet darauf hin, dass das Parallelisieren klappt.
Aber es ist für alle file_ids der try-Block abgearbeitet worden und es wurde nicht in den except-Block gegangen. Das ist vermutlich das, was du ansprechen wolltest mit deinen Fragen zur Fehlerbehandlung? Ich hätte erwartet, dass ich ein "blablub19 failed" lese....

Schon mal herzlichen Dank und viele Grüße,
Nras.
BlackJack

@Nras: Das `join()` steht dort um zu warten bis alle Aufträge des Pools abgearbeitet sind. Denn wenn das Hauptprogramm endet, möchte man ja nicht das die gestarteten Prozesse mit dem Hauptprogramm beendet werden auch wenn sie noch gar nicht alles abgearbeitet haben.

Es wird nicht in `mainfun()` in den ``except:``-Zweig gegangen weil da ja nichts ist was einen `ValueError` auslöst. Das `apply_async()` startet die übergebene Funktion asynchron. Und auch nicht zwingend sofort, denn wenn man mehr Funktionen startet als gleichtzeitig Arbeitsprozesse im Pool sind, werden die in eine Warteschlange eingereiht.

Wenn man wissen will ob es bei der asynchron ausgeführten Funktion eine Ausnahme gab, muss man das Ergebnis untersuchen. Also warten bis die Funktion tatsächlich ausgeführt wurde und dann schauen ob es ein Ergebnis oder eine Ausnahme gab.

Code: Alles auswählen

def main():
    file_ids = ['blabla1', 'blablub19',' blablala22']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool()
    results = [
        (file_id, pool.apply_async(process_file_id, (file_id, a, b, c, d)))
        for file_id in file_ids
    ]
    for file_id, result in results:
        result.wait()
        print file_id, 'done' if result.successful() else 'failed'
Das ``continue`` in `mainfun()` ist übrigens überflüssig. Das ist eine Anweisung die ich sowieso generell versuche zu meiden, weil das ein Sprung mitten aus verschachteltem Code heraus an den Schleifenanfang ist, den man der Quelltextstruktur nicht so leicht ansehen kann.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

@BlackJack
Vielen Dank für deine Mühe. Du hast recht, das ``continue`` war in dem Fall unnötig, in dem Code-Beispiel dadrüber hatte ich es aber gebraucht.

Wieso wird denn aber in dem `pool.apply_async(process_file_id, (file_id, a, b, c, d))` kein ValueError ausgelöst? Dort wird doch `get_data()` aufgerufen und und für die file_id 'blablub19' wird dieser ValueError ausgelöst (entschuldige die Namen der file_ids, hätte ich geahnt, dass das so oft auftauchen wird, hätte ich da etwas anderes gewählt).

Ich bin mir nicht sicher, ob ich das, was du schreibst, verstehe. Aber vor deiner Antwort habe ich die ``try-except``-Geschichte aus der Schleife geholt und in `process_file_id()` reingeschrieben, sodass ich folgenden Code erhalte:

Code: Alles auswählen

from multiprocessing import Pool

def get_data(file_id, a, b):
    if file_id == 'blablub19':
        raise ValueError
    return 1

def store_result(result):
    print 'result stored'

def do_something(data, c, d):
    return data

def process_file_id(file_id, a, b, c, d):
    try:
        data = get_data(file_id, a, b)
        result = do_something(data, c, d)
        store_result(result)
        print file_id + ' done'
    except ValueError:
        print file_id + ' failed'

def mainfun():
    file_ids = ['blabla1', 'blablub19',' blablala22']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool()
    for file_id in file_ids:
        pool.apply_async(process_file_id, (file_id, a, b, c, d))
Bitte korrigier mich, wenn ich falsch liege, aber ich denke, dass das nun das ist, was ich haben möchte, denn die Ausgabe von einem Aufruf davon in ipython sieht nun so aus, wie ich mir das vorstelle.

Code: Alles auswählen

In [2]: mainfun()
In [3]: result stored
blabla1 done
blablub19 failed
result stored
 blablala22 done
Noch mals herzlichen Dank und viele Grüße,
Nras.
BlackJack

@Nras: Das ``continue`` in dem Beispiel darüber kann man mit einem ``else`` loswerden:

Code: Alles auswählen

for file_id in file_ids:
    try:
        data = get_data(file_id, a, b)   # load data from a db
    except NoDataException as e:
        pass # hier wird in ein log geschrieben, etc
    else:
        # process data and compute the result (numpy calculations)
        result = do_something(data, c, d)
        result.store()  # store data in db
`pool.apply_async()` führt `get_data()` ja nicht direkt aus, sondern asynchron in einem anderen Prozess. Die `apply_async()` Methode muss doch *sofort* zurückgekehren, sonst macht das doch alles überhaupt keinen Sinn wenn die an der Stelle erst einmal warten würde ob irgendwo und irgendwann in `process_file_id()` eine Ausnahme auftritt. Man will doch gerade nicht warten sondern sofort den Aufruf für die nächste `file_id` an den Pool übergeben. Wenn beim Abarbeiten der Funktion die man `apply_async()` eine Ausnahme auftritt, dann wird das in dem Ergebnisobjekt vermerkt, welches von `apply_async()` zurückgegeben wird. Von da Frage ich das bei meinem letzten Beispiel dann ja auch ab.

Dein letzter Quelltext verlagert die Ausnahmebehandlung in `process_file_id()`. Wobei da nur `ValueError` behandelt wird. Andere Ausnahmen die in der Funktion auftreten werden stillschweigend ignoriert.

Der Code macht das was Du möchtest, er arbeitet `process_file_id()` auf mehreren Prozessen parallel ab.

Ich würde da noch ein `pool.join()` ans Ende von `mainfun()` setzen, damit man sicher sein kann, dass am Ende der `mainfun()` auch tatsächlich alle `file_ids` abgarbeitet wurden.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

@BlackJack,
Ich hoffe, ich falle dir nicht zu sehr zur Last :). Nun habe ich es fast verstanden. Aber (leider) kommt nun noch eine Rückfrage von mir
`pool.apply_async()` führt `get_data()` ja nicht direkt aus, sondern asynchron in einem anderen Prozess. Die `apply_async()` Methode muss doch *sofort* zurückgekehren, sonst macht das doch alles überhaupt keinen Sinn wenn die an der Stelle erst einmal warten würde ob irgendwo und irgendwann in `process_file_id()` eine Ausnahme auftritt. Man will doch gerade nicht warten sondern sofort den Aufruf für die nächste `file_id` an den Pool übergeben. Wenn beim Abarbeiten der Funktion die man `apply_async()` eine Ausnahme auftritt, dann wird das in dem Ergebnisobjekt vermerkt, welches von `apply_async()` zurückgegeben wird. Von da Frage ich das bei meinem letzten Beispiel dann ja auch ab.
Also das mit dem Warten ist mir noch nicht klar. Vielleicht ist mir auch das Prinzip "asynchron" nicht klar.
Nach meiner Vorstellung soll folgendes passieren:
  • Es soll für den Fall, dass `get_data()` einen "NoDataError" wirft, einfach kurz was geschrieben werden und dann soll weitergemacht werden mit einer weiteren ``file_id``. Das ist zur Zeit der Fall und das ``apply_async()`` meldet hinterher auch, dass das ordnungsgemäß abgearbeitet wurde. Im Log steht dann auch, dass es nicht geklappt hat.
  • Alle anderen Fehler/Ausnahmen, die nicht behandelt werden, werden zur Zeit hinterher in ``results`` vermerkt und es wird auch mit der nächsten file_id weitergearbeitet. Ich bin mir aber nicht sicher, ob ich das möchte. Wenn beispielsweise bei ``store_result(result)`` was schief läuft oder ``get_data()`` was anderes wirft als ``NoDataError``, wäre es sinnvoll, *nicht* mit den anderen file_ids weiterzuarbeiten, sondern abzubrechen und nicht erst am Ende festzustellen, dass ewig lange berechnet wurde, aber Sachen nicht funktioniert haben (Beispiel: ``file_id``: 'd', da klappt was beim Speichern nicht).
Geht soetwas auch zu implementieren?

Übrigens gibt mir das ``pool.join()`` eine Assertion. Die kriege ich die jedoch weg, wenn ich zuvor ein pool.close() schreibe: http://stackoverflow.com/questions/3682 ... -processes. Leider weiß ich auch hier nicht wirklich, was ich damit tue.
Code zur Zeit:

Code: Alles auswählen

from multiprocessing import Pool

class NoDataError(ValueError):
    pass

def get_data(file_id, a, b):
    if not file_id in ['a', 'c', 'd']:
        raise NoDataError
    return 1

def store_result(result, file_id):
    if file_id == 'd':  # <--- some unforseen Error
        raise NameError 
    print 'file_id: ' + file_id +' -- stored'

def do_something(data, c, d):
    #while True:  # <--- testing number of active cpus
        #pass
    return data

def process_file_id(file_id, a, b, c, d):
    try:
        data = get_data(file_id, a, b)
    except NoDataError:
        print 'file_id: ' + file_id +' -- raised NoDataError' # log
    else:
        result = do_something(data, c, d)
        store_result(result, file_id)

def mainfun():
    file_ids = ['a', 'b', 'c', 'd']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool(3)
    results = [
        (file_id, pool.apply_async(process_file_id, (file_id, a, b, c, d)))
        for file_id in file_ids
    ]

    for file_id, result in results:
        result.wait()
        print file_id, 'done!' if result.successful() else 'failed'
    # pool.close()
    pool.join()
Ausgabe nach Aufruf in ipython

Code: Alles auswählen

In [2]: mainfun()
file_id: a -- stored
file_id: b -- raised NoDataError
file_id: c -- stored
a done!
b done!
c done!
d failed
---------------------------------------------------------------------------
AssertionError                            Traceback (most recent call last)
<ipython-input-2-b2465dbe9329> in <module>()
----> 1 mainfun()

<ipython-input-1-c307934b1341> in mainfun()
     47         result.wait()
     48         print file_id, 'done!' if result.successful() else 'failed'
---> 49     pool.join()

/usr/lib64/python2.7/multiprocessing/pool.pyc in join(self)
    431     def join(self):
    432         debug('joining pool')
--> 433         assert self._state in (CLOSE, TERMINATE)
    434         self._worker_handler.join()
    435         self._task_handler.join()

AssertionError: 

Falls du dazu noch Zeit und Lust hättest, wäre ich hoch erfreut.
Viele Grüße,
Nras.
BlackJack

@Nras: Asynchron heisst die Funktion wird gleichzeitig zur aktuellen ausgeführt. Das heisst der `apply_async()`-Aufruf wartet nicht bis `process_file_id()` ausgeführt wurde, sondern kehrt *sofort* zurück. Das bedeutet auch das die Schleife schon längst zuende sein kann/wird, bevor alle Aufrufe von `process_file_id()` durchgelaufen sind oder überhaupt gestartet wurden. Wir reden ja von diesem Quelltext, und warum da nie ein `ValueError` kommt, auch wenn der von `process_file_id()` ausgelöst wird:

Code: Alles auswählen

def mainfun():
    file_ids = ['blabla1', 'blablub19',' blablala22']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool()
    for file_id in file_ids:
        try:
            pool.apply_async(process_file_id, (file_id, a, b, c, d))
            print file_id + ' done'
        except ValueError:
            print file_id + ' failed'
Es wird `pool.apply_async()` aufgerufen und dann sofort ohne zu warten das ``print`` in der nächsten Zeile aufgerufen. Die `process_file_id()` ist zu dem Zeitpunkt entweder noch gar nicht aufgerufen worden, sie fangt gerade an ausgeführt zu werden. Die Schleife ``for``-Schleife wird ziemlich schnell durchlaufen und zwar unabhängig von der Zeit die ein `process_file_id()`-Aufruf insgesamt benötigt, denn entweder wird einem freien Worker-Prozess gesagt „führ die Funktion mal aus”, oder der Aufruf wird in eine Warteschlange gesteckt falls als Worker-Prozesse gerade schon beschäftigt sind. Das ``except`` bezieht sich nur auf den ``try``-Block, also nur für den Zeitraum wo auch wirklich diese beiden Zeilen ausgeführt werden, in *diesem* Prozess.

Die ``for``-Schleife wird auch beendet sein bevor alle `process_file_id()`-Aufrufe abgearbeitet worden sind, sofern die nicht wirklich sehr kurz sind. Deshalb würde ich ja bei dieser Variante den `pool.join()` machen nach der Schleife um sicherzustellen, dass die `mainfun()` erst zum Aufrufer zurückkehrt wenn alle IDs abgearbeitet wurden.

Beim `join()` muss man vorher `close()` oder `terminate()` aufgerufen haben, das habe ich übersehen. Bei meiner letzten Variante wo ich die Ergebnisse vom `apply_async()` aufgehoben habe, braucht man das `join()` auch nicht weil ich ja auf jedem Ergebnis einmal `wait()` aufgerufen habe, also sicher sein kann, dass die am Ende auch alle zuende abgearbeitet wurden.

Wenn man die Berechnungen abbrechen möchte, kann man `imap_unordered()` verwenden, was automatisch abbricht wenn man versucht ein Ergebnis abzufragen bei dem eine Ausnahme aufgetreten ist. Ein Problem könnte sein, dass man im Hauptprozess an der Stelle nicht mehr herausfinden kann welche `file_id` von dem Fehler betroffen war. Ich habe das mal in der `process_file_id()` ausgeben lassen:

Code: Alles auswählen

#!/usr/bin/env python
import random
import time
from multiprocessing import Pool


class NoDataError(ValueError):
    pass


def get_data(file_id, a, b):
    if file_id not in ['a', 'c', 'd']:
        raise NoDataError
    return 1


def store_result(result, file_id):
    if file_id == 'c':  # <--- some unforseen Error
        raise NameError
    print 'file_id: ' + file_id + ' -- stored'


def do_something(data, c, d):
    # while True:  # <--- testing number of active cpus
        # pass
    return data


def process_file_id((file_id, a, b, c, d)):
    try:
        try:
            data = get_data(file_id, a, b)
        except NoDataError:
            print 'file_id: ' + file_id + ' -- raised NoDataError'  # log
        else:
            result = do_something(data, c, d)
            time.sleep(random.random() * 5)
            store_result(result, file_id)
    except:
        print 'file_id:', file_id, 'failed'
        raise


def main():
    file_ids = ['a', 'b', 'c', 'd']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool(3)
    for _ in pool.imap_unordered(
        process_file_id, ((file_id, a, b, c, d) for file_id in file_ids)
    ):
        pass


if __name__ == '__main__':
    main()
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Hallo,

vielen Dank, ich denke, ich habe das nun verstanden.

Viele Grüße,
Nras.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Hallo,

ich will noch eine kurze Zusammenfassung als Rückmeldung geben, falls jemand mal so ein ähnliches Problem hat und diese Seite hier findet:

Es gab noch kleinere Hürden und letztenendlich habe ich mich für ``map`` und nicht ``imap_unordered`` entschieden, da mir dies simpler erscheint und mir der Unterschied zwischen den beiden nicht klar war. Zudem bricht ``map`` auch bei nicht abgefangenen Fehlern/Ausnahmen sofort ab - das ist das, was ich gerne hätte (mag sein, dass ``imap_unordered`` das auch macht).
''map'' akzeptiert nur ein Argument, (das ``imap_unordered`` wohl auch, daher noch eine Wrapper-Funktion.
Die parallelisierte Funktion muss im selben Modul liegen, in dem die Parallelisierung stattfindet.
Da die Datenbankzugriffe (v.a. die ersten n Stück bei n CPUs) nahezu gleichzeitig stattfinden, gab es beim mysql noch das Problem, dass sie versuchen, dieselbe Verbindung zu nutzen, was in mysql-Fehlern endete. Lösung: zwingen, eine neue Verbindung aufzubauen: http://stackoverflow.com/questions/8242 ... onnections Also ein connection.close() zu Anfang jeder Datenbankabfrage.

Der lauffähige Code sieht nun so aus (Kommentare Zeile 23 und 24 entfernen für NameError und dadurch Abbruch)

Code: Alles auswählen

import random
import time
from multiprocessing import Pool

from django.db import connection

class NoDataError(ValueError):
    pass


def do_work(args):
    connection.close()  # <-- makes sure that a new db connection is used
    file_id, a, b, c, d = args
    return process_file_id(file_id, a, b, c, d)


def get_data(file_id, a, b):
    if not file_id in ['a', 'c', 'd']:
        raise NoDataError
    return 1

def store_result(result, file_id):
    # if file_id == 'd':  # <--- some unforseen Error (uncomment for testing)
    #     raise NameError
    print 'file_id: ' + file_id + ' -- stored'

def do_something(data, c, d, file_id):
    print 'doing something', file_id
    time.sleep(random.random()*5)
    return data

def _process_file_id(file_id, a, b, c, d):
    try:
        data = get_data(file_id, a, b)
    except NoDataError as e:
        print 'file_id: ' + file_id + ' -- raised ' + str(type(e))  # log
        return
    if True:
        result = do_something(data, c, d, file_id)
        store_result(result, file_id)

def process_file_id(args):
    connection.close()
    file_id, a, b, c, d = args
    return _process_file_id(file_id, a, b, c, d)

def main():
    file_ids = ['a', 'b', 'c', 'd']
    a, b, c, d = 1, 'blub', 3, 4
    pool = Pool(3)
    pool.map(process_file_id, [(file_id, a, b, c, d) for file_id in file_ids])
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
Also alles in Allem noch mal Danke für die Hilfe.
Viele Grüße,
Nras.
Antworten