Anzahl von gleichz. Threads begrenzen für Anfänger (Queue)

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Aureliusus
User
Beiträge: 9
Registriert: Mittwoch 5. April 2006, 10:23

Ich hab nun schon einige Beispiele und Erklärungen gelesen, so recht will der Groschen jedoch noch nicht fallen. :cry:

Mein Ziel: Ich will URL aus einer DB holen und der Reihe nach abarbeiten. Da eine Verbindung zur gleichen Zeit verschwendung wäre, da ja massig Zeit aufgrund der Requests, Daten übertragen usw. drauf geht muss also mehree Threads her. Ich bin wahrscheinlich auch nicht der erste mit dieser Problemstellung.

Bevor ich mich dadran wagen wollte hab ich einige Experimente gemacht und schon da kann ich mir einiges nicht erklären.

Code: Alles auswählen

import Queue
import time
import random
import thread

def worker(name):
	wzeit = random.randint(1,300)/100.0
	time.sleep(wzeit)
	print name,'wartezeit:'+str(wzeit)


for i in ['Hans','Dieter','Klaus','Bert','Lisa','Paul']: #Lisa ist nur dabei, damit die Frauenquote stimmt o.0
	thread.start_new_thread(worker,(i,))
time.sleep(5) 
Ok, Threads mit dem einfachen thread-Modul starten klappt schonmal. Nur wie kann ich es umgehen, dass der Hauptprozess beendet wird, bevor die Threads zuende gearbeitet haben? Der time.sleep(5) Notbehelf ist nun nicht gerade der Hit.

Auf in die Nächste Runde, diesmal sollen nur 2 Threads gleichzeitig laufen.

Code: Alles auswählen

import Queue
import time
import random
import thread
import threading

#Wir legen das "Wartezimmer" mit 2 Sitzplätzen an
schlange = Queue.Queue(2)
#Wir legen fest, was der Thread überhaupt machen soll
def worker(name):
        #soleneg noch was im "Wartezimmer" ist soll ein Thread gestartet werden und seine Arbeit machen
	if schlange.qsize()!=0:
		thread.start_new_thread(worker,(schlange.get(),))
	wzeit = random.randint(1,300)/100.0
	time.sleep(wzeit)
	print name,'wartezeit:'+str(wzeit),'Elemente in schlange:',schlange.qsize()

#Das "Wartezimmer" wird gefüllt und der Rest muss draußen bleiben, auch die Privatversicherten
for i in ['Hans','Dieter','Klaus','Bert','Lisa','Paul']:
        #Wir packen Elemente ins Wartezimmer, der Rest wird, wenn ich die
        #http://docs.python.org/lib/QueueObjects.html
        #richtig verstehe, geblockt (bleibt vor der Tür) bis wieder Platz ist.
	schlange.put(i)
thread.start_new_thread(worker,(schlange.get(),))
time.sleep(5) 
Das Script läut sich tot, es gibt nichts aus, der Prozess läuft einfach und nichts passiert, bis ich ihn irgendwann manuell beende. Auch die Nummer mit der Queue auf 2 begrenzen ist im Nachhinein falsch, da ich ja, sobald ich ein Element aus dem Wartezimmer nehme, ein neues nachrutscht. Das Ende vom Lied: Alle Threads laufen wieder auf einmal.

Kann mir da jemand ein wenig helfen, soll ich womöglich lieber gelich mit Klassen und dem "großen" Threads Modul arbeiten? Wär nett, wenn mir jemand einen simplen Beispieltcode schreiben könnte, muss ja nichts grosses sein. Bin für jede Hilfe dankbar.
Benutzeravatar
Rebecca
User
Beiträge: 1662
Registriert: Freitag 3. Februar 2006, 12:28
Wohnort: DN, Heimat: HB
Kontaktdaten:

Soweit ich weiss, musst du das threading-Modul benutzen, wenn du auf die Beendigung eines Thread warten willst. Es hat ausserdem einen Thread-Counter:

Code: Alles auswählen

import time
import random
import threading;

