threading - Liste mit Funktionen abarbeiten - jeweils nur 2

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Dami123
User
Beiträge: 225
Registriert: Samstag 23. Februar 2013, 13:01

Könnte mir einer erklären, wie ich aus einer Liste mit Funktionen alle stufenweise durchlaufen lass, aber immer nur 2 oder variabel viele gleichzeitig aktiv sind?
Benutze das "threading"-Modul.

Bsp.:

Code: Alles auswählen


class myThread (threading.Thread):
    def __init__(self, name, parameter):
        threading.Thread.__init__(self)
        self.name = name
        self.para = parameter
    def run(self):
        print "Starting " + self.name
        funktion(self.para)
        print "Exiting " + self.name

Liste = ["element", "element2", ...]
for i in Liste:
    myThread("Thread_"+i, i).start()
Da es sich um die Elemente um Webseitenlinks handelt, die gescannt werden, muss ich die max. thread Anzahl stark reduzieren, weil es sich auf die Seite und mein Internet auswirkt und nach ner Weile "URLError" bekomme.
BlackJack

@Dami123: Ich würde an Deiner Stelle zwei Threads starten, die sich die Daten aus einer Queue holen und abarbeiten. Hat den netten Nebeneffekt, dass man nicht dauernd Threads startet und beendet, was ja auch einen gewissen Mehraufwand bedeutet.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Hier ein Beispiel, das ich kürzlich erstellt habe, als ich mich mal wieder mit diesem Thema beschäftigt habe:

http://www.python-forum.de/pastebin.php?mode=view&s=351

1. Fülle die `inQ` mit deiner Liste von Funktionen.
2. Überschreibe die Funktion `work`: die nimmt eine Funktion aus der `inQ` und führt diese aus.
Benutzeravatar
darktrym
User
Beiträge: 784
Registriert: Freitag 24. April 2009, 09:26

Die Idee könnte man übernehmen, den Code würde ich überarbeiten.
„gcc finds bugs in Linux, NetBSD finds bugs in gcc.“[Michael Dexter, Systems 2008]
Bitbucket, Github
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Verbesserungsvorschläge & Optimierungstipps würden mich interessieren!
Sirius3
User
Beiträge: 17738
Registriert: Sonntag 21. Oktober 2012, 17:20

@droptix: die netten Strings, die Du überall zwischen Deine Befehle streust, erinnern irgendwie an doc-Strings, sind aber keine und sollten deshalb auch richtige Kommentare werden. Die Namen halten sich nicht an PEP8.
- keine Deiner Klassenvariablen sollte ein Klassenvariable sein. Zwei »QueueWorkers«-Instanzen kommen so auf jeden Fall nicht miteinander klar, wenn sie verschiedene Arbeit erledigen sollen erst recht nicht.
- Du hast wunderschöne busy-Loops implementiert, was man ja mit einer Queue gerade verhindern will.
- Dein »except« sollte nur ein »Empty« abfangen.
- den Fall, dass in »work« eine »Exception« auftritt, ignorierst Du völlig.
- der »_threadLock« ist überflüssig, da Queues an sich threadsicher sind. Solltest Du doch mal einen Lock brauchen, benutze »with«.
- »create« hat einen Parameter, der nicht benutzt wird.

Dann würde ich der QueueWorkers-Klasse noch ein schönes input-output-Interface gönnen, das direkte Ansprechen von inQ und outQ ist unschön. Die work-Funktion sollte man auch von außen festlegen können, z.B. als Parameter in __init__.
BlackJack

Bevor man sich selbst etwas bastelt, könnte man sich auch noch das `concurrent.futures`-Modul ansehen. Ist ab Python 3.2 in der Standardbibliothek, aber es ist auch bis Python 2.5 rückportiert erhältlich. Reiner Python-Code, nur von der Standardbibliothek abhängig.

Edit: Ungetestet:

Code: Alles auswählen

from concurrent.futures import ThreadPoolExecutor


def funktion(_argument):
    pass


