Multiprocessing / Threading auf gemeinsamer Datenbasis - Welche Methode?

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
telefonnummer
User
Beiträge: 7
Registriert: Montag 30. Januar 2017, 18:38

Guten Tag,

ich überlege gerade, wie ich mein Programm am besten mit Python parallelisiere. Ich habe ein 2D Array in dem alle Daten gespeichert werden, im Prinzip könnte sich jeder Thread / Prozess ein Stück nehmen und es befüllen, das Problem ist nur, dass nicht alle Threads am selben Array arbeiten können, oder? D.h. ich müsste die einzelnen Arrays der Threads nach der Bearbeitung wieder in das gesamte Daten-Array überführen.
Das Problem ist nur: Ich hab mir mühe gegeben, das Projekt in Objekte zu kapseln, ich hab nun eine Klasse "Renderer" die das gesamte Rendering inklusive dem 2D-Array als Datenbasis kapselt, dieser Renderer enthält eine Referenz auf ein QWidget, dass die Daten im 2D Array anzeigt (es sind Farbwerte für jeden Pixel). Angenommen ich würde einen neuen Thread erstellen, dann wüsste ich erstmal nicht, wie ihm die gesamte Funktionalität des Renderer-Objekts zur Verfügung stelle, da es nur ein Renderer-Objekt gibt, können dann auch wieder nicht 4 Thread-Objekte am selben Renderer-Objekt arbeiten, oder?
Mit Multiprocessing geht das sowieso nicht, da Klassenmethoden nicht "gepickled" werden können.

Mein Ansatz mit Threads sieht aktuell so aus:

Code: Alles auswählen

    def calculate(self):
        thrds = []
        for x in range(0,4):
            startWidth = int(x*self.width / 4)
            wLength = int(self.width / 4)
            startHeight = int(x*self.height / 4)
            hLength = int(self.height / 4)
            t = threading.Thread(target=self.calculateInRange,args=(startWidth,wLength,startHeight,hLength))
            t.start()
            thrds.append(t)
        for t in thrds:
            t.join()
startWidth ist die Start-X Koordinate und startHeight die Start-Y-Koordinate des 2D Bildes. Die an den Thread übergebene Methode sieht so aus:

Code: Alles auswählen

    def calculateInRange(self,startW,wLength,startH,hLength):
        i = startW * self.height + startH;
        for w in range(startW,wLength):
            for h in range(startH,hLength):
                color = self.computeSample(w,h)
                self.buffer[i * 3 + 0] += color.r
                self.buffer[i * 3 + 1] += color.g
                self.buffer[i * 3 + 2] += color.b
                i += 1
Hier sieht man, dass das 2D Daten-Array für alle Threads benutzt wird. Wenn ich in dieser Methode debugge, läuft mein Thread ca. 5 mal in diese Methode:

Bild

Nachdem der Thread aus dieser Methode "run" zurück zu "calculateInRange" kehrt, sind auf einmal alle Variablen (also startW, wLength,startH,hLength,w und h) auf '0' gesetzt, keine Ahnung wieso und wie ich das ändern kann. :K
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

@telefonnummer: wenn Du Ganzzahl-Division willst, nimm den Ganzzahldivisionsoperator //. Willst Du wirklich nur vier Rechtecke auf der Diagonalen berechnen? So, wie Du range benutzt, funktioniert es nicht. Die ganze Mühe, die Du Dir da mit Parallelisierung gemacht hast, hat wahrscheinlich nur den Effekt, dass das Programm komplizierter wird und langsamer läuft. Aber ohne diese Render-Klasse oder computeSample zu kennen, kann man da nicht viel dazu sagen.
telefonnummer
User
Beiträge: 7
Registriert: Montag 30. Januar 2017, 18:38

wenn Du Ganzzahl-Division willst, nimm den Ganzzahldivisionsoperator //
Danke für den Tipp!
Willst Du wirklich nur vier Rechtecke auf der Diagonalen berechnen? So, wie Du range benutzt, funktioniert es nicht.
Das hab ich auch gerade gemerkt, hab es nun so versucht:

Code: Alles auswählen

        thrds = []
        t1 = threading.Thread(target=self.calculateInRange, args=(0,64,0,128))
        t2 = threading.Thread(target=self.calculateInRange, args=(64,128,0,128))
        #thrds.append(t1)
        thrds.append(t2)
        #t1.start()
        t2.start()
        for t in thrds:
            t.join()
        self.samples += 1
t1 berechnet die linke Hälfte des Bildes, t2 die rechte. Allerdings macht es keinen Unterschied (weder von der Prozessorlast noch von der Geschwindigkeit), ob ich die beiden Threads aktiviere oder ob ich nur einen über das gesamte Bild laufen lasse, warum ist das so? Wenn nur t1 oder nur t2 läuft, ist es natürlich spürbar schneller, da nur die Hälfte des Bildes berechnet wird und die andere Hälfte schwarz bleibt (so wies sein soll).
Aber ohne diese Render-Klasse oder computeSample zu kennen, kann man da nicht viel dazu sagen.
Da passiert nicht viel, in der computeSample Methode wird einfach nur der Farbwert für den aktuellen Pixel berechnet, unabhängig vom 2D-Array. Das 2D-Array ist nur dafür da, um die Pixelinformationen zu speichern und mit dem QWidget auszugeben. Der Renderer besteht nur aus dem 2D-Array, computeSample um die Farbwerte zu berechnen und einer Referenz auf ein QWidget.
Die ganze Mühe, die Du Dir da mit Parallelisierung gemacht hast, hat wahrscheinlich nur den Effekt, dass das Programm komplizierter wird und langsamer läuft.
Warum? :(
BlackJack

@telefonnummer: Als erstes müsstest Du Dir klar machen das bei CPython Python-Bytecode in Threads sowieso nicht parallel läuft. Wenn Du mit Threads also reinen Python-Code zum Beschleunigen parallelisieren willst, dann geht das nicht oder Du musst eine Python-Implementierung nehmen bei der der Python-Code parallel ausgeführt werden kann. Wenn Du in CPython mehr als einen Prozessor(kern) gleichzeitig mit Python-Bytecode beschäftigen möchtest, dann musst Du `multiprocessing` verwenden.
telefonnummer
User
Beiträge: 7
Registriert: Montag 30. Januar 2017, 18:38

Wenn Du in CPython mehr als einen Prozessor(kern) gleichzeitig mit Python-Bytecode beschäftigen möchtest, dann musst Du `multiprocessing` verwenden.
Aber multiprocessing unterstützt keine Klassenmethoden :(
BlackJack

@telefonnummer: Klassenmethoden sollten eventuell sogar gehen, aber Du meinst sicher Instanzmethoden.

Dann musst Du es halt so umschreiben das keine gebundenen Methoden übergeben werden.
telefonnummer
User
Beiträge: 7
Registriert: Montag 30. Januar 2017, 18:38

Ich hab die Renderer-Klasse entfernt und alle Methoden einzeln, jetzt kommt keine Fehlermeldung mehr, allerdings bleibt das Bild schwarz...

Code: Alles auswählen

if __name__ == '__main__':
    mp.freeze_support()
    for i in range(0,100): #calculate 10 samples
        calculate()
        updateScreen()
        qApp.processEvents()

    sys.exit(qApp.exec_())
Muss ich mit dem 2D Daten-Array noch irgendwas machen, oder regelt das Python von selber?

Code: Alles auswählen

    prcs = []
    pro1 = mp.Process(target=calculateInRange,args=(0,64,0,128))
    pro2 = mp.Process(target=calculateInRange,args=(64,128,0,128))
    prcs.append(pro1)
    prcs.append(pro2)
    pro1.start()
    pro2.start()
    for p in prcs:
        p.join()
BlackJack

@telefonnummer: Der Hauptteil sieht so aus als wenn Du die Kontrolle nicht an Qt abgeben möchtest, das ist nicht die Art wie man GUI-Rahmenwerke üblicherweise benutzt.

Und natürlich muss man da noch was machen, denn was andere Prozess mit ihren eigenen Daten so anstellen hat keine Auswirkung auf den Prozess der die gestartet hat. An der Stelle würde man auch eher nicht die `Process`-Klasse direkt verwenden sondern sich einen `Pool` erstellen und dann eine der `map*()`-Varianten darauf aufrufen. Dann bekommt man auch gleich die Ergebnisse von den anderen Prozessen wieder zurückkommuniziert.
telefonnummer
User
Beiträge: 7
Registriert: Montag 30. Januar 2017, 18:38

Der Hauptteil sieht so aus als wenn Du die Kontrolle nicht an Qt abgeben möchtest, das ist nicht die Art wie man GUI-Rahmenwerke üblicherweise benutzt.
Ich verstehe nicht ganz, ich benutze PyQt um Daten grafisch darzustellen, für was sollte ich es sonst nutzen? Es geht mir nicht darum, das Anzeigen der Daten zu parallelisieren, sondern das Berechnen, und das findet ohne PyQt statt.
An der Stelle würde man auch eher nicht die `Process`-Klasse direkt verwenden sondern sich einen `Pool` erstellen und dann eine der `map*()`-Varianten darauf aufrufen. Dann bekommt man auch gleich die Ergebnisse von den anderen Prozessen wieder zurückkommuniziert.
Die map-Funktion erwartet eine Art Iteration, ich bin mir nicht sicher, wie ich das mit meiner Funktion anstellen soll? Was genau ist der Unterschied zwischen `map` und `Processes`? Warum führt meine aktuelle Variante zu einem schwarzen Bild (d.h. das 2D-DatenArray ist leer)?

Um den Pool benutzen zu können, müsste ich also den Buffer von verschiedenen Prozessen berechnen lassen und dann am Ende zu einem 2D-Array zusammenfügen?

Eine Anmerkung: Ich hatte gehofft, dass das schwarze Bild verhindert werden kann, wenn ich den Buffer (also das 2D-Array) direkt als Parameter an die Funktionen übergebe, allerdings scheint das Python nicht wirklich zu kümmern, die berechneten Ergebnisse stimmen zwar, allerdings verschwinden Sie im Nirwana sobald ich versuche, sie im Buffer zu speichern, daher enthält der Buffer nur Nullen:

Code: Alles auswählen

if __name__ == '__main__':
    mp.freeze_support()
    buffer = [0] * (width * height * 3 + 1)
    qApp = QApplication(sys.argv)
    window = Window()
    window.setWidth(width)
    window.setHeight(height)
    for i in range(0,100): #calculate 100 samples
        calculate(buffer)
        updateScreen(buffer)
        qApp.processEvents()

    sys.exit(qApp.exec_())
Edit: Okay, so klappts nun:

Code: Alles auswählen

def calculateInRange(params):
    startW = params[0]
    wLength = params[1]
    startH = params[2]
    hLength = params[3]
    buffer = params[4]
    i = startW * height + startH;
    for w in range(startW,wLength):
        for h in range(startH,hLength):
            color = computeSample(w,h)
            buffer[i * 3 + 0] += color.r
            buffer[i * 3 + 1] += color.g
            buffer[i * 3 + 2] += color.b
            i += 1
def calculate(buffer):
    global samples
    param1 = [0,64,0,128,buffer]
    param2 = [64,128,0,128,buffer]
    params = [param1,param2]
    pool = mp.Pool(processes=2)
    pool.map(calculateInRange,params)
Allerdings ist das Bild nach wie vor schwarz, wohin verschwinden die Berechneten werte? Im Buffer gespeichert werden sie auf jeden Fall nicht...
BlackJack

@telefonnummer: Es geht nicht darum für was Du Qt benutzt sondern *wie*. Die normale vorgehensweise ist es am Ende der Vorbereitungen die `exec_()`-Methode aufzurufen und nicht zu versuchen selbst die volle Kontrolle über den Programmfluss zu behalten. Das kann zu Problemen führen wenn man die Ereignisschleife nicht oft/regelmässig genug antreibt und skaliert auch nicht, weil man diese eigene Schleife selbst ja nur an einer Stelle im Programm haben kann.

Der Unterschied zwischen `map*()` und `Process` ist der das Du Dich bei `map*()` nicht um die `Process`-Exemplare kümmern musst und das Du vor allem auch das Ergebnis der Berechnung zurück bekommst ohne Dich das selbst drum kümmern zu müssen.

Prozesse haben einen eigenen Adressraum und Speicher. Du übergibst da nicht *das* Array sondern eine Kopie davon. Die wird in dem Prozess gefüllt, und wenn der Prozess dann am Ende angekommen ist, wird er beendet und der Speicher wird freigegeben. Wenn Du die Daten nicht aktiv zum aufrufenden Prozess zurück kommunizierst, gehen sie damit verloren. Wenn es wirklich nur um das füllen eines Arrays geht und dessen Werte am Anfang 0 oder egal sind, dann solltest Du auch keine Kopien dieser unwichtigen Daten übergeben, sondern einfach nur die Arraygrösse. Das Teilarray kann dann in dem Prozess erstellt und befüllt werden.

Was Du da mit den Argumente mit dem `zip()` anstellst ist mir nicht so wirklich klar‽
telefonnummer
User
Beiträge: 7
Registriert: Montag 30. Januar 2017, 18:38

Prozesse haben einen eigenen Adressraum und Speicher. Du übergibst da nicht *das* Array sondern eine Kopie davon. Die wird in dem Prozess gefüllt, und wenn der Prozess dann am Ende angekommen ist, wird er beendet und der Speicher wird freigegeben. Wenn Du die Daten nicht aktiv zum aufrufenden Prozess zurück kommunizierst, gehen sie damit verloren. Wenn es wirklich nur um das füllen eines Arrays geht und dessen Werte am Anfang 0 oder egal sind, dann solltest Du auch keine Kopien dieser unwichtigen Daten übergeben, sondern einfach nur die Arraygrösse. Das Teilarray kann dann in dem Prozess erstellt und befüllt werden
Was zu Beginn im Array drinnen ist, ist egal, allerdings summieren sich die Werte über die Zeit. Die Werte werden zwar unabhängig voneinander berechnet, aber aufsummiert, darum auch das `+=`, d.h. wenn sich das Array bei jedem Versuch resetten würde, wäre das relativ nutzlos.

Ich könnte aber den aktuellen Stand in einem "globalen" Array vorhalten und nach jedem Update der 4 Prozesse die neuen Werte aufaddieren. Ich habs mal so probiert:

Code: Alles auswählen

def calculateInRange(params):
    startW = params[0]
    endW = params[1]
    startH = params[2]
    endH = params[3]
    buffer = [0] * (endW - startW) * (endH - startH) * 3 + 1
    i = 0
    for w in range(startW,endW):
        for h in range(startH,endH):
            color = computeSample(w,h)
            buffer[i * 3 + 0] += color.r
            buffer[i * 3 + 1] += color.g
            buffer[i * 3 + 2] += color.b
            i += 1
    return buffer
def calculate(buffer):
    global samples
    param1 = [0,32,0,128,buffer]
    param2 = [32,64,0,128,buffer]
    param3 = [64,96,0,128,buffer]
    param4 = [96,128,0,128,buffer]
    params = [param1,param2,param3,param4]
    pool = mp.Pool(processes=4)
    result = pool.map(calculateInRange,params)

Jeder Prozess gibt also ein Array mit den berechneten Werten zurück, allerdings erhalte ich so die Fehlermeldung bei der letzten Zeile:
TypeError: can only concatenate list (not "int") to list
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

@telefonnummer: es wäre noch ganz gut zu erfahren, wo denn der Fehler auftritt. Dann übergibst Du immer noch buffer an calculateInRange obwohl Du den dann gar nicht benutzt.
Antworten