wie multiprocessing am schnellsten?

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Hallo,

ich versuche zurzeit rauszufinden, wie ich cpu intensiven code parallel möglichst schnell ausführen kann.
Um dabei alle verfügbaren CPUs zu nutzen, nutze ich also multiprocessing.

multiprocessing.Pool scheint wirklich kinderleicht anwendbar zu sein.
Dennoch hatte ich zuvor noch eine "eigene" Lösung geschrieben, welche multiprocessing und threading kombiniert.
Nun wundert es mich, warum meine Lösung schneller ist als Pool, obwohl sie möglicherweise sogar ziemlich mies gecodet ist.

Meine Lösung macht folgendes:
Erstelle soviele Prozesse, wie es CPU gibt. Teile die Aufgaben auf die Prozesse auf, welche dann mit threading durchgeführt werden.

Hier zuerst der Code mit Pool:

Code: Alles auswählen

import multiprocessing
import time
import random
    
def test(a,b=0):
    l = []
    for i in range(2000000):
	    l.append(random.random())
    return "ok"

if __name__ == "__main__":  
    inputs = []
    for i in range(100):
        inputs.append(i)
    anfangszeit = int(str(time.time() * 1000).split(".")[0])

    pool_size = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(
        processes=pool_size,
    )
    pool_outputs = pool.map(test, inputs)
    pool.close()  # no more tasks
    pool.join()  # wrap up current tasks

    print('Pool    :', pool_outputs)
    endzeit = int(str(time.time() * 1000).split(".")[0])
    dauer = (float(endzeit)-float(anfangszeit))/1000 # in sek
    print(dauer)
Hier mein Code:

Code: Alles auswählen

import multiprocessing
import time
import threading
import random

def test(a,b):
    l = []
    for i in range(2000000):            # cpu intensive aufgabe
	    l.append(random.random())
    return "ok"
    
class Multithread(threading.Thread):
    def __init__(self, aufgabe=0):
        if aufgabe:
            threading.Thread.__init__(self)
            self.aufgabe = aufgabe[0]  #ist ja ein Tupel aus funktion und argumenten
            self.argumente = aufgabe[1]   
            self.result = None  
        return
        
    def run(self):
        self.result = self.aufgabe(*self.argumente) 
        return
    
class MultiProcessBeides(multiprocessing.Process):
    def __init__(self, aufgaben=0):
        if aufgaben!=0:
            multiprocessing.Process.__init__(self)
            self.aufgaben = aufgaben
            for aufgabe in aufgaben:
                self.result_queue = aufgabe[2]      # get any of the result objects. they should direct all to the same queue
                break
        return
        
    def run(self):
        machen = []
        for aufgabe in self.aufgaben:
            machen.append(aufgabe)
        results = parallelThread(machen)       # nun jeden der prozesse die aufgaben mithilfe von threads erledigen lassen
        self.result_queue.put(results)
        return

def parallelThread(aufgabenliste,dummy=0): 
    threads = list(map(Multithread, aufgabenliste)) 
    for thread in threads: 
        thread.start()
   
    for thread in threads: 
        thread.join()
    return(list(thread.result for thread in threads))  #ergebnisse als liste printen     

def parallelBeides(aufgabenliste): 
    cpus = multiprocessing.cpu_count()
    
    threadaufgaben = {}
    for c in range(cpus):
        threadaufgaben[c] = [] 
    while aufgabenliste:
        for c in range(cpus):
            if aufgabenliste:
                threadaufgaben[c].append(aufgabenliste.pop())  # gleichmäßige Verteilung der aufgaben auf die cpus
    
    processes = []
    for c in range(cpus):
        processes.append(MultiProcessBeides(threadaufgaben[c]))
    
    for process in processes: 
        process.start()
   
    for process in processes: 
        process.join()
    
    return(list(process.result_queue.get() for process in processes))  #ergebnisse als liste printen
    

if __name__ == "__main__":  
    
    results = multiprocessing.Queue() # queue wird gebraucht, um ergebnisse zu übertragen
    Instanzen = []
    anfangszeit = int(str(time.time() * 1000).split(".")[0])
    for i in range(100):
        Instanzen.append((test,(0,1),results))
    r = parallelBeides(Instanzen) 
    print(r)
    endzeit = int(str(time.time() * 1000).split(".")[0])
    dauer = (float(endzeit)-float(anfangszeit))/1000 # in sek
    print(dauer)
