Multiprocessing Frage

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Nebelhom
User
Beiträge: 155
Registriert: Mittwoch 19. Mai 2010, 01:31

Hi,

ich versuche gerade einen meiner Prozesse zu beschleunigen, aber so wie ich mir das vorstellte, will das nicht so wirklich (wenn dem nicht so waere wuerde ich das hier natuerlich nicht posten... :? )

Ich habe eine lange Liste von Pfaden (aktuell ca. 7000, aber das kann auch laenger sein), die dann linear durchlaufen wird, um ein paar Operationen durchzufuehren (welche ist fuer diese Frage erstmal nebensaechlich). Das dauert viel, viel zu lange, weshalb ich mir dachte, ich koennte die Liste in viele Kleine aufspalten und diese Listen parallel durchlaufen lassen. Da ich ohne viel eigenes Wissen zu haben, viele gelesen habe, dass threading in Python wegen dem GIL (Global Interpreter Lock?) eher nicht das erwuenschte Ergebnis gibt, dachte ich an das multiprocessing modul, was ich vorher noch nicht benutzt habe.

Lange Rede, kurzer Sinn. Ich habe mir eine Uebungsfunktion zusammengeschustert, die einfach nur eine Liste von Integern nimmt und eine Liste wiedergibt, die alle Werte+1 wiedergibt. Leider ist meine multiprocessing benutzende Funktion um ein vielfaches langsamer als die linear funktionierende Version und ich frage mich wirklich warum. Den multiprocessing code, habe ich aus der offiziellen Dokumentation genommen :(. Die grouper function habe ich aus stack overflow.

Bin ich hier auf dem falschen Dampfer oder fehlt mir nur ein kleines Puzzleteil zu meinem Glueck? Danke schonmal fuer die Anregungen.

Code: Alles auswählen

from multiprocessing import Process
from itertools import izip_longest
from timeit import Timer

def grouper(iterable, n, fillvalue=None):
    args = [iter(iterable)] * n
    return list(izip_longest(*args, fillvalue=fillvalue))

def plus_one(li):
    new = []
    for i in li:
        if i == None:
            pass
        else:
            new.append(i+1)
    return new

def plus_one_multi(li, divider):
    new = []
    groups = grouper(li, divider)
    for group in groups:
        p = Process(target=plus_one, args=(group,))
        new.append(p)
        p.start()
        p.join()
    return new

def test():
    r = xrange(20000)
    plus_one_multi(r, 1000)

if __name__ == '__main__':
    t = Timer("test()", "from __main__ import test")
    print t.timeit(1000)
    # simple plus_one took 4.62936496735 sec
    # plus_one_multi took 45.3931000233 sec
Zuletzt geändert von Nebelhom am Samstag 26. Mai 2012, 15:05, insgesamt 2-mal geändert.
Benutzeravatar
darktrym
User
Beiträge: 784
Registriert: Freitag 24. April 2009, 09:26

Sehe ich das richtig, du erzeugst 1000 Prozesse?
„gcc finds bugs in Linux, NetBSD finds bugs in gcc.“[Michael Dexter, Systems 2008]
Bitbucket, Github
Nebelhom
User
Beiträge: 155
Registriert: Mittwoch 19. Mai 2010, 01:31

nein, ich teile die 20000 integer lange liste in 20 eintausend integer lange listen auf... oder zumindest will ich das ;)
BlackJack

@darktrym: Nein, nur 20 — was natürlich auch nicht so sinnvoll ist, wenn der Rechner nicht 20+ Prozessoren hat.

@Nebelhom: Du startest jeden Prozess einzeln und wartest dann bis der fertig ist, bevor Du den nächsten startest. m)

Die gesamte Aufgabe dauert ohne Multiprocessing den Kommentaren nach eine halbe Millisekunde. Dass das mit dem Erzeugen von 20 Prozessen, dem Aufteilen in Teillisten mittels Iteratoren, dem serialisieren, übertragen, und deserialisieren der Eingabedaten und noch mal dem gleichen Vorgang mit den Rückgabedaten in die andere Richtung, nicht schneller werden kann, sollte nicht überraschen. Wenn etwas unter einer Sekunde dauert, käme ich nicht auf die Idee einen Prozess dafür anzuwerfen, es sei denn es ist tatsächlich sehr zeitkritisch oder passiert sehr oft.