def main():
    liste = ['element', 'element2']
    executor = ThreadPoolExecutor(2)
    for future in executor.map(funktion, liste):
        future.result()
Dami123
User
Beiträge: 225
Registriert: Samstag 23. Februar 2013, 13:01

Wird ja viel geposted hier :D

@concurrent
Danke für den Link jedoch etwas unübersichtlich.

@BlackJack
Hab mir futures gleich geholt und funktioniert super! Klein und sehenswert.
Danke dafür :)

"Queue" könnte ich verwenden, aber da "ThreadPoolExecutor" genau dies tut, wärs nur sinnvoll es neu zu schreiben, wenn man es lernen möchte.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Sirius3 hat geschrieben:@droptix: die netten Strings, die Du überall zwischen Deine Befehle streust, erinnern irgendwie an doc-Strings, sind aber keine und sollten deshalb auch richtige Kommentare werden. Die Namen halten sich nicht an PEP8.
- keine Deiner Klassenvariablen sollte ein Klassenvariable sein. Zwei »QueueWorkers«-Instanzen kommen so auf jeden Fall nicht miteinander klar, wenn sie verschiedene Arbeit erledigen sollen erst recht nicht.
- Du hast wunderschöne busy-Loops implementiert, was man ja mit einer Queue gerade verhindern will.
- Dein »except« sollte nur ein »Empty« abfangen.
- den Fall, dass in »work« eine »Exception« auftritt, ignorierst Du völlig.
- der »_threadLock« ist überflüssig, da Queues an sich threadsicher sind. Solltest Du doch mal einen Lock brauchen, benutze »with«.
- »create« hat einen Parameter, der nicht benutzt wird.

Dann würde ich der QueueWorkers-Klasse noch ein schönes input-output-Interface gönnen, das direkte Ansprechen von inQ und outQ ist unschön. Die work-Funktion sollte man auch von außen festlegen können, z.B. als Parameter in __init__.
Habe einige Dinge berücksichtigt, die in der "quick 'n' dirty" Fassung wirklich nicht gut durchdacht wurden: siehe Pastebin
  • Docstrings: das waren schon welche, aber nicht überall korrekt. Verbessert...
  • Klassenvariablen: da hast du völlig recht, die sind nun Instanzvariablen
  • busy-Loops: wie verbessern? Was schlägst du vor?
  • Exceptions: konkrete Tipps (Code-Snippets)?
  • threadLock: bist du da sicher? Du meinst für `Queue.put()` muss man nicht vorher `threading.Lock.aquire()` und danach `...release()` machen?
  • create-Parameter: entfernt, ein Überbleibsel... :)
EyDu
User
Beiträge: 4881
Registriert: Donnerstag 20. Juli 2006, 23:06
Wohnort: Berlin

droptix hat geschrieben:busy-Loops: wie verbessern? Was schlägst du vor?
Indem du blockierst, wenn _inQ keine Nachrichten enthält (gib dem mal einen richtigen Namen). Du brauchst dann nur noch eine Nachricht für "finishAllTasks". Das ``else: continue`` kannst du dir in der Schleife übrigens sparen.
droptix hat geschrieben:Exceptions: konkrete Tipps (Code-Snippets)?
Dein try/except-Block behandelt alle Fehler gleich. Wenn ein NameError in dem try-Block auftritt, dann wirst du den Fehler nie bemerken, bzw. schwer finden. Dir wurde doch bereits der Hinweis zu der Empty-Exception gegeben, lese doch einfach mal die Dokumentation zum Queue-Modul
droptix hat geschrieben:threadLock: bist du da sicher? Du meinst für `Queue.put()` muss man nicht vorher `threading.Lock.aquire()` und danach `...release()` machen?
Genau dafür ist das Queue-Modul da. Wie ich oben schon schrieb: wenn du ein Modul verwendest, dann lese dazu auch die Dokumentation.
Das Leben ist wie ein Tennisball.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

@BlackJack: ich bin gerade dabei das Rad nicht neu zu erfinden, daher beschäftige ich mich mit `concurrent.futures`... ich komme nicht klar wie das genau gemeint ist. Die Doku dazu ist etwas kurz gefasst... Kannst du mir helfen?

