threading - Liste mit Funktionen abarbeiten - jeweils nur 2

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Dami123
User
Beiträge: 225
Registriert: Samstag 23. Februar 2013, 13:01

Freitag 31. Mai 2013, 14:07

Könnte mir einer erklären, wie ich aus einer Liste mit Funktionen alle stufenweise durchlaufen lass, aber immer nur 2 oder variabel viele gleichzeitig aktiv sind?
Benutze das "threading"-Modul.

Bsp.:

Code: Alles auswählen


class myThread (threading.Thread):
    def __init__(self, name, parameter):
        threading.Thread.__init__(self)
        self.name = name
        self.para = parameter
    def run(self):
        print "Starting " + self.name
        funktion(self.para)
        print "Exiting " + self.name

Liste = ["element", "element2", ...]
for i in Liste:
    myThread("Thread_"+i, i).start()
Da es sich um die Elemente um Webseitenlinks handelt, die gescannt werden, muss ich die max. thread Anzahl stark reduzieren, weil es sich auf die Seite und mein Internet auswirkt und nach ner Weile "URLError" bekomme.
BlackJack

Freitag 31. Mai 2013, 14:20

@Dami123: Ich würde an Deiner Stelle zwei Threads starten, die sich die Daten aus einer Queue holen und abarbeiten. Hat den netten Nebeneffekt, dass man nicht dauernd Threads startet und beendet, was ja auch einen gewissen Mehraufwand bedeutet.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Freitag 31. Mai 2013, 19:48

Hier ein Beispiel, das ich kürzlich erstellt habe, als ich mich mal wieder mit diesem Thema beschäftigt habe:

http://www.python-forum.de/pastebin.php?mode=view&s=351

1. Fülle die `inQ` mit deiner Liste von Funktionen.
2. Überschreibe die Funktion `work`: die nimmt eine Funktion aus der `inQ` und führt diese aus.
Benutzeravatar
darktrym
User
Beiträge: 723
Registriert: Freitag 24. April 2009, 09:26

Freitag 31. Mai 2013, 22:29

Die Idee könnte man übernehmen, den Code würde ich überarbeiten.
„gcc finds bugs in Linux, NetBSD finds bugs in gcc.“[Michael Dexter, Systems 2008]
Bitbucket, Github
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Freitag 31. Mai 2013, 23:11

Verbesserungsvorschläge & Optimierungstipps würden mich interessieren!
Sirius3
User
Beiträge: 10008
Registriert: Sonntag 21. Oktober 2012, 17:20

Samstag 1. Juni 2013, 08:01

@droptix: die netten Strings, die Du überall zwischen Deine Befehle streust, erinnern irgendwie an doc-Strings, sind aber keine und sollten deshalb auch richtige Kommentare werden. Die Namen halten sich nicht an PEP8.
- keine Deiner Klassenvariablen sollte ein Klassenvariable sein. Zwei »QueueWorkers«-Instanzen kommen so auf jeden Fall nicht miteinander klar, wenn sie verschiedene Arbeit erledigen sollen erst recht nicht.
- Du hast wunderschöne busy-Loops implementiert, was man ja mit einer Queue gerade verhindern will.
- Dein »except« sollte nur ein »Empty« abfangen.
- den Fall, dass in »work« eine »Exception« auftritt, ignorierst Du völlig.
- der »_threadLock« ist überflüssig, da Queues an sich threadsicher sind. Solltest Du doch mal einen Lock brauchen, benutze »with«.
- »create« hat einen Parameter, der nicht benutzt wird.

Dann würde ich der QueueWorkers-Klasse noch ein schönes input-output-Interface gönnen, das direkte Ansprechen von inQ und outQ ist unschön. Die work-Funktion sollte man auch von außen festlegen können, z.B. als Parameter in __init__.
BlackJack

Samstag 1. Juni 2013, 08:25