def worker(name):
    wzeit = random.randint(1,300)/100.0
    time.sleep(wzeit)
    print name,'wartezeit:'+str(wzeit)

all_threads = []

for i in ['Hans','Dieter','Klaus','Bert','Lisa','Paul']:
    the_thread = threading.Thread(target=worker, name=i, args=(i,))
    the_thread.start()
    print "%i-ter Thread gestarted" % threading.activeCount()

    all_threads.append(the_thread)
    
#Auf alle Thread warten:
for t in all_threads:
    t.join()
Benutzeravatar
gerold
Python-Forum Veteran
Beiträge: 5555
Registriert: Samstag 28. Februar 2004, 22:04
Wohnort: Oberhofen im Inntal (Tirol)
Kontaktdaten:

Hi Aureliusus!

Hier ein Beispiel, das immer nur drei gleichzeitig laufende Threads mit dem Inhalt einer Queue arbeiten lässt.

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
"""
Drei gleichzeitig arbeitende Threads.

Alle drei Threads bekommen Anweisungen (in diesem Beispiel nur einzelne 
Buchstaben) von einer Queue.
"""

import Queue
import threading
import time


class Worker(threading.Thread):
    """
    Arbeitstier
    """
    
    def __init__(self, queue, breaker):
        """
        Übernimmt die Queue und initialisiert den Thread
        """
        
        threading.Thread.__init__(self)
        
        self.queue = queue
        assert(isinstance(self.queue, Queue.Queue))
        
        self.breaker = breaker
    
    
    def run(self):
        """
        Wird bei "Worker.Start" aufgerufen.
        Wird gestoppt, wenn die Queue leer ist.
        """
        
        while True:
            
            if self.queue.empty():
                # Die Schleife und damit auch den Thread beenden
                break
            
            # Wert aus der Queue holen
            new_value = self.queue.get()
            
            # Arbeiten (natürlich nur simuliert)
            time.sleep(4)
            print new_value,
        
        # Ausschalter setzen
        self.breaker.set()


def main():
    """
    Hauptprozedur
    """
    
    # Queue erstellen
    queue = Queue.Queue()
    
    # Queue füllen
    t = ("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
    for item in t:
        queue.put(item)

    # Ausschalter
    thread1_ausschalter = threading.Event()
    thread2_ausschalter = threading.Event()
    thread3_ausschalter = threading.Event()
    
    # Threads erstellen und starten
    thread1 = Worker(queue, thread1_ausschalter)
    thread2 = Worker(queue, thread2_ausschalter)
    thread3 = Worker(queue, thread3_ausschalter)
    
    thread1.start()
    thread2.start()
    thread3.start()
    
    # Hauptprozedur so lange hier warten lassen, bis alle
    # Ausschalter gesetzt wurden. Es geht also erst weiter, wenn
    # alle Ausschalter von den Threads (symbolisch) umgelegt wurden.
    thread1_ausschalter.wait()
    thread2_ausschalter.wait()
    thread3_ausschalter.wait()
    
    # Fertig
    print "Fertig"
    return
    

if __name__ == "__main__":
    main()
mfg
Gerold
:-)

Edit by Gerold: Ein Docstring hat etwas falsch erklärt
Zuletzt geändert von gerold am Dienstag 4. Juli 2006, 19:06, insgesamt 1-mal geändert.
http://halvar.at | Kleiner Bascom AVR Kurs
Wissen hat eine wunderbare Eigenschaft: Es verdoppelt sich, wenn man es teilt.
Aureliusus
User
Beiträge: 9
Registriert: Mittwoch 5. April 2006, 10:23

Tausend Dank für die ausführlichen Antworten. Ich werd mich da mal durcharbeiten aber in den Scripts scheint echt alles drin zu sein, was man als Anfänger jemals im Zusammenhang mit Threads brauchen könnte :D

vielen Dank nochmal :)
Mad-Marty
User
Beiträge: 317
Registriert: Mittwoch 18. Januar 2006, 19:46

Den hauptthread lässt du am einfachsten warten indem du