Code hier: http://www.python-forum.de/pastebin.php?mode=view&s=357

Ich verstehe es so:
  • `ThreadPoolExecutor` erzeugt ein abstraktes Objekt, dass die Threads und Workers verwaltet.
  • Ich verstehe die Unterschiede von `map()` und `submit()` nicht und wozu/ob/wann ich `shutdown()` benötige.
  • Vermutlich benötigt man `shutdown()` nicht, wenn man mit dem `with` Statement arbeitet?
  • Mit `submit()` übergibt man Aufgaben an die Workers.
  • Ich muss so oft `submit()` aufrufen, wie Aufgaben zu erledigen sind. Vermutlich verteilt der `ThreadPoolExecutor` diese dann selbstständig?
Was passiert nach Zeile 29 in dem o.g. Code-Shnippet? Blockt Python so lange, bis alle Worker-Threads alle Aufgaben abgearbeitet haben oder läuft das asynchron im Hintergrund? Bei mir kommt kein Fehler und ich erhalte nach Zeile 30 ein vollständiges und korrektes Ergebnis... aber das könnte ja Zufall sein, weil die Aufgaben sehr schnell erledigt sind?

Wenn ich in meinem o.g. Code-Snippet die items der output queue anzeigen lasse, dann sind alle in sortierter Reihenfolge A-Z. Hätte ich dasselbe Beispiel manuell mit dem `threading` Modul gemacht, dann ist das Ergebnis üblicherweise unsortiert, da parallel abgearbeitet wurde. Werden die tasks also wirklich parallel/asynchron abgearbeitet?

Wenn ich das Ergebnis zur Laufzeit innerhalb von `work` in die output queue schreibe, dann ist es manchmal auch unsortiert... das lässt darauf schließen, dass tatsächlich parallel gearbeitet wird... aber komisch ist das Ergebnis, wenn man `outQ.put()` innerhalb von `with` macht. Hier zu sehen, ich habe zwei kleine Stellen umgearbeitet und prüfe am Ende ob das Ergebnis sortiert ist oder nicht. Schon bei 3-5 Durchläufen passiert es bei mir, dass auch mal ein unsortiertes Ergebnis dabei ist: http://www.python-forum.de/pastebin.php?mode=view&s=358

--

Übrigens: ich hab mir mit der Installation unter Python 2.7 für Windows einen abgequält, da das nirgends beschrieben ist. So ging es dann: futures<version>.tar.gz speichern, entpacken, dist-Verzeichnis -> .tar entpacken (7-Zip). Am besten eine kleine Batch-Datei anlegen mit folgendem Inhalt:

Code: Alles auswählen

D:
dir D:\Temp
C:\Python27\python.exe D:\Temp\dist\futures\setup.py build
C:\Python27\python.exe D:\Temp\dist\futures\setup.py install
pause
^^ Wenn man nicht explizit ins Arbeitsverzeichnis der `setup.py` wechselt, dann kommen Fehler bei `build` bzw. `install`.[/i]
BlackJack

@droptix: Was genau fehlt Dir denn in der Dokumentation? Die Methoden sind IMHO ausreichend beschrieben und es gibt Beispiele.

Man kann natürlich sagen, dass alle Objekte im Rechner abstrakt sind, aber `ThreadPoolExecutor` erzeugt ein ziemlich konkretes Objekt das die Threads und die Aufgaben die darauf verteilt werden sollen, verwaltet.

`map()` entspricht der eingebauten `map()`-Funktion, nur das die Funktion asynchron aufgerufen wird und das Ergebnis `Future`-Exemplare sind. `submit()` ruft die angegebene Funktion asynchron auf und liefert das dazugehörige `Future`-Exemplar.

`shutdown()` wartet bis alle Arbeiten erledigt sind, also jedes `Future` ohne blockieren nach seinem Ergebnis befragt werden kann. Alle drei Methoden sind in der Dokumentation beschrieben. Auch das man `shutdown()` nicht explizit aufrufen muss wenn man ``with`` verwendet steht ziemlich deutlich dort, direkt gefolgt von einem Quelltextbeispiel.