Bevor man sich selbst etwas bastelt, könnte man sich auch noch das `concurrent.futures`-Modul ansehen. Ist ab Python 3.2 in der Standardbibliothek, aber es ist auch bis Python 2.5 rückportiert erhältlich. Reiner Python-Code, nur von der Standardbibliothek abhängig.

Edit: Ungetestet:

Code: Alles auswählen

from concurrent.futures import ThreadPoolExecutor


def funktion(_argument):
    pass


def main():
    liste = ['element', 'element2']
    executor = ThreadPoolExecutor(2)
    for future in executor.map(funktion, liste):
        future.result()
Dami123
User
Beiträge: 225
Registriert: Samstag 23. Februar 2013, 13:01

Samstag 1. Juni 2013, 23:11

Wird ja viel geposted hier :D

@concurrent
Danke für den Link jedoch etwas unübersichtlich.

@BlackJack
Hab mir futures gleich geholt und funktioniert super! Klein und sehenswert.
Danke dafür :)

"Queue" könnte ich verwenden, aber da "ThreadPoolExecutor" genau dies tut, wärs nur sinnvoll es neu zu schreiben, wenn man es lernen möchte.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Montag 3. Juni 2013, 13:50

Sirius3 hat geschrieben:@droptix: die netten Strings, die Du überall zwischen Deine Befehle streust, erinnern irgendwie an doc-Strings, sind aber keine und sollten deshalb auch richtige Kommentare werden. Die Namen halten sich nicht an PEP8.
- keine Deiner Klassenvariablen sollte ein Klassenvariable sein. Zwei »QueueWorkers«-Instanzen kommen so auf jeden Fall nicht miteinander klar, wenn sie verschiedene Arbeit erledigen sollen erst recht nicht.
- Du hast wunderschöne busy-Loops implementiert, was man ja mit einer Queue gerade verhindern will.
- Dein »except« sollte nur ein »Empty« abfangen.
- den Fall, dass in »work« eine »Exception« auftritt, ignorierst Du völlig.
- der »_threadLock« ist überflüssig, da Queues an sich threadsicher sind. Solltest Du doch mal einen Lock brauchen, benutze »with«.
- »create« hat einen Parameter, der nicht benutzt wird.

Dann würde ich der QueueWorkers-Klasse noch ein schönes input-output-Interface gönnen, das direkte Ansprechen von inQ und outQ ist unschön. Die work-Funktion sollte man auch von außen festlegen können, z.B. als Parameter in __init__.
Habe einige Dinge berücksichtigt, die in der "quick 'n' dirty" Fassung wirklich nicht gut durchdacht wurden: siehe Pastebin
  • Docstrings: das waren schon welche, aber nicht überall korrekt. Verbessert...
  • Klassenvariablen: da hast du völlig recht, die sind nun Instanzvariablen
  • busy-Loops: wie verbessern? Was schlägst du vor?
  • Exceptions: konkrete Tipps (Code-Snippets)?
  • threadLock: bist du da sicher? Du meinst für `Queue.put()` muss man nicht vorher `threading.Lock.aquire()` und danach `...release()` machen?
  • create-Parameter: entfernt, ein Überbleibsel... :)
EyDu
User
Beiträge: 4872
Registriert: Donnerstag 20. Juli 2006, 23:06
Wohnort: Berlin

Montag 3. Juni 2013, 22:48

