Verständnisfrage bei multiprocessing Pool

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
chuen72
User
Beiträge: 6
Registriert: Freitag 3. November 2017, 10:38

Hallo,

aus dem Einführungs-Beispiel von python multiprocessing (https://docs.python.org/2/library/multiprocessing.html)

Beispiel 1:

Code: Alles auswählen

from multiprocessing import Pool

def f(x):
    return x*x*x

if __name__ == '__main__':
    p = Pool(5)
    x=p.map(f,[1,2,3,4])
    print(p.map(f, [1, 2, 3]))
    print(x)
Beispiel 2:

Code: Alles auswählen

from multiprocessing import Pool

def  f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))           # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))          # prints "[0, 1, 4,..., 81]"

Ich habe das multiprocessing Modul so verstanden, dass damit Abläufe parallelisiert werden können. Wofür stehen in den Beispielen die Zahlen in Pool?

Code: Alles auswählen

p = Pool(5)
pool = Pool(processes=4) 
Ist das die Anzahl der parallelen Programmabläufe? Wie viele parallele Abläufe kann ich starten, sicherlich nicht unendlich viele...? Hängt das von der Anzahl der Kerne ab?

Bei meinem Problem habe ich ein großes ndarray, bei dem ich mir immer 12 bits anschaue und daraus in einer Funktion ein Ergebnis berechne, welches ich in einem Output-ndarray speicher. Die Berechnungen funktionieren fehlerfrei, aber sind sehr träge. Deshalb habe ich das Input-ndarray in 4 Teile aufgeteilt um später die 4 Output-ndarray hintereinander zu hängen (Reihenfolge ist wichtig). Die Berechnungsgeschwindigkeit möchte ich nun mit Parallelisierung der Berechnung der 4 Teile via multiprocessing verbessern.

Prinzip:
-Input ist eindimensionales ndarray mit 282.240 Elementen (erledigt)
-Aufteilen von Input in 4 Teile (erledigt)
-Parallelisierung: 4x paralleler Funktionsaufruf um Geschwindigkeit zu reduzieren (Funktionsaufruf erledigt, Parallelisierung offen)
-Ergebnisse der 4 Rückgabewerte (auch ndarrays mit jeweils 188160 Elementen) zu einem ndarray zusammenfügen (ndarray) (erledigt)
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Hast du mal in die Doku von multiprocessing.Pool geschaut?
chuen72
User
Beiträge: 6
Registriert: Freitag 3. November 2017, 10:38

Ja, jedoch halten sich meine Programmier- und Englischkenntnisse in Grenzen und daher wollte ich hier im deutschsprachigen Forum nachfragen :?
chuen72
User
Beiträge: 6
Registriert: Freitag 3. November 2017, 10:38

Das ist mein Ansatz ohne die decompress_part2 Funktion... Die berechnet einfach das Output ndarray. Den Aufruf dieser Funktion möchte ich parallelisieren. 4 x aufrufen und das Ganze parallel. Die Funktion selbst habe ich hier nicht gepostet, da sie einfach die Berechnungen ausführt und das ganze nur noch unübersichtlicher machen würde

Code: Alles auswählen

def decompress(payload,WIDTH,HEIGHT):
    # INPUTS / OUTPUTS
    n_threads = 4                                                                           
    img_input = np.fromstring(payload, dtype='uint32')                                      
    img_output = np.zeros((WIDTH * HEIGHT), dtype=np.uint32)                            
    n_elements_part = np.int(len(img_input) / n_threads)                                    
    input_part=np.zeros((n_threads,n_elements_part)).astype(np.uint32)                      
    output_part =np.zeros((n_threads,np.int(n_elements_part/3*8))).astype(np.uint32)        

    # DEFINE PARTS (here 4 different ones)
    start = np.zeros(n_threads).astype(np.int)                          
    end = np.zeros(n_threads).astype(np.int)                            
    for i in range(0,n_threads):
        start[i] = i * n_elements_part
        end[i] = (i+1) * n_elements_part -1

    # COPY IMAGE DATA
    for idx in range(0,n_threads):
        input_part [idx,:] = img_input[start[idx]:end[idx]+1]


    for idx in range(0,n_threads):                          # following line is the function_call that should be parallized
        output_part[idx,:] = decompress_part2(input_part[idx],output_part[idx])



    # COPY PARTS INTO THE IMAGE
    img_output[0     : 188160] = output_part[0,:]
    img_output[188160: 376320] = output_part[1,:]
    img_output[376320: 564480] = output_part[2,:]
    img_output[564480: 752640] = output_part[3,:]

    # RESHAPE IMAGE
    img_output = np.reshape(img_output,(HEIGHT, WIDTH))

    return img_output
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Der Satz “processes is the number of worker processes to use. If processes is None then the number returned by cpu_count() is used” ist doch recht eindeutig. Und selbst wenn das nicht verständlich ist, die google translate Übersetzung ist “Prozesse ist die Anzahl der zu verwendenden Arbeitsprozesse. Wenn Prozesse auf None gesetzt sind, wird die von cpu_count () zurückgegebene Zahl verwendet” spätestens ist es.

Wieso du alles in englisch kommentierst (und das nicht schlecht) aber dann vor einem solchen Satz kapitulierst verstehe ich nicht.
chuen72
User
Beiträge: 6
Registriert: Freitag 3. November 2017, 10:38

Danke für die Hinweise. Blicke jetzt schon etwas besser durch.

