Multiprocessing - wie richtig nutzen?

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

Hallo zusammen,

habe mir jetzt das ein oder andere Tutorial und Programmbeispiel zu multiprocessing angeschaut (u.a. http://jeetworks.org/node/81 und http://jeetworks.org/node/81) komme aber nicht weiter und hoffe ihr könnt mir helfen.

Folgendes will ich tun:
Aus meinem Hauptprogramm rufe ich regelmäßig ein Unterprogramm auf (Methode), die mir Ergebnisse für das Hauptprogramm liefert. Ohne diese Ergebnisse kann das Hauptprogramm nichtweiterlaufen. Die Methode im Unterprogramm wird in einer while-Schleife bis zu 7 mal aufgerufen. Erst wenn alle max. 7 Aufrufe erfolgt sind und die Ergebnisse (=list) aus allen Aufrufen vorliegen, können diese Ergebnisse im Hauptprogramm zusammengesetzt werden und weiterverarbeitet werden. Also wird an irgendeiner Stelle ein join() eingebaut werden müssen.

Weil die Aurufe des Unterprogramms parallel bearbeitet werden können und unabhängig voneinander sind, würde ich die natürlich auch gerne parallel bearbeiten. Was ich bisher umsetzen konnte ist die Aufrufe im Unterprogramm als Threads durchzuführen. So schön, so gut, aber die Berechnung erfolgt leider weiterhin nur auf einem Prozessor und gewonnen habe ich dadurch nichts. Im Gegenteil, die Rechenzeit hat sogar zugenommen. Deshalb würde ich gerne alle Prozessoren in der Maschine nutzen, daher multiprocessing.

In einer Art Pseudocode sieht das im Moment so aus:

Code: Alles auswählen

Aus dem Hauptprogramm kommend:
while weitersuchen:
	Ergebnisse_aus_allen_max_7_durchlaufen = []
	for p in [1, 2, ..., 7]: # also die max. 7 Aufrufe
		Ergebnis = Klasse.solve_Problem(p)
		
		if len(Ergebnis)>0:
			Ergebnis = self.tue_was(Ergebnis, Vergleichswert)
		len_Ergebnis = len(Ergebnis)
		if len_Ergebnis>0:
			for i in range(len_Ergebnis):
				Ergebnisse_aus_allen_max_7_durchlaufen.append(Ergebnis[i])
	weitersuchen = False
Was ich gerne hätte wäre:

Code: Alles auswählen

Aus dem Hauptprogramm kommend: 
while weitersuchen:
    for a in range(meine_max_7_Aufrufe):
        Ergebnis = Berechne_in_einem separaten_Prozess_das_Ergebnis_von(Klasse.Methode(Parameter1, Parameter2, a))
        warte_bis_alle_Ergebnisse_vorliegen
        vereinige_alle_Ergebnisse_zu_einer_langen_Liste
        len_Ergebnis = len(vereinige_alle_Ergebnisse_zu_einer_langen_Liste)
        if len_Ergebnis>0:
 	    for i in range(len_Ergebnis):
                Ergebnisse_aus_allen_max_7_durchlaufen.append(Ergebnis[i])
    weitersuchen = False
Hat da jemand ein Beispiel, wie so was aussehen kann.
Meine Versuche es mit Multiprocessing umzusetzen sind gescheitert, mir wird nicht klar wir ich die Anzahl der Prozesse steuern kann und wie ich die Ergebnisse zusammenführe. Bisher werde unendlich viele Prozesse erzeugt, aber kein Ergebnis...

Würde mich freuen, wenn jemand etwas mit diesem Beispiel anfangen kann.

HeRa
BlackJack

@hera: Der Pseudocode sieht teilweise ein wenig gruselig aus. Diese völlig sinnfreie ``while``-Schleife aussen, oder das ``for i in range(len(sequence)):`` nur im `i` für den Indexzugriff zu verwenden „anti pattern”.

Zum Problem: Normalerweise würde man einen `multiprocessing.Pool` erstellen, eine Liste mit zu verarbeitenden Daten und eine Funktion die *ein* Datum verarbeiten kann und dann `Pool.map()` zum abarbeiten aufrufen. Zum entwickeln kann man erst einmal die eingebaute `map()`-Funktion verwenden (unter Python 3 zusammen mit einem `list()`-Aufruf auf das Ergebnis) und wenn das läuft `Pool.map()` einsetzen.
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

Hallo BlackJack,

die Sachen, die ich mir bisher angeschaut habe erzeugen tasks_queues und result_queues. Das Konzept des Pools scheint nun wieder was anderes zu sein. Ein kleines Beispiel dazu habe ich http://stackoverflow.com/questions/5442 ... -arguments gefunden. Wenn mein Versuch des "Pseudocodes" auch nicht wirklich elegant ist, wäre es möglich mir ein ähnliches Beispiel zu posten, um mit den Pools und den maps vertraut zu werden. Wo würde ich bspw. meine Schleife unterbringen, um die bis zu 7 Aufrufe auszuführen, wo würde ich die Aufrufe dann starten (ist bei mir eine Methode namens solve_problem(), um das Ergebnis zu bekommen und wo werden die bis zu 7 unterschiedlichen Ergebnislisten dann ausgegeben/zusammengeführt? Oh und ich verwende Python 2.7. Ein Upgrade ist nicht möglich, da noch andere Bibliotheken eingebunden sind, die nicht mit neueren Pythonversionen zurechtkommen.

Grüße und vielen Dank für den Pool-Hinweis
HeRa
BlackJack

@hera: `Pool.map()` funktioniert wie das normale `map()` in Python 2.7, nur das die Ausführung parallel ist. Und es gibt noch das `chunksize`-Argument, welches man bei nur sieben Elementen wohl explizit als 1 angeben sollte.

Beim Beispiel ist das Problem das ich Deinen Pseudocode nicht verstehe. Ich dachte ja erst, dass die ``while``-Schleife sinnfrei ist, sehe jetzt aber, dass die Einrückung falsch ist, man also gar nicht sagen kann ob die Schleife sinnfrei ist, oder einfach nur wegen der kaputten Einrückung so aussieht.
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

das mit dem pseudocode versuche ich nun nochmal, das while kann man ignorieren. So sieht der Ablauf also im Moment auf. Parallelisieren würde ich gerne den Aufruf Klasse.solve_Problem(.).
Die Ergebnisse aus den parallelen Prozessen müssen so nachbehandelt werden, wie auch im Code unten. Also bereinigen um doppelte Einträge und dann der Gesamtergebnisliste anhängen.

Code: Alles auswählen

   
# Eine liste wird erzeugt, die die Ergebnisse aus max. 7 Aufrufen enthält
Ergebnisse_aus_allen_max_7_durchlaufen = []

    # die Methode solve_Problem wird max. 7mal aufgerufen und je Aufrufe wird ein Ergebnis in Form einer liste
    # zurückgegeben
    for p in [1, 2, ..., 7]: 
        # je Iteration wird ein Ergebnis für ein neues p erzeugt
        # ein Ergebnis eines Durchgangs ist eine Liste
        Ergebnis = Klasse.solve_Problem(p, Parameter1, Parameter2) 
        # ist in der Ergebnislist überhaupt was enthalten?
        if len(Ergebnis)>0:
            # lösche die Ergebnisse aus der Liste, die dem Vergleichswert entsprechen
            Ergebnis = self.tue_was(Ergebnis, Vergleichswert)
        # wenn nach dem löschen aus der ersten if-Prüfung immer noch Ergebnisse enthalten sind
        # dann hänge die noch vorhandenen Ergebnisse an die Gesamtergebnisliste an (aus den max. 7 Läufen) 
        if len(Ergebnis)>0:
           for i in range(len_Ergebnis):
               Ergebnisse_aus_allen_max_7_durchlaufen.append(Ergebnis[i])
Ich hoffe, ich konnte es nun etwas verständlicher machen.
HeRa
BlackJack

@hera: Die ``if``-Abfragen sind eigenltich überflüssig. Wenn `Ergebnis` leer ist, dann passiert bei den beiden Operationen auch nichts mit den nicht vorhandenen Elementen, also kann man sich das ``if`` jeweils sparen.

Und wie gesagt: ``for i in len(sequence):`` ist in Python ein „anti pattern”, weil das unnötig umständlich ist den Umweg über einen Index zu gehen. Man kann *direkt* über Elemente einer Liste beziehungsweise iterierbaren Objekten im Allgemeinen iterieren. Und wenn man die Elemente eines iterierbaren Objektes an eine Liste anfügen möchte, braucht man gar keine explizite Schleife, denn dafür haben Listen die `extend()`-Methode. Das würde letztendlich auf das hier zusammenschrumpfen:

Code: Alles auswählen

    ergebnisse_aus_allen_max_7_durchlaeufen = list()
    for result in pool.map(
        lambda p: Klasse.solve_problem(p, parameter1, parameter2), range(1, 8)
    ):
        ergebnisse_aus_allen_max_7_durchlaeufen.extend(
            self.tue_was(result, vergleichswert)
        )
Wo jetzt hier die Probleme liegen könnten wäre zum Beispiel `Klasse`. Ist das tatsächlich eine Klasse und `solve_problem()` eine statische Methode oder eine Klassenmethode (`staticmethod()` oder `classmethod()`) oder ist `Klasse` hier entgegen der Namenskonvention ein Exemplar? Das muss dann ja auch in alles Prozessen existieren, und ich weiss nicht ob das serialisiert und übertragen wird, oder ob man sich darum nicht selbst kümmern muss.

Letztendlich hängt viel davon ab was Du *tatsächlich* tust/tun willst, um sagen zu können wie man `multiprocessing` damit benutzt, oder ob es überhaupt dafür geeignet ist.
Benutzeravatar
snafu
User
Beiträge: 6740
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

@hera: Probier doch mal concurrent.futures für dein Vorhaben aus. Beispiel zur Nutzung:

Code: Alles auswählen

from concurrent import futures
import random
import time

def calculate(arg):
    time.sleep(1)
    x = arg + random.randrange(1, 10)
    return random.sample(range(x * 1000), 10)

def get_results(num_calculations):
    args = list(range(num_calculations))
    random.shuffle(args)
    results = []
    workers = num_calculations // 2
    with futures.ProcessPoolExecutor(max_workers=workers) as executor:
        for values in executor.map(calculate, args):
            results.extend(values)
    return results

def test():
    return get_results(10)
`calculate()` schläft hier 1 Sekunde lang und gibt dann 10 zufällige Werte als Liste zurück. Beachte, dass mein `time.sleep()` natürlich nur irgendeine längere Berechnung simulieren soll. In `get_results()` füttere ich dann die `calculate()` mit Zufallsargumenten zum Ausrechnen. Der jeweilige Befehl zum Errechnen mit dem nächsten Argument wird dabei über `executor.map()` gesteuert. BlackJack hatte die grundsätzliche Funktionsweise von `map()` ja bereits angesprochen: Es durchläuft halt sein zweites Argument (hier: `args`) und wendet das aufrufbare erste Argument (hier die `calculate`-Funktion) auf jedes dieser Elemente an. Sobald eine der von `calculate()` durchgeführten Berechnungen fertig geworden ist, wird dessen Ergebnis automatisch für den nächsten Durchlauf der `for`-Schleife in `get_results()` übergeben und im Schleifenkörper entsprechend mittels `.extend()` an die Liste aller Ergebnisse angehangen. Wenn am Ende alle Berechnungen fertig geworden sind, dann wird das Gesamtergebnis zurückgeliefert.

Du hast ja sicherlich auch schon `max_workers` bzw `workers` entdeckt: Dies ist ein optionales Argument. Wenn dies nicht angegeben wird, dann entspricht die Anzahl der Workers der Anzahl der zur Verfügung stehenden Prozessoren. Ich habe hier einfach mal die Hälfte der durchzuführenden Berechnungen genommen. Da wirst du aber sicherlich ein bißchen rumprobieren müssen. In meiner `test`-Funktion starte ich das Programm dann für 10 verschiedene Berechnungen. Jede dieser Berechnungen bräuchte ja normalerweise je 1 Sekunde. Durch die parallele Abarbeitung mit in dem Fall 5 Workern, sind es aber insgesamt nur circa 2 Sekunden bis das Gesamtergebnis angezeigt wird. Dies ist allerdings auf die Eigenheiten von `sleep()` zurückzuführen. Eine echte Berechnung bräuchte selbstverständlich länger. Ich würde wahrscheinlich zuerst das `max_workers`-Argument weglassen und dann ausprobieren, ob eine explizite Angabe verschiedener Testwerte irgendwas an Verbesserungen bringt.

Achja, der Code ist für Python 3 geschrieben, aber auch unter Python 2 ausführbar. Da Python 2 kein `concurrent.futures` Modul mitbringt, müsstest du es dort nachträglich installieren. Je nach Betriebssystem entweder über die Paketverwaltung deiner Distribution (z.B. hier für Ubuntu) oder aus PyPi (hier).

Ich hoffe, ich konnte dir damit ein bißchen weiterhelfen. :)
Zuletzt geändert von snafu am Donnerstag 18. Juli 2013, 14:47, insgesamt 1-mal geändert.
BlackJack