Solange man nicht sehr spezielle Anforderungen hat, sollte man die Prozesse auch nicht unbedingt selbst erzeugen und sich damit mit deren Anzahl auseinander setzen. Ich würde erst versuchen ob man mit einem `Pool`-Objekt das Problem gelöst bekommt.

Wie man das konkret angeht hängt damit zusammen was man genau machen möchte. Also ob es sich um eine „Prozedur” handelt, es also keinen Rückgabewert gibt, oder wenn es bei jeder Verarbeitung einen Rückgabewert gibt, ob einen die Reihenfolge davon interessiert oder nicht.
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

Du startest immer jeweils einen Prozess wartest bis er zu Ende gelaufen ist und startest erst dann den nächsten, deswegen muss es langsamer sein als ohne.

20 Prozesse zu starten wäre nur dann unsinnig wenn das Skript ausschliesslich auf PCs mit mindestens 20 Cores läuft, wenn es dass nicht tut ist es völlig unsinnig.

Die Aufgaben in gleich große Listen aufzuteilen, ist nur dann sinnvoll wenn du garantieren kannst dass alle Aufgaben gleich schnell abgearbeitet werden, ansonsten wird die Arbeit nicht sinnvoll verteilt.

Diese Probleme könntest du alle übrigens lösen indem du multiprocessing.Pool nutzt.
Nebelhom
User
Beiträge: 155
Registriert: Mittwoch 19. Mai 2010, 01:31

Hmmm... ok, danke fuer eure Hilfe. Ich habe offensichtlich die benutzung es Moduls grundlegend missverstanden.

@BlackJack
Die gesamte Aufgabe dauert ohne Multiprocessing den Kommentaren nach eine halbe Millisekunde. Dass das mit dem Erzeugen von 20 Prozessen, dem Aufteilen in Teillisten mittels Iteratoren, dem serialisieren, übertragen, und deserialisieren der Eingabedaten und noch mal dem gleichen Vorgang mit den Rückgabedaten in die andere Richtung, nicht schneller werden kann, sollte nicht überraschen. Wenn etwas unter einer Sekunde dauert, käme ich nicht auf die Idee einen Prozess dafür anzuwerfen, es sei denn es ist tatsächlich sehr zeitkritisch oder passiert sehr oft.
Ich glaube, ich habe mich da falsch ausgedrueckt. Das war nur eine Uebungsfunktion, um 1) die Befehle fuer multiprocessing richtig zu machen und 2) zu sehen, ob und wieviel Zeit (prozentual) ich mir dabei sparen kann. Beim eigentlich Skript ist die Dauer bei einer langen Liste (5000+) bei, ich schaetze mal, 5-10 minuten oder so. Das wollte ich verbessern. Leider ist der Originalcode viel zu lang, um da irgendein sinnvolles minimiertes Beispiel zu geben, aber es laeuft auf das Iterieren ueber eine Liste heraus und mit jedem Item werden ein paar Operationen durchgefuehrt. Deshalb diese plus_one Funktion.

Ich habe jetzt mal auf euer Anraten (Dankeschoen Blackjack und DasIch) mal multiprocessing.Pool verwendet. Das ist doppelt so schnell wie mein Code mit multiprocessing.Process, aber immernoch viel langsamer als der Originalcode. Ist das nur ein Overhead wegen der Zeit, die es braucht, um die Prozesse alle zu starten, oder wird das mehr relativ zur Laenge der Liste? Ich verstehe nicht genau, woher der grosse Zeitunterschied kommt... Oder habe ich das jetzt schon wieder falsch benutzt? Es gibt die richtige Antwort...

Code: Alles auswählen

from multiprocessing import Process, Pool
from timeit import Timer

def plus_one(li):
    new = []
    for i in li:
        if i == None:
            pass
        else:
            new.append(i+1)
    return new

def plus_one_multi(li):
    pool = Pool()
    res = pool.apply_async(plus_one, (li,))
    return res.get()

def test():
    r = range(20000)
    plus_one_multi(r)