droptix hat geschrieben:busy-Loops: wie verbessern? Was schlägst du vor?
Indem du blockierst, wenn _inQ keine Nachrichten enthält (gib dem mal einen richtigen Namen). Du brauchst dann nur noch eine Nachricht für "finishAllTasks". Das ``else: continue`` kannst du dir in der Schleife übrigens sparen.
droptix hat geschrieben:Exceptions: konkrete Tipps (Code-Snippets)?
Dein try/except-Block behandelt alle Fehler gleich. Wenn ein NameError in dem try-Block auftritt, dann wirst du den Fehler nie bemerken, bzw. schwer finden. Dir wurde doch bereits der Hinweis zu der Empty-Exception gegeben, lese doch einfach mal die Dokumentation zum Queue-Modul
droptix hat geschrieben:threadLock: bist du da sicher? Du meinst für `Queue.put()` muss man nicht vorher `threading.Lock.aquire()` und danach `...release()` machen?
Genau dafür ist das Queue-Modul da. Wie ich oben schon schrieb: wenn du ein Modul verwendest, dann lese dazu auch die Dokumentation.
Das Leben ist wie ein Tennisball.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Dienstag 11. Juni 2013, 09:43

@BlackJack: ich bin gerade dabei das Rad nicht neu zu erfinden, daher beschäftige ich mich mit `concurrent.futures`... ich komme nicht klar wie das genau gemeint ist. Die Doku dazu ist etwas kurz gefasst... Kannst du mir helfen?

Code hier: http://www.python-forum.de/pastebin.php?mode=view&s=357

Ich verstehe es so:
  • `ThreadPoolExecutor` erzeugt ein abstraktes Objekt, dass die Threads und Workers verwaltet.
  • Ich verstehe die Unterschiede von `map()` und `submit()` nicht und wozu/ob/wann ich `shutdown()` benötige.
  • Vermutlich benötigt man `shutdown()` nicht, wenn man mit dem `with` Statement arbeitet?
  • Mit `submit()` übergibt man Aufgaben an die Workers.
  • Ich muss so oft `submit()` aufrufen, wie Aufgaben zu erledigen sind. Vermutlich verteilt der `ThreadPoolExecutor` diese dann selbstständig?
Was passiert nach Zeile 29 in dem o.g. Code-Shnippet? Blockt Python so lange, bis alle Worker-Threads alle Aufgaben abgearbeitet haben oder läuft das asynchron im Hintergrund? Bei mir kommt kein Fehler und ich erhalte nach Zeile 30 ein vollständiges und korrektes Ergebnis... aber das könnte ja Zufall sein, weil die Aufgaben sehr schnell erledigt sind?

Wenn ich in meinem o.g. Code-Snippet die items der output queue anzeigen lasse, dann sind alle in sortierter Reihenfolge A-Z. Hätte ich dasselbe Beispiel manuell mit dem `threading` Modul gemacht, dann ist das Ergebnis üblicherweise unsortiert, da parallel abgearbeitet wurde. Werden die tasks also wirklich parallel/asynchron abgearbeitet?

Wenn ich das Ergebnis zur Laufzeit innerhalb von `work` in die output queue schreibe, dann ist es manchmal auch unsortiert... das lässt darauf schließen, dass tatsächlich parallel gearbeitet wird... aber komisch ist das Ergebnis, wenn man `outQ.put()` innerhalb von `with` macht. Hier zu sehen, ich habe zwei kleine Stellen umgearbeitet und prüfe am Ende ob das Ergebnis sortiert ist oder nicht. Schon bei 3-5 Durchläufen passiert es bei mir, dass auch mal ein unsortiertes Ergebnis dabei ist: http://www.python-forum.de/pastebin.php?mode=view&s=358

--

Übrigens: ich hab mir mit der Installation unter Python 2.7 für Windows einen abgequält, da das nirgends beschrieben ist. So ging es dann: futures<version>.tar.gz speichern, entpacken, dist-Verzeichnis -> .tar entpacken (7-Zip). Am besten eine kleine Batch-Datei anlegen mit folgendem Inhalt:

Code: Alles auswählen

D:
dir D:\Temp
C:\Python27\python.exe D:\Temp\dist\futures\setup.py build
C:\Python27\python.exe D:\Temp\dist\futures\setup.py install
pause
^^ Wenn man nicht explizit ins Arbeitsverzeichnis der `setup.py` wechselt, dann kommen Fehler bei `build` bzw. `install`.[/i]
BlackJack