@snafu: Der `ImportError` sollte eigentlich niemals auftreten. Der Backport steckt auch in einem `concurrent`-Package.
Benutzeravatar
snafu
User
Beiträge: 6740
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

BlackJack hat geschrieben:@snafu: Der `ImportError` sollte eigentlich niemals auftreten. Der Backport steckt auch in einem `concurrent`-Package.
Stimmt. Ich werd's gleich mal editieren...
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

Hallo zusammen,

erst einmal vielen Dank für die Antworten und die Hilfestellung.
Mit dem Ansatz von snafu kam ich nicht weiter, weil Python 3.2 nicht mit meiner Version von CPLEX zusammenspielt. Die Links zum Installieren führten zu .tar Dateien. Unter Windows kam ich damit nicht weit. Auch die Suche nach einer Version für Windows war nicht erfolgreich.
Dennoch habe ich mir das Beispiel von snafu angeschaut und unter Python 3.2 laufen lassen. Mein Ergebnis hier: es werden ziemlich viele Python-Prozesse erzeugt und der Rechner ist dann auch bald abgeschmiert.
gestartet habe ich mit test(). Was habe ich denn da falsch gemacht?!

Mit dem Beispiel von BlackJack bin ich heute auch nicht weitergekommen.

Code: Alles auswählen

    ergebnisse_aus_allen_max_7_durchlaeufen = list()
    for result in pool.map(
        lambda p: Klasse.solve_problem(p, parameter1, parameter2), range(1, 8)
    ):
        ergebnisse_aus_allen_max_7_durchlaeufen.extend(
            self.tue_was(result, vergleichswert)
        )
