ich versuche zurzeit rauszufinden, wie ich cpu intensiven code parallel möglichst schnell ausführen kann.
Um dabei alle verfügbaren CPUs zu nutzen, nutze ich also multiprocessing.
multiprocessing.Pool scheint wirklich kinderleicht anwendbar zu sein.
Dennoch hatte ich zuvor noch eine "eigene" Lösung geschrieben, welche multiprocessing und threading kombiniert.
Nun wundert es mich, warum meine Lösung schneller ist als Pool, obwohl sie möglicherweise sogar ziemlich mies gecodet ist.
Meine Lösung macht folgendes:
Erstelle soviele Prozesse, wie es CPU gibt. Teile die Aufgaben auf die Prozesse auf, welche dann mit threading durchgeführt werden.
Hier zuerst der Code mit Pool:
Code: Alles auswählen
import multiprocessing
import time
import random
def test(a,b=0):
l = []
for i in range(2000000):
l.append(random.random())
return "ok"
if __name__ == "__main__":
inputs = []
for i in range(100):
inputs.append(i)
anfangszeit = int(str(time.time() * 1000).split(".")[0])
pool_size = multiprocessing.cpu_count()
pool = multiprocessing.Pool(
processes=pool_size,
)
pool_outputs = pool.map(test, inputs)
pool.close() # no more tasks
pool.join() # wrap up current tasks
print('Pool :', pool_outputs)
endzeit = int(str(time.time() * 1000).split(".")[0])
dauer = (float(endzeit)-float(anfangszeit))/1000 # in sek
print(dauer)
Code: Alles auswählen
import multiprocessing
import time
import threading
import random
def test(a,b):
l = []
for i in range(2000000): # cpu intensive aufgabe
l.append(random.random())
return "ok"
class Multithread(threading.Thread):
def __init__(self, aufgabe=0):
if aufgabe:
threading.Thread.__init__(self)
self.aufgabe = aufgabe[0] #ist ja ein Tupel aus funktion und argumenten
self.argumente = aufgabe[1]
self.result = None
return
def run(self):
self.result = self.aufgabe(*self.argumente)
return
class MultiProcessBeides(multiprocessing.Process):
def __init__(self, aufgaben=0):
if aufgaben!=0:
multiprocessing.Process.__init__(self)
self.aufgaben = aufgaben
for aufgabe in aufgaben:
self.result_queue = aufgabe[2] # get any of the result objects. they should direct all to the same queue
break
return
def run(self):
machen = []
for aufgabe in self.aufgaben:
machen.append(aufgabe)
results = parallelThread(machen) # nun jeden der prozesse die aufgaben mithilfe von threads erledigen lassen
self.result_queue.put(results)
return
def parallelThread(aufgabenliste,dummy=0):
threads = list(map(Multithread, aufgabenliste))
for thread in threads:
thread.start()
for thread in threads:
thread.join()
return(list(thread.result for thread in threads)) #ergebnisse als liste printen
def parallelBeides(aufgabenliste):
cpus = multiprocessing.cpu_count()
threadaufgaben = {}
for c in range(cpus):
threadaufgaben[c] = []
while aufgabenliste:
for c in range(cpus):
if aufgabenliste:
threadaufgaben[c].append(aufgabenliste.pop()) # gleichmäßige Verteilung der aufgaben auf die cpus
processes = []
for c in range(cpus):
processes.append(MultiProcessBeides(threadaufgaben[c]))
for process in processes:
process.start()
for process in processes:
process.join()
return(list(process.result_queue.get() for process in processes)) #ergebnisse als liste printen
if __name__ == "__main__":
results = multiprocessing.Queue() # queue wird gebraucht, um ergebnisse zu übertragen
Instanzen = []
anfangszeit = int(str(time.time() * 1000).split(".")[0])
for i in range(100):
Instanzen.append((test,(0,1),results))
r = parallelBeides(Instanzen)
print(r)
endzeit = int(str(time.time() * 1000).split(".")[0])
dauer = (float(endzeit)-float(anfangszeit))/1000 # in sek
print(dauer)
Diese beiden Codes hab ich auf windows mit 8 cpus ausgeführt.
Pool braucht 11.75 sekunden.
Meine Lösung braucht 11.03 sekunden.
(Ausschließlich Threading braucht ~40-50 sekunden)
Kann man sagen, woran das liegt? Ist es also immer besser meine eigene Lösung anstatt Pool zu verwenden? Oder hat meine Lösung Fehler/Fehlerquellen, die Pool abdeckt und deswegen etwas langsamer ist?