Dienstag 11. Juni 2013, 22:10

@droptix: Was genau fehlt Dir denn in der Dokumentation? Die Methoden sind IMHO ausreichend beschrieben und es gibt Beispiele.

Man kann natürlich sagen, dass alle Objekte im Rechner abstrakt sind, aber `ThreadPoolExecutor` erzeugt ein ziemlich konkretes Objekt das die Threads und die Aufgaben die darauf verteilt werden sollen, verwaltet.

`map()` entspricht der eingebauten `map()`-Funktion, nur das die Funktion asynchron aufgerufen wird und das Ergebnis `Future`-Exemplare sind. `submit()` ruft die angegebene Funktion asynchron auf und liefert das dazugehörige `Future`-Exemplar.

`shutdown()` wartet bis alle Arbeiten erledigt sind, also jedes `Future` ohne blockieren nach seinem Ergebnis befragt werden kann. Alle drei Methoden sind in der Dokumentation beschrieben. Auch das man `shutdown()` nicht explizit aufrufen muss wenn man ``with`` verwendet steht ziemlich deutlich dort, direkt gefolgt von einem Quelltextbeispiel.

Nach Zeile 29 wird also gewartet bis alle Aufgaben fertig sind. Allerdings ist das in Deinem Fall gar nicht nötig, denn Du führst die Aufgaben nicht wirklich asynchron aus, weil Du nach jedem `submit()` sofort mit `Future.result()` das Ergebnis abfragst, was natürlich solange blockiert bis das Ergebnis auch tatsächlich vorliegt. Was hättest Du denn sonst auch als Rückgabwert an der Stelle erwartet wenn die Berechnung noch nicht fertig ist? Darum siehst Du auch die sequenzielle Abarbeitung.

Bei der abgewandelten Variante wartest Du nicht jedes mal auf das Ergebnis, aber Du verwendest auch die `Future`-Objekte überhaupt nicht. Was nicht der vorgesehenen Verwendung des Moduls entspricht. Vorgesehen ist, dass man entweder `Future.result()` erst aufruft wenn man das Ergebnis auch tatsächlich weiter verarbeiten will, oder dass man mit `Future.add_done_callback()` eine Rückruffunktion registriert, die aufgerufen wird sobald das Ergebnis feststeht (oder das `Future` abgebrochen wird). `Future`-Exemplare komplett ignorieren sollte man nur wenn man nur am Effekt der ausgeführten Funktion interessiert ist. Und selbst dann ist das IMHO ein „code smell”, weil Ausnahmen dann verloren gehen.

Übrigens könntest Du in Deinem Quelltext `string.lower` und `string.upper` verwenden um es etwas lesbarer und ein klein wenig weniger fehleranfällig zu machen.

Und die Queues sind nicht wirklich notwendig. Es wird nichts verwendet was nicht auch der `list`-Datentyp bietet.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Mittwoch 12. Juni 2013, 15:36

Sorry, die Beispiele unterscheiden sich so stark, dass zumindest ich nicht erfassen kann, wieso man das hier so und dort anders macht.

Mir ist konkret unklar, wann die Worker-Funktionen zurück kehren, ob oder wann sie blockieren und wie ich prüfen kann ob alle Tasks erledigt sind, bevor mein Hauptprogramm weiter arbeitet. `submit()` wird z.B. so beschrieben, dass es die Funktion nicht sofort ausführt, sondern "scheduled", also in eine Art Warteschlange einreiht. Das ThreadPoolExecutor-Objekt scheint die Funktion dann vermutlich im nächsten freien Thread auszuführen.