Nach Zeile 29 wird also gewartet bis alle Aufgaben fertig sind. Allerdings ist das in Deinem Fall gar nicht nötig, denn Du führst die Aufgaben nicht wirklich asynchron aus, weil Du nach jedem `submit()` sofort mit `Future.result()` das Ergebnis abfragst, was natürlich solange blockiert bis das Ergebnis auch tatsächlich vorliegt. Was hättest Du denn sonst auch als Rückgabwert an der Stelle erwartet wenn die Berechnung noch nicht fertig ist? Darum siehst Du auch die sequenzielle Abarbeitung.

Bei der abgewandelten Variante wartest Du nicht jedes mal auf das Ergebnis, aber Du verwendest auch die `Future`-Objekte überhaupt nicht. Was nicht der vorgesehenen Verwendung des Moduls entspricht. Vorgesehen ist, dass man entweder `Future.result()` erst aufruft wenn man das Ergebnis auch tatsächlich weiter verarbeiten will, oder dass man mit `Future.add_done_callback()` eine Rückruffunktion registriert, die aufgerufen wird sobald das Ergebnis feststeht (oder das `Future` abgebrochen wird). `Future`-Exemplare komplett ignorieren sollte man nur wenn man nur am Effekt der ausgeführten Funktion interessiert ist. Und selbst dann ist das IMHO ein „code smell”, weil Ausnahmen dann verloren gehen.

Übrigens könntest Du in Deinem Quelltext `string.lower` und `string.upper` verwenden um es etwas lesbarer und ein klein wenig weniger fehleranfällig zu machen.

Und die Queues sind nicht wirklich notwendig. Es wird nichts verwendet was nicht auch der `list`-Datentyp bietet.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Sorry, die Beispiele unterscheiden sich so stark, dass zumindest ich nicht erfassen kann, wieso man das hier so und dort anders macht.

Mir ist konkret unklar, wann die Worker-Funktionen zurück kehren, ob oder wann sie blockieren und wie ich prüfen kann ob alle Tasks erledigt sind, bevor mein Hauptprogramm weiter arbeitet. `submit()` wird z.B. so beschrieben, dass es die Funktion nicht sofort ausführt, sondern "scheduled", also in eine Art Warteschlange einreiht. Das ThreadPoolExecutor-Objekt scheint die Funktion dann vermutlich im nächsten freien Thread auszuführen.

Ich habe immer noch nicht verstanden, wie ich mit `shutdown()` eingreifen und alles stoppen kann. Nach meinem Verständnis geht das nicht in Verbindung mit `with`, weil `submit()` erst dann aufgerufen werden kann, wenn alle Tasks abgearbeitet sind, d.h. ich müsste ohne `with` arbeiten und in einem Loop auf ein Stopp-Signal warten o.ä. Oder am Ende des Codes einfach `shutdown(True)` aufrufen, um alle eingereihten Aufgaben noch abzuarbeiten aber keine neuen Aufgaben mehr anzunehmen... dafür fehlt mir noch der Zusammenhang. Threading & Co. sind halt kein Pappenstiel...

Wieso Queues: weil ich dachte dass Listen nicht threadsafe sind. Oder kümmert sich der abstrakte Executor selbstständig um die nötigen Locks? Außerdem stelle ich mir das so vor, dass mehrere Prozesse hintereinander arbeiten. Wenn Prozess 1 die ersten Ergebnisse liefert, greift Prozess 2 darauf zu und verarbeitet diese weiter. Noch während Prozess 2 abarbeitet, füllt Prozess 1 die Aufgaben weiter auf.

So langsam kommt Licht ins Dunkel :) Durch Ausprobieren kann man Einiges herauskriegen... Hier meine Erkenntnisse:

1. So blockiert der Code nicht:

Code: Alles auswählen

import time
import concurrent.futures

# list of tasks: lower-case alphabet letters
tasks = []
for i in range(26):
    tasks.append(chr(97+i))
