Seite 1 von 1

Threading: Fehlerhafte Rückgabe, wie den Fehler suchen

Verfasst: Donnerstag 2. Juli 2009, 11:07
von Llan
Hallo,

ich habe Probleme mit einer Anwendung, in der ich Threads verwende um AUfgaben verteilt berechnen zu lassen. Die verteilte Berechnung funktioniert so, dass ich einen Thread abspalte der dann durch mpiexec auf anderen Prozessoren rechnet. Nach Ende der Berechnungen sollen durch ein join die Threads vereint und die Ergebnisliste gebaut werden.
Ich gebe kurz an wie das Prinzip des Threads aussieht:

Code: Alles auswählen

class filterThread (threading.Thread):
    '''single thread that runs particles on one processor'''
    
    def __init__(self, aParticles, fEndTime):
        threading.Thread.__init__(self)
        self.aParticles = aParticles
        
    def run(self):
        for p in self.aParticles:
                p.observe()
                print p.weight
Das ist das wesentliche: Er führt für eine Anzahl von "Partikeln" eine Funktion aus, die irgendetwas berechnet und daraus ein "Gewicht" (einfach einen Zahlenwert) je Partikel berechnet und im Partikel speichert.

Die Funktion zum Verteilen der Threads sieht ungefähr so aus:

Code: Alles auswählen

  #(...) 
            threads = []
            for key in partPartition.keys():
                thread = filterThread(partPartition[key], t)
                threads.append(thread)

           for np in range(iProc):
                 threads[np].start()
           for np in range(iProc):
                 threads[np].join()
                 for p in threads[np].aParticles:
                      print p.weight
Wieder auf das wesentliche vereinfacht.
Ich teile also die Gesamtanzahl Partikel auf Listen auf, die den Threads gegeben werden und dann rechnet jeder für sich an seinem Satz Partikel herum.
Der Thread funktioniert auch, also das print p.weight spuckt Zahlen aus. Jetzt kommt es aber bei dem zweiten print in der obigen Funktion manchmal vor, dass für ein Partikel plötzlich weight 0 ausgegen wird, und die Zahlen für die folgenden Partikel viel zu hoch ist, und ganz anders als die im Thread berechnete Zahl.

Ich weiß nicht, wo das herkommt oder wie ich überhaupt anfangen kann, den Fehler zu suchen, habt ihr dazu einen Tipp?

Verfasst: Donnerstag 2. Juli 2009, 11:35
von Dauerbaustelle
Ohne Code, wie denn `weight` berechnet wird, nicht.

Verfasst: Donnerstag 2. Juli 2009, 11:39
von Llan
Wie weight berechnet wird ist irrelevant. Es ist einfach ein float, und im Thread-print wird es korrekt ausgegeben.

Verfasst: Donnerstag 2. Juli 2009, 11:40
von ms4py
Deine Iteration ist schonmal sehr verwirrend.

Code: Alles auswählen

for np in range(iProc): 
                 threads[np].start() 
Warum nicht:

Code: Alles auswählen

for thread in threads: 
                 thread.start() 
Vermutlich ist das Problem nur, dass die prints in den einzelnen Threads nicht synchron sind, sprich die Ausgabe von 2 Threads kommen gleichzeitig und vermischen sich.
Versuch die Prints mal mit einem Lock abzusichern, dann müssten die Ergebnisse übereinstimmen.

Verfasst: Donnerstag 2. Juli 2009, 11:49
von Llan
Aber auch wenn ich zuerst alle Threads joine, dann nochmal durchgehe und alles ausgebe (ich verwende sowieso kein print, sondern ein logfile), dann sind die Werte die dann in den Partikeln drinstehe einfach falsch, auch für die weiteren Berechnungen. Kann es trotzdem daher kommen?

Verfasst: Donnerstag 2. Juli 2009, 11:58
von Rebecca
Das kommt drauf an, was genau deine Daten sind oder wo sie her kommen, wenn naemlich mehrere Threads gleichzeitig damit arbeiten wollen, kann es zu Problemen kommen. Teilst du irgendwelche Objekte oder Ressourcen mit allen Threads?

Verfasst: Donnerstag 2. Juli 2009, 12:03
von jerch
@Llan:
Ist Dein logging threadsafe? Ansonsten hättest Du wohl das selbe Problem wie mit dem print.

Ich kenne mich mit dem Python-mpi-Aufsatz nicht aus, folgendes fällt mir aber in Deinem Code auf:
Steht iProc irgendwie in Beziehung zur Länge von partPartition? Wenn nicht, würden evtl. initialisierte Threads garnicht abgearbeitet oder threads einen IndexError werfen, falls iProc größer sein sollte.
Und wenn parPartition sehr groß werden kann im Verhältnis zu den CPUs, solltest Du vllt. über ein Refactoring nachdenken und Threads wiederverwenden (evtl. mittels Queues), da die Threaderzeigung nicht gerade billig ist.