Ich habe mein Beispiel nochmal stark vereinfacht und möchte wissen wie ich die Iterationen parallelisieren kann, damit die gesamte Berechnung schneller läuft. Ein Ansatz steht in dem auskommentierten Teil

Code: Alles auswählen

import multiprocessing
import numpy as np

def split(data,parts,step, length):
	data_array=np.zeros((parts,step))

	for i in range(parts):	
		data_array[i,:] = data[i*step:(i+1)*step]

	return(data_array)

def mul(arr, scalar):
	result = np.multiply(arr,scalar)
	return(result)

data = np.linspace(1.0, 100.0, num=24).astype(int)
parts = 4
length=len(data)
step = np.int(length/parts)
scalar = 2
data_array = split(data,parts,step,length)						
res_array = np.zeros((parts,step))
print(data_array)

for idx in range(parts):
	test = data_array[idx,:]
	res_array[idx,:] = mul(test,scalar)	# Line to be parallized !

#with multiprocessing.Pool(parts) as p:
	#results = (p.map(mul(data_arr,2),parts))

print('\n',res_array)

Sirius3
User
Beiträge: 18299
Registriert: Sonntag 21. Oktober 2012, 17:20

@chuen72: wenn es Dir nur darum geht, numpy-Berechnungen zu parallelisieren, dann nimm ein numpy das gegen die mkl-Bibliothek gelinkt wurde, die hat Parallelisierung schon eingebaut. Die `split`-Funktion ist überflüssig, da es `data.reshape` gibt; die `mul`-Funktion ist überflüssig, da es `*` gibt.

Code: Alles auswählen

import multiprocessing
import numpy as np

scalar = 2
data = np.linspace(1.0, 100.0, num=24).astype(int)
parts = 4

def calc(part):
    return part * scalar

result = np.zeros_like(data)
for idx, part in enumerate(data.reshape(parts, -1)):
   result[idx,:] = calc(part)

print(result)

with multiprocessing.Pool(parts) as p:
    results = p.map(calc, data.reshape(parts, -1))

print(results)
chuen72
User
Beiträge: 6
Registriert: Freitag 3. November 2017, 10:38

Danke für deine Antwort.

Meine beiden Beispielfunktionen waren nur einfache Dummys... Mir geht es ausschließlich um den Ansatz der Parallelisierung. Da ich deutlich größere Eingangsdaten habe muss ich diese aufteilen und parallel berechnen lassen um sie später zusammen zu setzen.

Bei deinem Code bekomme ich in line 13 einen Index Error: too many indices for Array?
Sirius3
User
Beiträge: 18299
Registriert: Sonntag 21. Oktober 2012, 17:20

@chuen72: wie schon geschrieben, bei reiner Numerik kann mkl die bessere Wahl sein.

Für das selbe Ergebnis wie bei multiprocessing muß es natürlich so heißen:

Code: Alles auswählen

result = []
for part in data.reshape(parts, -1):
   result.append(calc(part))
Benutzeravatar
noisefloor
User
Beiträge: 4209
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,
Ich habe das multiprocessing Modul so verstanden, dass damit Abläufe parallelisiert werden können.
Mit multiprocessing verteilst Abläufe auf X Prozesse, wobei du X vorgeben kannst (siehe oben). Das kann, muss aber nicht schneller sein. Kommt z.B. auf die zu berechnenden Abläufe, die CPU etc an.

Du musst halt bedenken, dass das Anlegen von neuen Prozessen erst Mal administrativen Overhead für das OS erzeugt.

Abgesehen davon ist numpy ja, wie Sirius3 schon sagt, OOTB auf schnelles Rechnen optimiert. Kann also sein, dass das hier nichts bringt. Muss du halt mal ausprobieren.

@Sirius3:
dann nimm ein numpy das gegen die mkl-Bibliothek gelinkt wurde
Ist das nicht standardmäßig der Fall, wenn man numpy via pip installiert? Bzw. wie kann man raus finden, ob numpy gegen mkl gelinkt ist?

Gruß, noisefloor
chuen72
User
Beiträge: 6
Registriert: Freitag 3. November 2017, 10:38

@ noisefloor: Genau das möchte ich vorgeben. Bei mir sind es 4 Prozesse bei denen jedes Mal die gleiche Funktion aber mit einem Input anderen Array aufgerufen wird. Ich möchte ihm genau diese 4 Arrays geben und dann soll jeder der vier Funktionsaufrufe parallel durchgeführt werden. Außerdem benötige ich das jeweilige return ndarray.

Ich bin nur weiterhin ratlos, wie ich das in Code fassen soll :K
Benutzeravatar
noisefloor
User
Beiträge: 4209
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

@chuen72:
Ich bin nur weiterhin ratlos, wie ich das in Code fassen soll
äh... das hat dir Sirius3 im 1. Post vom ihm doch gezeigt...?!

Gruß, noisefloor
narpfel
User
Beiträge: 696
Registriert: Freitag 20. Oktober 2017, 16:10

Weil der Hinweis noch nicht gegeben wurde: `numpy` ist nur dann schnell, wenn möglichst viel der Berechnung im `numpy`-Code stattfindet und explizite Schleifen im Python-Code vermieden werden. Es kann also sein, dass deine `decompress_part2`-Funktion noch Optimierungspotential hat.

Außerdem: `numba` kann `numpy`-Code auch beschleunigen.
Antworten