(die ergebnisse sind unterscheidlich verschachtelt, aber darauf kommts aktuell nicht an, geht nur um die Zeit.)

Diese beiden Codes hab ich auf windows mit 8 cpus ausgeführt.
Pool braucht 11.75 sekunden.
Meine Lösung braucht 11.03 sekunden.
(Ausschließlich Threading braucht ~40-50 sekunden)

Kann man sagen, woran das liegt? Ist es also immer besser meine eigene Lösung anstatt Pool zu verwenden? Oder hat meine Lösung Fehler/Fehlerquellen, die Pool abdeckt und deswegen etwas langsamer ist?
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

ah, ich glaub ich habs rausgefunden.

Das ganze war nur deshalb so verwirrend, weil die Aufgabe bei beiden ungefähr gleich lang gedauert hat (0.75 sekunden unterschied ist ja nicht so groß)
Deswegen ging ich davon aus, dass Pool intern auch irgendwie dafür sorgt, dass die ~13 anfallenden Aufgaben pro Prozess auch irgendwie gleichzeit abgearbeitet werden.

Schreibt man in die test-Funktion aber einfach nur ein sleep(1) rein, dann erkennt man, wie Pool (logischerweise) tatsächlich arbeitet.
Wenn 8 Prozesse dabei sind, dann warten 8 Prozesse gemeinsam 1 sekunde. Danach die nächsten 8.
Bei 100 Aufgaben sind das also 100/8 = ~13 sekunden die gebraucht werden.

Nimmt man sleep(1) bei meiner Kombinationslösung, dann dauerts nur ~1.2 sekunden.
Ausschließlich threading dauert sogar nur 1.03 sekunden.

Daher ist es wohl das schlauste meine Kombinationslösung zu verwenden, da diese in beiden Beispielen im Schnitt am Schnellsten ist. Oder gibt es eine andere "allrounder" Lösung, die in beiden (oder auch anderen) Fällen am schnellsten ist?

edit:
Hat es irgendeinen Nutzen/Sinn, mehr Prozesse zu starten, als man cpus hat? (von der Zeit ists identisch, aber hab in manchen biespielskripten gesehen, dass es mit 2 multipliziert wird)
Sirius3
User
Beiträge: 18216
Registriert: Sonntag 21. Oktober 2012, 17:20

@Serpens66: bei CPU-intensiven Rechnungen macht es Sinn, genau so viele Rechnungen parallel zu machen, wie es CPUs gibt, da jeder Kontextwechsel Zeit kostet, ist die Aufgabe auch noch speicherintensiv werden durch den Kontextwechsel auch noch die CPU-Caches zunichte macht, was das ganz nochmal langsamer macht. Je nach Aufgabe können da schon mal 30% der Rechenzeit draufgehen. Du startest für jede Teilaufgabe einen eigenen Thread, also 100 davon. Das ist maximal ungünstig. Eigenen Code für etwas zu schreiben, was es schon gibt, ist auch nicht gerade effektiv.

Dein Clustering-Problem läßt sich umgehen, indem Du nicht 100 mal 2Millionen Durchläufe rechnest, sondern 1000 mal 200000. Dann sollte die Pool-Lösung auch schneller sein.

Das „Ausrechnen“ der anfangszeit ist auch maximal umständlich. Du „rechnest“ time.time() in Millisekunden um, um sie am Schluß wieder in Sekunden umzurechnen. Dann ist doch das erste Umrechnen gar nicht nötig. Das Umwandeln in einen String, um die Nachkommastellen abzuschneiden führt bei Python2 dazu, dass das Ergebnis maximal falsch ist, da die Stringrepräsentation '1.4995400205e+12' ist. »anfangszeit = int(time.time() * 1000) hat das gleichen, bzw. nicht fehlerhafte Ergebnis.
Python1906
User
Beiträge: 29
Registriert: Mittwoch 9. November 2016, 13:52

Hallo allerseits,

ich hätte da mal eine Frage.

Kann man deine Lösung auch irgendwie auf Programme nutzen, welche man starten will (zum Beispiel, ein Schreibprogramm, oder ein Spiel), und wenn ja wie?
Antworten