Seite 1 von 1

Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 10:58
von chuen72
Hallo,

aus dem Einführungs-Beispiel von python multiprocessing (https://docs.python.org/2/library/multiprocessing.html)

Beispiel 1:

Code: Alles auswählen

from multiprocessing import Pool

def f(x):
    return x*x*x

if __name__ == '__main__':
    p = Pool(5)
    x=p.map(f,[1,2,3,4])
    print(p.map(f, [1, 2, 3]))
    print(x)
Beispiel 2:

Code: Alles auswählen

from multiprocessing import Pool

def  f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print(result.get(timeout=1))           # prints "100" unless your computer is *very* slow
    print(pool.map(f, range(10)))          # prints "[0, 1, 4,..., 81]"

Ich habe das multiprocessing Modul so verstanden, dass damit Abläufe parallelisiert werden können. Wofür stehen in den Beispielen die Zahlen in Pool?

Code: Alles auswählen

p = Pool(5)
pool = Pool(processes=4) 
Ist das die Anzahl der parallelen Programmabläufe? Wie viele parallele Abläufe kann ich starten, sicherlich nicht unendlich viele...? Hängt das von der Anzahl der Kerne ab?

Bei meinem Problem habe ich ein großes ndarray, bei dem ich mir immer 12 bits anschaue und daraus in einer Funktion ein Ergebnis berechne, welches ich in einem Output-ndarray speicher. Die Berechnungen funktionieren fehlerfrei, aber sind sehr träge. Deshalb habe ich das Input-ndarray in 4 Teile aufgeteilt um später die 4 Output-ndarray hintereinander zu hängen (Reihenfolge ist wichtig). Die Berechnungsgeschwindigkeit möchte ich nun mit Parallelisierung der Berechnung der 4 Teile via multiprocessing verbessern.

Prinzip:
-Input ist eindimensionales ndarray mit 282.240 Elementen (erledigt)
-Aufteilen von Input in 4 Teile (erledigt)
-Parallelisierung: 4x paralleler Funktionsaufruf um Geschwindigkeit zu reduzieren (Funktionsaufruf erledigt, Parallelisierung offen)
-Ergebnisse der 4 Rückgabewerte (auch ndarrays mit jeweils 188160 Elementen) zu einem ndarray zusammenfügen (ndarray) (erledigt)

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 11:00
von __deets__
Hast du mal in die Doku von multiprocessing.Pool geschaut?

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 11:06
von chuen72
Ja, jedoch halten sich meine Programmier- und Englischkenntnisse in Grenzen und daher wollte ich hier im deutschsprachigen Forum nachfragen :?

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 11:08
von chuen72
Das ist mein Ansatz ohne die decompress_part2 Funktion... Die berechnet einfach das Output ndarray. Den Aufruf dieser Funktion möchte ich parallelisieren. 4 x aufrufen und das Ganze parallel. Die Funktion selbst habe ich hier nicht gepostet, da sie einfach die Berechnungen ausführt und das ganze nur noch unübersichtlicher machen würde

Code: Alles auswählen

def decompress(payload,WIDTH,HEIGHT):
    # INPUTS / OUTPUTS
    n_threads = 4                                                                           
    img_input = np.fromstring(payload, dtype='uint32')                                      
    img_output = np.zeros((WIDTH * HEIGHT), dtype=np.uint32)                            
    n_elements_part = np.int(len(img_input) / n_threads)                                    
    input_part=np.zeros((n_threads,n_elements_part)).astype(np.uint32)                      
    output_part =np.zeros((n_threads,np.int(n_elements_part/3*8))).astype(np.uint32)        

    # DEFINE PARTS (here 4 different ones)
    start = np.zeros(n_threads).astype(np.int)                          
    end = np.zeros(n_threads).astype(np.int)                            
    for i in range(0,n_threads):
        start[i] = i * n_elements_part
        end[i] = (i+1) * n_elements_part -1

    # COPY IMAGE DATA
    for idx in range(0,n_threads):
        input_part [idx,:] = img_input[start[idx]:end[idx]+1]


    for idx in range(0,n_threads):                          # following line is the function_call that should be parallized
        output_part[idx,:] = decompress_part2(input_part[idx],output_part[idx])



    # COPY PARTS INTO THE IMAGE
    img_output[0     : 188160] = output_part[0,:]
    img_output[188160: 376320] = output_part[1,:]
    img_output[376320: 564480] = output_part[2,:]
    img_output[564480: 752640] = output_part[3,:]

    # RESHAPE IMAGE
    img_output = np.reshape(img_output,(HEIGHT, WIDTH))

    return img_output

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 11:16
von __deets__
Der Satz “processes is the number of worker processes to use. If processes is None then the number returned by cpu_count() is used” ist doch recht eindeutig. Und selbst wenn das nicht verständlich ist, die google translate Übersetzung ist “Prozesse ist die Anzahl der zu verwendenden Arbeitsprozesse. Wenn Prozesse auf None gesetzt sind, wird die von cpu_count () zurückgegebene Zahl verwendet” spätestens ist es.

Wieso du alles in englisch kommentierst (und das nicht schlecht) aber dann vor einem solchen Satz kapitulierst verstehe ich nicht.

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 13:03
von chuen72
Danke für die Hinweise. Blicke jetzt schon etwas besser durch.

Ich habe mein Beispiel nochmal stark vereinfacht und möchte wissen wie ich die Iterationen parallelisieren kann, damit die gesamte Berechnung schneller läuft. Ein Ansatz steht in dem auskommentierten Teil

Code: Alles auswählen

import multiprocessing
import numpy as np

def split(data,parts,step, length):
	data_array=np.zeros((parts,step))

	for i in range(parts):	
		data_array[i,:] = data[i*step:(i+1)*step]

	return(data_array)

def mul(arr, scalar):
	result = np.multiply(arr,scalar)
	return(result)

data = np.linspace(1.0, 100.0, num=24).astype(int)
parts = 4
length=len(data)
step = np.int(length/parts)
scalar = 2
data_array = split(data,parts,step,length)						
res_array = np.zeros((parts,step))
print(data_array)

for idx in range(parts):
	test = data_array[idx,:]
	res_array[idx,:] = mul(test,scalar)	# Line to be parallized !

#with multiprocessing.Pool(parts) as p:
	#results = (p.map(mul(data_arr,2),parts))

print('\n',res_array)


Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 13:27
von Sirius3
@chuen72: wenn es Dir nur darum geht, numpy-Berechnungen zu parallelisieren, dann nimm ein numpy das gegen die mkl-Bibliothek gelinkt wurde, die hat Parallelisierung schon eingebaut. Die `split`-Funktion ist überflüssig, da es `data.reshape` gibt; die `mul`-Funktion ist überflüssig, da es `*` gibt.

Code: Alles auswählen

import multiprocessing
import numpy as np

scalar = 2
data = np.linspace(1.0, 100.0, num=24).astype(int)
parts = 4

def calc(part):
    return part * scalar

result = np.zeros_like(data)
for idx, part in enumerate(data.reshape(parts, -1)):
   result[idx,:] = calc(part)

print(result)

with multiprocessing.Pool(parts) as p:
    results = p.map(calc, data.reshape(parts, -1))

print(results)

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 13:53
von chuen72
Danke für deine Antwort.

Meine beiden Beispielfunktionen waren nur einfache Dummys... Mir geht es ausschließlich um den Ansatz der Parallelisierung. Da ich deutlich größere Eingangsdaten habe muss ich diese aufteilen und parallel berechnen lassen um sie später zusammen zu setzen.

Bei deinem Code bekomme ich in line 13 einen Index Error: too many indices for Array?

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 14:18
von Sirius3
@chuen72: wie schon geschrieben, bei reiner Numerik kann mkl die bessere Wahl sein.

Für das selbe Ergebnis wie bei multiprocessing muß es natürlich so heißen:

Code: Alles auswählen

result = []
for part in data.reshape(parts, -1):
   result.append(calc(part))

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 14:35
von noisefloor
Hallo,
Ich habe das multiprocessing Modul so verstanden, dass damit Abläufe parallelisiert werden können.
Mit multiprocessing verteilst Abläufe auf X Prozesse, wobei du X vorgeben kannst (siehe oben). Das kann, muss aber nicht schneller sein. Kommt z.B. auf die zu berechnenden Abläufe, die CPU etc an.

Du musst halt bedenken, dass das Anlegen von neuen Prozessen erst Mal administrativen Overhead für das OS erzeugt.

Abgesehen davon ist numpy ja, wie Sirius3 schon sagt, OOTB auf schnelles Rechnen optimiert. Kann also sein, dass das hier nichts bringt. Muss du halt mal ausprobieren.

@Sirius3:
dann nimm ein numpy das gegen die mkl-Bibliothek gelinkt wurde
Ist das nicht standardmäßig der Fall, wenn man numpy via pip installiert? Bzw. wie kann man raus finden, ob numpy gegen mkl gelinkt ist?

Gruß, noisefloor

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 14:50
von chuen72
@ noisefloor: Genau das möchte ich vorgeben. Bei mir sind es 4 Prozesse bei denen jedes Mal die gleiche Funktion aber mit einem Input anderen Array aufgerufen wird. Ich möchte ihm genau diese 4 Arrays geben und dann soll jeder der vier Funktionsaufrufe parallel durchgeführt werden. Außerdem benötige ich das jeweilige return ndarray.

Ich bin nur weiterhin ratlos, wie ich das in Code fassen soll :K

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 15:22
von noisefloor
Hallo,

@chuen72:
Ich bin nur weiterhin ratlos, wie ich das in Code fassen soll
äh... das hat dir Sirius3 im 1. Post vom ihm doch gezeigt...?!

Gruß, noisefloor

Re: Verständnisfrage bei multiprocessing Pool

Verfasst: Freitag 3. November 2017, 15:59
von narpfel
Weil der Hinweis noch nicht gegeben wurde: `numpy` ist nur dann schnell, wenn möglichst viel der Berechnung im `numpy`-Code stattfindet und explizite Schleifen im Python-Code vermieden werden. Es kann also sein, dass deine `decompress_part2`-Funktion noch Optimierungspotential hat.

Außerdem: `numba` kann `numpy`-Code auch beschleunigen.