Code: Alles auswählen

for mythread in allthreads:
    mythread.join()
das vorm hauptthread ausführen lässt.

join() ist blockend - damit ist das main erst fertig wenn alle threads joined wurden.
Benutzeravatar
gerold
Python-Forum Veteran
Beiträge: 5555
Registriert: Samstag 28. Februar 2004, 22:04
Wohnort: Oberhofen im Inntal (Tirol)
Kontaktdaten:

Mad-Marty hat geschrieben:join() ist blockend
Hi!

Ich habe mein Beispiel jetzt so umgeschrieben, dass es join() verwendet. Allerdings musste ich von queue.get() auf queue.get_nowait() umstellen, da ich drauf gekommen bin, dass sich die Threads zu ungünstigen Zeitpunkten die Einträge der Queue weg schnappen können.

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: iso-8859-1 -*-
"""
Drei (siehe Konstante THREAD_COUNT) gleichzeitig arbeitende Threads.

Alle Threads bekommen Anweisungen (in diesem Beispiel nur einzelne
Buchstaben) von einer Queue.

Da ich festgestellt habe, dass sich in gewissen Situationen die Threads
gegenseitig überholen können und dadurch die Threads in einen dauerhaften
Lock laufen, habe ich beim Auslesen der Queue von get() auf get_nowait()
umgestellt.

Warum die Änderung? Wenn queue.empty() False zurück liefert, dann war die
nächste Anweisung queue.get() um eine Wert aus der Queue abzurufen. 
Allerdings kann es vorkommen, dass beim Prüfen, ob die Queue leer ist, 
noch ein Eintrag in der Queue drinnen ist, aber beim Abrufen des 
nächsten Eintrages (mit queue.get()) keiner mehr drinnen ist, da
ein anderer Thread sich den letzten Eintrag vorher geschnappt hat. Ist das
der Fall (und das ist mehrmals in meinen Tests aufgetreten), dann steht der
Thread bei der queue.get()-Anweisung und kommt nicht mehr weiter, da es nichts
mehr auszulesen gibt.

"""

import Queue
import threading
import time

THREAD_COUNT = 3


class Worker(threading.Thread):
    """
    Arbeitstier
    """
   
    def __init__(self, queue):
        """
        Übernimmt die Queue und initialisiert den Thread
        """
       
        threading.Thread.__init__(self)
       
        self.queue = queue
        assert(isinstance(self.queue, Queue.Queue))
   
   
    def run(self):
        """
        Wird bei "Worker.Start" aufgerufen.
        Wird gestoppt, wenn die Queue leer ist.
        """
       
        while True:
           
            # Wert aus der Queue holen. Gibt es nichts mehr zu holen, dann
            # wird die Schleife abgebrochen.
            try:
                new_value = self.queue.get_nowait()
            except Queue.Empty:
                break
           
            # Arbeiten (natürlich nur simuliert)
            time.sleep(3)
            print new_value,


def main():
    """
    Hauptprozedur
    """
   
    # Queue erstellen
    queue = Queue.Queue()
   
    # Queue füllen
    t = ("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
    for item in t:
        queue.put(item)

    # Threadliste
    threads = []
    
    # Threads erstellen und starten
    for i in range(THREAD_COUNT):
        threads.append(Worker(queue))
        threads[-1].start()
    
    # Hauptprozedur so lange hier warten lassen, bis alle Threads
    # fertig sind.
    for thread in threads:
        thread.join()
   
    # Fertig
    print "Fertig"
    return
   

if __name__ == "__main__":
    main()
mfg
Gerold
:-)
http://halvar.at | Kleiner Bascom AVR Kurs
Wissen hat eine wunderbare Eigenschaft: Es verdoppelt sich, wenn man es teilt.
Aureliusus
User
Beiträge: 9
Registriert: Mittwoch 5. April 2006, 10:23

Danke für das Update, ich finde die zweite Version auch ein wenig übersichtlicher, da der breaker nicht mehr dabei ist. :)
Antworten