Ich habe immer noch nicht verstanden, wie ich mit `shutdown()` eingreifen und alles stoppen kann. Nach meinem Verständnis geht das nicht in Verbindung mit `with`, weil `submit()` erst dann aufgerufen werden kann, wenn alle Tasks abgearbeitet sind, d.h. ich müsste ohne `with` arbeiten und in einem Loop auf ein Stopp-Signal warten o.ä. Oder am Ende des Codes einfach `shutdown(True)` aufrufen, um alle eingereihten Aufgaben noch abzuarbeiten aber keine neuen Aufgaben mehr anzunehmen... dafür fehlt mir noch der Zusammenhang. Threading & Co. sind halt kein Pappenstiel...

Wieso Queues: weil ich dachte dass Listen nicht threadsafe sind. Oder kümmert sich der abstrakte Executor selbstständig um die nötigen Locks? Außerdem stelle ich mir das so vor, dass mehrere Prozesse hintereinander arbeiten. Wenn Prozess 1 die ersten Ergebnisse liefert, greift Prozess 2 darauf zu und verarbeitet diese weiter. Noch während Prozess 2 abarbeitet, füllt Prozess 1 die Aufgaben weiter auf.

So langsam kommt Licht ins Dunkel :) Durch Ausprobieren kann man Einiges herauskriegen... Hier meine Erkenntnisse:

1. So blockiert der Code nicht:

Code: Alles auswählen

import time
import concurrent.futures

# list of tasks: lower-case alphabet letters
tasks = []
for i in range(26):
    tasks.append(chr(97+i))
results = []
# what to be done: transform to upper-case
def work(task):
    time.sleep(0.1)
    results.append(task.upper())
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
for task in tasks:
    future = executor.submit(work, task)
""" It is not blocking here, are the pending tasks processed anyway? """
print "results 1:", results
""" result 1 is incomplete... if we wait a moment, it will be completed """
time.sleep(5)
print "results 2", results
2. So blockiert der Code vor der letzten Zeile mit der `print` Ausgabe:

Code: Alles auswählen

import time
import concurrent.futures

# list of tasks: lower-case alphabet letters
tasks = []
for i in range(26):
    tasks.append(chr(97+i))
results = []
# what to be done: transform to upper-case
def work(task):
    time.sleep(0.1)
    results.append(task.upper())
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for task in tasks:
        future = executor.submit(work, task)
""" It is blocking here, so the result is already complete when printed. """
print "results:", results
BlackJack

Mittwoch 12. Juni 2013, 16:27

@droptix: Die Worker-Funktion kehrt zurück wenn sie abgearbeitet ist. Aber nicht zu Dir. Du bekommst ein `Future`-Objekt. Und zwar sofort. Das ist ein Versprechen das Du von dem irgendwann in der Zukunft (daher der Name) das Ergebnis von der Funktion abfragen kannst. Wenn Du das tust, also die `result()`-Methode aufrufst, dann ist die Worker-Funktion entweder schon fertig und Du bekommst das Ergebnis sofort, oder sie ist noch nicht fertig, dann blockiert der Aufruf so lange bis es fertig ist. Alternativ kannst Du dem `Future`-Objekt auch eine Rückruffunktion geben die asynchron aufgerufen wird, wenn die dazugehörige Funktion ausgeführt wurde und ein Ergebnis geliefert hat.

`submit()` (und `map()`) packen die Aufgaben in der Tat in eine Queue die dann vom `Executor` abgearbeitet werden.

`shutdown()` stoppt keine laufenden Berechnungen, sondern wartet bis alles abgearbeitet ist und beendet dann die Threads. Danach kannst Du also sicher sein das alles was dem `Executor` an Aufgaben übergeben wurde, auch abgearbeitet ist.

Wieso Du denkst das man `submit()` erst aufrufen kann wenn alle Aufgaben abgearbeitet sind, verstehe ich nicht‽ Wenn man nicht `submit()` (oder `map()`) aufruft, dann werden doch erst überhaupt gar keine Aufgaben zugeteilt die abgearbeitet werden könnten.