if __name__ == '__main__':
    t = Timer("test()", "from __main__ import test")
    print t.timeit(1000)
    # simple plus_one took 4.62936496735 sec
    # plus_one_multi with Pool took 21.8338820934 sec
BlackJack

@Nebelhom: Den Punkt 2) kannst Du damit nicht messen weil bei so einer kurzen Aufgabe ganz offensichtlich gar keine Zeit gespart werden kann. Ich habe doch versucht klar zu machen was da alles *zusätzlich* passiert, was man natürlich erst einmal durch die Parallelität wieder herein holen muss. Es ist ja kaum etwas da was parallel läuft. Das ist einfach zu verschwindend gering im Anteil der Gesamtlaufzeit. Das kann nicht schneller werden als ohne `multiprocessing`.

Auch diesmal verwendest Du das Modul falsch. Du rufst `pool.apply_async()` einmal mit der kompletten Liste auf, das heisst die *gesamte* Aufgabe wird *einem* anderen Prozess übergeben, der die komplett linear abarbeitet und das Ergebnis liefert. Wobei es etwas sinnfrei ist die asynchrone Variante von `apply()` aufzurufen, wenn man danach sowieso sofort auf das Ergebnis wartet.

Um irgendetwas zu machen was Dir ein Gefühl für den zu erwartenden Geschwindigkeitsgewinn gibt, muss der Code der für die „Arbeit” steht, *Zeit* verbrauchen. Und wenn es nur `time.sleep()` ist. Nur dann siehst Du was Dir die Parallelität bringt. Wenn Du die Prozesseren gut auslasten kannst, dann wird das gegen die alte Zeit geteilt durch die Anzahl der Prozessoren plus ein wenig Overhead gehen. Bei zwei Prozessoren/Kernen also ca. doppelt so schnell. Wobei das natürlich noch davon abhängt was gemacht wird, denn es kann bei Dateiverarbeitung auch sein, dass die Festplatte ausbremst, und sogar das bei paralleler Verarbeitung von Dateien alles *langsamer* wird, weil die Zeit für Schreib-/Lesekopfbewegung (seek time) das lineare auslesen einer Datenspur dominiert.

Wie lange braucht Dein echter Code denn ungefähr für die Abarbeitung *eines* Dateinamens? Und hat der Code Ergebnisse, oder hat er einfach nur Nebeneffekte auf die es ankommt?
Nebelhom
User
Beiträge: 155
Registriert: Mittwoch 19. Mai 2010, 01:31

Ok, ich sehe, ich habe davon wirklich gar keine Ahnung und werde mich erst nochmal mit der Materie etwas genauer befassen, bevor ich eure Zeit weiter verschwende. Kennt jemand ein gutes Tutorial zu dem multiprocessing module? Ich habe bisher nur dieses hier gefunden. Der Rest ist eher eine Ansammlung von Codestuecken, die mir keine Hintergrundinformationen geben.

@BlackJack:
Wie lange braucht Dein echter Code denn ungefähr für die Abarbeitung *eines* Dateinamens? Und hat der Code Ergebnisse, oder hat er einfach nur Nebeneffekte auf die es ankommt?
Um deine Fragen zu beantworten, die Abarbeitung eines Dateinamens dauerte mit timeit modul 0.101815223694, die Dauer im von mir getesteten "Extremfall" war 525.691046 (ca. 5000 Dateien). Der Code hat ausserdem nur Nebeneffekte: Das Erstellen eines md5 hash pro Datei und Ansammeln in einer Liste und wenn alles durch ist, einmaliges schreiben der erstellten Liste in ein xml format. Kein return value.

Danke, dass du dir mal wieder Zeit fuer meine Fragen nimmst ;)
BlackJack

@Nebelhom: Dann hat der Code ja doch ein Ergebnis was Du von den Prozessen zurück kommuniziert brauchst: die Prüfsumme für jede Datei. Du hast also eine Liste mit Dateinamen und brauchst jetzt eine Funktion die *einen* Namen abarbeitet und das Ergebnis dazu liefert. Also zum Beispiel ein Tupel (Dateiname, Prüfsumme). Dann könntest Du eine der `*map*`-Varianten verwenden. Im einfachsten Fall vielleicht wirklich erst einmal `Pool.map()` wie die normale `map()`-Funktion verwenden.
Antworten