@BlackJack: an welcher Stelle muss ich denn pool.map mit dem Rest des Codes bekannt machen? ich habe mit import multiprocessing bzw. from multiprocessing import Pool experimentiert, das gab aber regelmäßig Fehlermeldungen.

Opfert sich jemand, um mir über Desktop-Sharing eine Unterstützung zu geben? Bin etwas ratlos, wie ich den Programmablauf noch besser im Forum ausformulieren kann. Erkenntnisse würde ich im Anschluss ins Forum posten. Sorry, für den etwas unkonventionellen Vorschlag.
BlackJack

@hera: *.tar-Dateien sollte man auch unter Windows entpackt bekommen, zum Beispiel mit 7zip oder WinZIP.

Sag mal kann es sein, dass Du den Hinweis aus der Dokumentation nicht beachtet hast, dass man das Hauptmodul ohne Seiteneffekte importieren können muss und das dann in jedem neuen Prozess wieder anfängt Prozesse zu starten? Siehe Programming guidelines → Windows, dritter Punkt „Safe importing of main module”. Und man sollte den Code auch nicht direkt in den ``if``-Block schreiben, sondern dort eine Funktion aufrufen, die den Code enthält. Sonst hat man unnötig Namen im Modulnamensraum, die dort nichts zu suchen haben.