Listen müssen in der Tat nicht thread-safe sein, aber Du greifst in Deinem Quelltext doch auch nirgends aus verschiedenen Threads auf die selbe Liste zu. Oh, warte das tust Du bei einigen Beispielen doch — aber da verwendest Du den Mechanismus ja auch falsch. Die Worker-Funktion soll ihr Ergebnis zurückgeben und nicht selbst in eine Datenstruktur stecken. Spätestens wenn Du den `ThreadPoolExecutor` durch einen `ProcessPoolExecutor` austauschst, der ja die gleiche API hat, funktioniert das auch nicht mehr weil Deine `results`-Queue in einem anderen Prozess ist als die Worker-Funktion ausgeführt wird.

Code: Alles auswählen

#!/usr/bin/env python
from random import random
from string import lowercase, uppercase
from time import sleep
from concurrent.futures import ThreadPoolExecutor


def work(character):
    sleep(random())
    print character
    return character.upper()


def main():
    tasks = lowercase
    with ThreadPoolExecutor(4) as executor:
        results = ''.join(executor.map(work, tasks))
    print results, results == uppercase


if __name__ == '__main__':
    main()
Hier sieht man schön, dass die Aufgaben zwar nicht in der Reihenfolge fertig werden in der sie in der Eingabeliste stehen, das aber `map()` netterweise dafür sorgt, dass sie in der Reihenfolge zurückgegeben werden.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Mittwoch 12. Juni 2013, 22:03

@BlackJack: `Future`-Objekt und `result()` habe ich nun verstanden.

Das mit `shutdown()` ist auch klar, wartet (= blockiert) allerdings nur wenn man `shutdown(True)` aufruft, sonst läuft der Code weiter -> dann hat `shutdown()` lediglich die Funktion, das Einreihen neuer Tasks zu verhindern und nach dem Abarbeiten der laufenden Tasks quasi aufzuräumen.

Zu `submit()`: Tippfehler, ich meinte `shutdown()` wenn ich das `with` Statement nutze, dann kann ich `shutdown()` praktisch nicht nutzen. Beispiel:

Code: Alles auswählen

with ThreadPoolExecutor(4) as executor:
    executor.submit(work, task)
executor.shutdown()
^^ Das macht keinen Sinn, weil bereits vor dem Aufruf von `executor.shutdown()`alle Tasks vollständig abgearbeitet sind, richtig (da `with` blockiert)? Nach nochmaligem Durchlesen verstehe ich dann auch diesen Satz hier:
Doku hat geschrieben:You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)
Außerdem sind alle `submit()`-Aufrufe ja bereits durchgeführt worden. Ich finde also kein sinnvolles praktisches Beispiel zur Nutzung von `shutdown()`.

In meinem Code greife ich nicht parallel aus verschiedenen Threads auf dieselbe Liste zu, aber ich gehe ja davon aus, dass `ThreadPoolExecutor` neue Threads erzeugt und darin parallel meine `work` Funktion aufruft, die wiederum auf dieselbe Liste zugreift. So wie du das über `map()` umsetzt, stört das nicht, weil du lediglich den return-Wert von `work()` abfragst und anschließend alle Ergebnisse im selben Thread zusammenführst. Ist es generell besser, innerhalb von `work nicht das Ergebnis zu schreiben, sondern nur zu returnen?

^^ Das führt für mich auch gleich zur nächsten Frage: Wann verwende ich `submit()` und wann besser `map()`?

Meine Vorstellung ist wie bereits beschrieben, dass ich mehrere `ThreadPoolExecutor`-Instanzen hintereinander schalte, um komplexe Prozesse zu "koppeln" und somit in Reihe zu schalten. D.h. dass meine Task-Liste im laufenden Betrieb von Prozess 1 permanent erweitert wird, während Prozess 2 bereits daran arbeitet.

Dein Code-Snippet ist sehr schön kompakt und veranschaulicht das gut. :)
Antworten