results = []
# what to be done: transform to upper-case
def work(task):
    time.sleep(0.1)
    results.append(task.upper())
executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
for task in tasks:
    future = executor.submit(work, task)
""" It is not blocking here, are the pending tasks processed anyway? """
print "results 1:", results
""" result 1 is incomplete... if we wait a moment, it will be completed """
time.sleep(5)
print "results 2", results
2. So blockiert der Code vor der letzten Zeile mit der `print` Ausgabe:

Code: Alles auswählen

import time
import concurrent.futures

# list of tasks: lower-case alphabet letters
tasks = []
for i in range(26):
    tasks.append(chr(97+i))
results = []
# what to be done: transform to upper-case
def work(task):
    time.sleep(0.1)
    results.append(task.upper())
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    for task in tasks:
        future = executor.submit(work, task)
""" It is blocking here, so the result is already complete when printed. """
print "results:", results
BlackJack

@droptix: Die Worker-Funktion kehrt zurück wenn sie abgearbeitet ist. Aber nicht zu Dir. Du bekommst ein `Future`-Objekt. Und zwar sofort. Das ist ein Versprechen das Du von dem irgendwann in der Zukunft (daher der Name) das Ergebnis von der Funktion abfragen kannst. Wenn Du das tust, also die `result()`-Methode aufrufst, dann ist die Worker-Funktion entweder schon fertig und Du bekommst das Ergebnis sofort, oder sie ist noch nicht fertig, dann blockiert der Aufruf so lange bis es fertig ist. Alternativ kannst Du dem `Future`-Objekt auch eine Rückruffunktion geben die asynchron aufgerufen wird, wenn die dazugehörige Funktion ausgeführt wurde und ein Ergebnis geliefert hat.

`submit()` (und `map()`) packen die Aufgaben in der Tat in eine Queue die dann vom `Executor` abgearbeitet werden.

`shutdown()` stoppt keine laufenden Berechnungen, sondern wartet bis alles abgearbeitet ist und beendet dann die Threads. Danach kannst Du also sicher sein das alles was dem `Executor` an Aufgaben übergeben wurde, auch abgearbeitet ist.

Wieso Du denkst das man `submit()` erst aufrufen kann wenn alle Aufgaben abgearbeitet sind, verstehe ich nicht‽ Wenn man nicht `submit()` (oder `map()`) aufruft, dann werden doch erst überhaupt gar keine Aufgaben zugeteilt die abgearbeitet werden könnten.

Listen müssen in der Tat nicht thread-safe sein, aber Du greifst in Deinem Quelltext doch auch nirgends aus verschiedenen Threads auf die selbe Liste zu. Oh, warte das tust Du bei einigen Beispielen doch — aber da verwendest Du den Mechanismus ja auch falsch. Die Worker-Funktion soll ihr Ergebnis zurückgeben und nicht selbst in eine Datenstruktur stecken. Spätestens wenn Du den `ThreadPoolExecutor` durch einen `ProcessPoolExecutor` austauschst, der ja die gleiche API hat, funktioniert das auch nicht mehr weil Deine `results`-Queue in einem anderen Prozess ist als die Worker-Funktion ausgeführt wird.

Code: Alles auswählen

#!/usr/bin/env python
from random import random
from string import lowercase, uppercase
from time import sleep
from concurrent.futures import ThreadPoolExecutor


def work(character):
    sleep(random())
    print character
    return character.upper()


def main():
    tasks = lowercase
    with ThreadPoolExecutor(4) as executor:
        results = ''.join(executor.map(work, tasks))
    print results, results == uppercase


if __name__ == '__main__':
    main()
Hier sieht man schön, dass die Aufgaben zwar nicht in der Reihenfolge fertig werden in der sie in der Eingabeliste stehen, das aber `map()` netterweise dafür sorgt, dass sie in der Reihenfolge zurückgegeben werden.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

@BlackJack: `Future`-Objekt und `result()` habe ich nun verstanden.