Verfasst: Donnerstag 2. Juli 2009, 12:04
von Llan
Hmm, also eigentlich nicht, bei der Berechnung jedenfalls agiert jeder unabhängig. Beim Aufsammeln nach dem join mache ich auch Kopien der Partikel und speichere die zurück.
Aber ich mache keine Kopien bevor ich auf die Threads aufteile. Das heißt, der Thread arbeitet nur per Referenz mit den Partikeln der Originalliste (aber kein Partikel wird je von mehr als einem Thread bearbeitet), und vielleicht gibt es dann Probleme beim join - vielleicht versuche ich mal, Kopien aus der Originalliste zu machen und diese den Threads zu geben?

Verfasst: Donnerstag 2. Juli 2009, 12:11
von Llan
jerch hat geschrieben:@Llan:
Ist Dein logging threadsafe? Ansonsten hättest Du wohl das selbe Problem wie mit dem print.
Die Hilfe sagt "The logging module is intended to be thread-safe without any special work needing to be done by its clients. "
jerch hat geschrieben: Steht iProc irgendwie in Beziehung zur Länge von partPartition? Wenn nicht, würden evtl. initialisierte Threads garnicht abgearbeitet oder threads einen IndexError werfen, falls iProc größer sein sollte.
Sorry, ja mein Code ist da nicht sehr aussagekräftig.

So ist es wohl klarer:

Code: Alles auswählen

            threads = []
            for key in partPartition.keys(): #Aufteilung der Partikelliste in Teillisten für n Prozessoren
                thread = filterThread(partPartition[key], t)
                threads.append(thread)

           for thread in threads:
                 thread.start()
           for thread in threads:
                 thread.join()
                 for p in thread.aParticles:
                      print p.weight 

partPartition ist also ein dictionary von Listen, die jeweils eine ANzahl an Partikeln enthalten.
jerch hat geschrieben: Und wenn parPartition sehr groß werden kann im Verhältnis zu den CPUs, solltest Du vllt. über ein Refactoring nachdenken und Threads wiederverwenden (evtl. mittels Queues), da die Threaderzeigung nicht gerade billig ist.
Naja, die Aufgaben die ein Thread verwaltet laufen jeweils auf einem eigenen Prozessor, und laufen ca. 1-2 Stunden. Die Threadverwaltung macht ein eigener Prozessor, also finde ich die Threaderzeugung billig im Vergleich.


Danke schonmal für die schnelle Hilfe, hab schon ein paar Ideen bekommen jetzt.

Verfasst: Donnerstag 2. Juli 2009, 12:14
von Rebecca
Mit dem MPI-Kram kenne ich mich auch nicht aus, aber allgemein ist das hier ganz nett: http://effbot.org/zone/thread-synchronization.htm. Das logging-Modul ist imo thread-safe.

Verfasst: Donnerstag 2. Juli 2009, 12:54
von BlackJack
@Llan: Aus dem Erzeugen der Threads könnte man noch eine "list comprehension" machen.

Wenn innerhalb von `run()` das `weight`-Attribut noch richtig ist, könntest Du das an der Stelle ja mal durch ein Property ersetzen, dass bei dem Versuch einen neuen Wert zu setzen eine Ausnahme auslöst. Dann bekommst Du einen Traceback zu der Stelle im Code, die versucht das richtige Ergebnis zu überschreiben.

Nur um sicher zu gehen: Wenn Du sagst, Du machst Kopien, wie sieht das genau aus?

Verfasst: Donnerstag 2. Juli 2009, 13:54
von Llan
@Blackjack Danky für den Tipp, das kannte ich bisher nicht und werde es ansehen.

Copy erzeuge ich mit dem copy- Modul (also copy.copy(p)...), weil ich später nämlich manche Partikel verdoppeln muss, per Referenz würde er ja dann zweimal das gleiche berechnen (bzw. an einem weiterrechnen).

Verfasst: Donnerstag 2. Juli 2009, 14:17
von BlackJack
Llan: Partikel lassen sich damit auch "komplett" kopieren? Die enthalten nichts "tiefes", was von `copy.copy()` nicht erfasst wird?

Code: Alles auswählen

In [12]: class A(object):
   ....:     def __init__(self):
   ....:         self.a = range(5)
   ....:

In [13]: import copy

In [14]: a = A()

In [15]: b = copy.copy(a)

In [16]: a.a[1] = 42

In [17]: a.a
Out[17]: [0, 42, 2, 3, 4]

In [18]: b.a
Out[18]: [0, 42, 2, 3, 4]

Verfasst: Donnerstag 2. Juli 2009, 14:34
von Llan
Hmm ich kann natürlich ein deepcopy verwenden, aber da ja der Filter normalerweise funktioniert, eben nur manchmal und nur einzelne Threads mal Unfug zurückkommt, denke ich dass es soweit funktioniert.

/edit: Achso es funktioniert bislang NUR weil ich vorher die alte Liste explizit lösche. Ich muss auf jeden Fall deepcopy verwenden!