`pool` wäre in meinem Beispiel einfach nur definiert als ``pool = multiprocessing.Pool()``.
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

... ok, was heißt das nun im Klartext? Ich verstehe die Formulierung nicht "dass man das Hauptmodul ohne Seiteneffekt importieren können muss". Was ist in diesem Zusammenhang das Hauptmodul und was ist ein Seiteneffekt?

hier ist jetzt das Stückchen Code, um das es geht:

Code: Alles auswählen

                LABE_aller_perioden = []
                pool = multiprocessing.Pool()
                for result in pool.map(
                    lambda p: self.solve_KW(self.MP_for_CG, p, 
                                            self.instanz, 
                                            self.node_id, self.iteration, 
                                            self.arcs_to_forbid,
                                            self.max_labels_to_create), Node.periods_SP
                ):
                    LABE_aller_perioden.extend(
                        self.delete_doppelte_variable_in_sol_array(result, self.MP_for_CG))
Dabei entspricht LABE_aller_perioden der Ergebnis-Liste.
solve_KW ist eine Methode in der ein Problem zur Ermittlung kürzester Wege gelöst wird. Die Methode gibt ein Liste kürzester Wege zurück. Node.periods_SP enthält eine Liste, die nacheineinander für p abgearbeitet werden soll.
delete_doppelte_variable_in_sol_array löscht alle Einträge aus der result-Liste, die bereits in früheren Durchläufen (steckt in self.MP_for_CG) gefunden wurden.