Das mit `shutdown()` ist auch klar, wartet (= blockiert) allerdings nur wenn man `shutdown(True)` aufruft, sonst läuft der Code weiter -> dann hat `shutdown()` lediglich die Funktion, das Einreihen neuer Tasks zu verhindern und nach dem Abarbeiten der laufenden Tasks quasi aufzuräumen.

Zu `submit()`: Tippfehler, ich meinte `shutdown()` wenn ich das `with` Statement nutze, dann kann ich `shutdown()` praktisch nicht nutzen. Beispiel:

Code: Alles auswählen

with ThreadPoolExecutor(4) as executor:
    executor.submit(work, task)
executor.shutdown()
^^ Das macht keinen Sinn, weil bereits vor dem Aufruf von `executor.shutdown()`alle Tasks vollständig abgearbeitet sind, richtig (da `with` blockiert)? Nach nochmaligem Durchlesen verstehe ich dann auch diesen Satz hier:
Doku hat geschrieben:You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True)
Außerdem sind alle `submit()`-Aufrufe ja bereits durchgeführt worden. Ich finde also kein sinnvolles praktisches Beispiel zur Nutzung von `shutdown()`.

In meinem Code greife ich nicht parallel aus verschiedenen Threads auf dieselbe Liste zu, aber ich gehe ja davon aus, dass `ThreadPoolExecutor` neue Threads erzeugt und darin parallel meine `work` Funktion aufruft, die wiederum auf dieselbe Liste zugreift. So wie du das über `map()` umsetzt, stört das nicht, weil du lediglich den return-Wert von `work()` abfragst und anschließend alle Ergebnisse im selben Thread zusammenführst. Ist es generell besser, innerhalb von `work nicht das Ergebnis zu schreiben, sondern nur zu returnen?

^^ Das führt für mich auch gleich zur nächsten Frage: Wann verwende ich `submit()` und wann besser `map()`?

Meine Vorstellung ist wie bereits beschrieben, dass ich mehrere `ThreadPoolExecutor`-Instanzen hintereinander schalte, um komplexe Prozesse zu "koppeln" und somit in Reihe zu schalten. D.h. dass meine Task-Liste im laufenden Betrieb von Prozess 1 permanent erweitert wird, während Prozess 2 bereits daran arbeitet.

Dein Code-Snippet ist sehr schön kompakt und veranschaulicht das gut. :)
BlackJack

@droptix: Du greifst parallel auf eine Queue zu. Wie gesagt an einer Stelle an der man das nicht tun sollte, weil das die `Future`-Exemplare komplett ignoriert. Wenn die so unwichtig wären, würde das Modul nicht so heissen. Das habe ich doch schon mal geschrieben und Du fragst immer noch nach. Die Funktion sollte das berechnen was sie halt berechnen soll. Wo die Eingabewerte herkommen und wo/wie sie danach gespeichert werden gehört da nicht mehr rein. Zumal die sowieso nicht einfach so auf `results` zugreifen sollte, denn das kam ja gar nicht als Argument rein.

Wenn Du merkst, dass Du mit `submit()` selbst `map()` implementierst, dann solltest Du lieber `map()` nehmen. Man hat nichts mit den `Future`-Exemplaren zu tun und die Reihenfolge bleibt erhalten.

Warum willst Du mehrere `ThreadPoolExecutor`-Exemplare hintereinander schalten? Statt Funktionen zu kombinieren und „hintereinander zu schalten” und die von *einem* Executor ausführen zu lassen. Das ist wesentlich einfacher. Wenn mehrere parallel laufen sollen und Du die erledigten Aufgaben weitergeben möchtest, dann müsstest Du den `Future`-Exemplaren Rückruffunktionen mitgeben, die dann den nächsten Schritt anstossen.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Deine erste Ansage nehme ich jetzt einfach mal als gegeben hin. Der Knackpunkt ist eben, dass in der Doku nicht beschrieben ist, wie man vorgehen sollte... das muss man sich selbst zusammenreimen. Daher hab ich so viel nachgefragt. Danke also für deine ausführlichen Erklärungen!
BlackJack hat geschrieben:Wenn Du merkst, dass Du mit `submit()` selbst `map()` implementierst, dann solltest Du lieber `map()` nehmen. Man hat nichts mit den `Future`-Exemplaren zu tun und die Reihenfolge bleibt erhalten.
Du meinst, wenn ich das so mache, kann ich auch gleich `map()` nehmen, richtig?

Code: Alles auswählen

tasks = "".join([chr(97+i) for i in range(26)]) # string.lowercase returns more than 26 alphabet characters (Win7x64)
    results = []
    def work(character):
        return character.upper() 
    with concurrent.futures.ThreadPoolExecutor(4) as executor:
        for character in tasks:
            future = executor.submit(work, character)
            results.append(future.result())
    print results
BlackJack hat geschrieben:Warum willst Du mehrere `ThreadPoolExecutor`-Exemplare hintereinander schalten? Statt Funktionen zu kombinieren und „hintereinander zu schalten” und die von *einem* Executor ausführen zu lassen. Das ist wesentlich einfacher. Wenn mehrere parallel laufen sollen und Du die erledigten Aufgaben weitergeben möchtest, dann müsstest Du den `Future`-Exemplaren Rückruffunktionen mitgeben, die dann den nächsten Schritt anstossen.
Konkretes Beispiel für mein Python-Programm:
  • Prozess 1: Ich habe ein Verzeichnis wo viele Daten zusammenlaufen. 1x am Tag geht mein Python Programm durch dieses Verzeichnis, liest alle neuen Dateien ein und schreibt diese in eine "Zu-Prüfen"-Queue. Während das Programm läuft, wird das Verzeichnis aller 10 Sekunden erneut gescannt, da auch zur Laufzeit neue Dateien reinkommen können. d.h. die Queue wird zur Laufzeit voller.
  • Prozess 2: Die Dateipfade aus der Queue werden geprüft, ob sie schon fertig geschrieben worden sind (`os.access(path, os.W_OK)`). Sobald die fertig sind, kommen die in die nächste "Zu-Bearbeiten"-Queue. Dieser Zwischenschritt ist notwendig, damit ich nicht auf die Daten zugreife, während sie noch kopiert/geschrieben werden (geht über's Netzwerk, eine gewisse Trägheit ist bei der Größe der Daten also zwingend zu berücksichtigen)
  • Prozess 3: Je nach Dateityp werden die Dateien zunächst automatisiert bearbeitet und anschließend verschoben.
  • Erst wenn über 5 Minuten lang keine neuen Dateien eingegangen sind oder wenn das gesamte Programm länger als 12 Stunden lief, dann wird ein Stopp-Signal gesendet. Das bewirkt, dass Prozess 1 aufhört das Verzeichnis neu einzulesen und neue Dateipfade in die "Zu-Prüfen"-Queue einreiht. Folglich haben die Prozesse 2 und 3 dann noch Zeit, um ihre laufenden Aufgaben abzuarbeiten.
Der Übersichtlichkeit halber und zu Gunsten der besseren Struktur möchte ich das in einzelne Prozess-Schritte aufteilen. So arbeiten im Prinzip ja auch "Business Process Management" Systeme (BPMs).
BlackJack

@droptix: Die Dokumentation hat doch einige Beispiele die das typische Vorgehen zeigen. Vielleicht bin ich auch vorbelastet, weil ich das allgemeine Konzept von Futures schon aus anderen Programmiersprachen kenne. Siehe auch Wikipedia zum Begriff Future in der Programmierung. Ansonsten kommt man da vielleicht noch durch allgemeinen guten Programmentwurf drauf. Eine Funktion sollte nicht zu viel tun und wissen. Der Worker-Funktion sollte es egal sein von wem sie aufgerufen wird, und was mit ihrem Ergebnis passiert, denn dann kann man sie am flexibelsten verwenden.

Dein Quelltextbeispiel arbeitet schon wieder alles sequentiell ab. `result()` blockiert und wartet so lange bis das Ergebnis fertig ist! Und erst wenn das Ergebnis von einer Aufgabe fertig ist, gehst Du in der Schleife zur nächsten Aufgabe. So ist immer nur einer von den vier Threads in dem Beispiel beschäftigt. Wenn dort etwas parallel ausgeführt werden soll, müsste man schon so etwas hier machen:

Code: Alles auswählen

    tasks = string.ascii_lowercase
    def work(character):
        return character.upper() 
    with concurrent.futures.ThreadPoolExecutor(4) as executor:
        futures = [executor.submit(work, character) for character in tasks]
        results = [future.result() for future in futures]
        # 
        # oder wenn die Reihenfolge egal ist:
        # 
        results = [
            future.result()
            for future in concurrent.futures.as_completed(futures)
        ]
    print results
Und wenn man mit Ausnahmen rechnet, würde man das erstellen der Ergebnisliste noch in ein ``try``/``finally`` verpacken und im ``finally``-Block auf allen `Future`-Exemplaren `cancel()` aufrufen, damit wenigstens die Aufgaben die noch nicht gestartet wurden, nicht noch unnötig abgearbeitet werden.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Mein Quelltext-Beispiel war absichtlich "falsch", um zu zeigen dass `map()` hier sinnvoller ist. Trotzdem gut dein Hinweis: denn mein Code-Beispiel baut das Verhalten von `map()` nicht nach sondern blockiert nach jedem `result()`. Bei `map()` hingegen bleiben die Aufrufe asynchron. Ist mir schon klar dass das Blödsinn ist... diente der Veranschaulichung.

Mein Fazit: Leider komme ich zu dem Schluss, dass `concurrent.futures-ThreadPoolExecutor` nur dann sinnvoll ist, wenn die Liste mit Aufgaben im Vorhinein klar und endlich ist. Es ist nicht möglich, damit eine Aufgabenliste zu bewältigen, in der zur Laufzeit neue Aufgaben hinzukommen... so wie ich das vorhabe.

Richtig?
BlackJack

@droptix: Sehe ich nicht so. Man kann ja zum Beispiel die Rückruffunktionen verwenden. Exemplarisch:

Code: Alles auswählen

        for task in iter_endless_tasks():
            executor.submit(work, task).add_done_callback(work_done)
Man muss ein bisschen in der `work_done()`-Funktion aufpassen, weil die mehrfach gleichzeitig ausgeführt werden kann, man also dort die kritischen Abschnitte schützen müsste.

Man könnte sich eine Klasse schreiben, die eine Funktion und einen Executor kapselt und ein Nachfolger-Exemplar kennt, zu dem die `work_done()`-Methode die Ergebnisse weiterreicht. (Ich gehe jetzt mal von der Bedingung aus, dass die Reihenfolge der Bearbeitung keine Rolle spielt.)

Edit: Sinnfreies Beispiel:

Code: Alles auswählen

#!/usr/bin/env python
from __future__ import print_function

from concurrent.futures import ThreadPoolExecutor
from random import random
from threading import Lock
from time import sleep


class ProcessingStep(object):
    def __init__(self, work_callable, worker_count, next_step=None):
        self.work_callable = work_callable
        self.executor = ThreadPoolExecutor(worker_count)
        self.next_step = next_step
        self.lock = Lock()

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _traceback):
        self.wait()

    def submit(self, *arguments):
        (
            self.executor
                .submit(self.work_callable, *arguments)
                .add_done_callback(self.work_done)
        )

    def work_done(self, future):
        result = future.result()
        if self.next_step:
            with self.lock:
                self.next_step.submit(result)

    def wait(self):
        self.executor.shutdown()
        if self.next_step:
            self.next_step.wait()


def work_a(argument):
    sleep(random())
    return argument.upper()


def work_b(argument):
    sleep(random())
    return ' '.join(argument)


def main():
    with ProcessingStep(
        work_a, 2, ProcessingStep(work_b, 2, ProcessingStep(print, 1))
    ) as processor:
        for task in iter(raw_input, ''):
            processor.submit(task)


if __name__ == '__main__':
    main()
Antworten