Wenn ich das ganze nun laufen lassen, werden nach wie vor unendlich viele Python-Prozesse gestartet, das sind vermutlich die Seiteneffekte - von denen ich leider nicht weiß, was ich dagegen tun kann. Oh und in solve_KW ist ein print-Befehl, der die results ausdrucken soll. Soweit kommt es aber nie, es werden halt nur die unendlich vielen Prozesse gestartet...
Jede weitere Idee willkommen.
Sirius3
User
Beiträge: 17753
Registriert: Sonntag 21. Oktober 2012, 17:20

@hera: hier ein Beispiel:

Code: Alles auswählen

from multiprocessing import Pool

print "Seiteneffekt"

def tu_was(p):
    print p
    return p*p

def main():
    print "Kein Seiteneffekt"
    pool = Pool()
    print pool.map(tu_was, range(8))

if __name__=='__main__':
    main()
Hier ist der Seiteneffekt nur eine Ausgabe auf dem Bildschirm. Hast Du jedoch »pool.map« nicht in einer Funktion, sondern auf oberster Ebene, dann ist der Seiteneffekt eben der, dass immer neue Prozesse gestartet werden.
Das Beispiel zeigt auch schon, wie man Seiteneffekte umgeht.
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

@Sirius3, ich habe es jetzt wie folgt verstanden:

Code: Alles auswählen

    def solve_KW(self):
        LABE_aller_perioden = [] 
        pool = Pool()
        for result in pool.map(
            lambda p: ESPPRC.solve_Problem(self.MP_for_CG, p, 
                                    self.instanz, 
                                    self.node_id, self.iteration, 
                                    self.arcs_to_forbid,
                                    self.max_labels_to_create), Node.periods_SP
        ):
            LABE_aller_perioden.extend(
                self.delete_doppelte_variable_in_sol_array(result, self.MP_for_CG))
        return LABE_aller_perioden
was heisst: packe Pool() in die Methode, die aufgerufen wird, damit nicht mit jedem Aufruf aus dem Hauptprogramm nochmal ein neuer Prozess gestartet wird - was zu den unendlich vielen Prozessen führen würde.
Wenn ich das jetzt von der Idee her so mache und laufen lassen passiert folgendes:
ImportError: cannot import name Pickler
verursacht durch pool = Pool()
importiert wurde from multiprocessing import Pool
?
BlackJack

@hera: Wie sieht denn der komplette Traceback aus?
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

so:

Code: Alles auswählen

Traceback (most recent call last):
  File "C:\Users\hera\workspace\my_Prog\program_control.py", line 746, in <module>
    lauf(data) 
  File "C:\Users\hera\workspace\my_Prog\program_control.py", line 232, in lauf
    root_node.cg(instanz_name, None, max_labels_to_create)
  File "C:\Users\hera\workspace\my_Prog\Node.py", line 157, in cg
    LABE_aller_perioden = self.solve_KW()
  File "C:\Users\hera\workspace\my_Prog\Node.py", line 121, in solve_KW
    pool = Pool()
  File "C:\Python27\lib\multiprocessing\__init__.py", line 232, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild)
  File "C:\Python27\lib\multiprocessing\pool.py", line 115, in __init__
    self._setup_queues()
  File "C:\Python27\lib\multiprocessing\pool.py", line 209, in _setup_queues
    from .queues import SimpleQueue
  File "C:\Python27\lib\multiprocessing\queues.py", line 48, in <module>
    from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
  File "C:\Python27\lib\multiprocessing\synchronize.py", line 48, in <module>
    from multiprocessing.forking import assert_spawning, Popen
  File "C:\Python27\lib\multiprocessing\forking.py", line 58, in <module>
    from pickle import Pickler
ImportError: cannot import name Pickler
BlackJack

@hera: Führ mal in einem Skript ``import pickle; print pickle`` aus. Was wird dann ausgegeben?
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

@BlackJack, das:

Code: Alles auswählen

<module 'pickle' from 'C:\Python27\lib\pickle.pyc'>
BlackJack

@hera: Da sollte dann aber auch ein Objekt unter dem Namen `Pickler` enthalten sein. Und was wird ausgegeben wenn Du das irgendwo in Deinem Code vor dem Fehler platzierst?
hera
User
Beiträge: 18
Registriert: Sonntag 7. April 2013, 17:16

@BlackJack:
unmittelbar vor pool = Pool() habe ich nun print(pickle) aufgerufen. Da passt die Ausgabe noch, das import pickle wurde erkannt. Innerhalb von Pool() wirds dann nicht mehr erkannt, obwohl pickle im modul multiprocessing separat aufgerufen wird. Ist denn allgemein der Aufruf von map() nun an der richtigen Stelle, also innerhalb der Funktion, die in mehrere Prozesse ausgelagert werden soll?
Antworten