threading - Liste mit Funktionen abarbeiten - jeweils nur 2

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
BlackJack

Mittwoch 12. Juni 2013, 22:35

@droptix: Du greifst parallel auf eine Queue zu. Wie gesagt an einer Stelle an der man das nicht tun sollte, weil das die `Future`-Exemplare komplett ignoriert. Wenn die so unwichtig wären, würde das Modul nicht so heissen. Das habe ich doch schon mal geschrieben und Du fragst immer noch nach. Die Funktion sollte das berechnen was sie halt berechnen soll. Wo die Eingabewerte herkommen und wo/wie sie danach gespeichert werden gehört da nicht mehr rein. Zumal die sowieso nicht einfach so auf `results` zugreifen sollte, denn das kam ja gar nicht als Argument rein.

Wenn Du merkst, dass Du mit `submit()` selbst `map()` implementierst, dann solltest Du lieber `map()` nehmen. Man hat nichts mit den `Future`-Exemplaren zu tun und die Reihenfolge bleibt erhalten.

Warum willst Du mehrere `ThreadPoolExecutor`-Exemplare hintereinander schalten? Statt Funktionen zu kombinieren und „hintereinander zu schalten” und die von *einem* Executor ausführen zu lassen. Das ist wesentlich einfacher. Wenn mehrere parallel laufen sollen und Du die erledigten Aufgaben weitergeben möchtest, dann müsstest Du den `Future`-Exemplaren Rückruffunktionen mitgeben, die dann den nächsten Schritt anstossen.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Freitag 14. Juni 2013, 09:27

Deine erste Ansage nehme ich jetzt einfach mal als gegeben hin. Der Knackpunkt ist eben, dass in der Doku nicht beschrieben ist, wie man vorgehen sollte... das muss man sich selbst zusammenreimen. Daher hab ich so viel nachgefragt. Danke also für deine ausführlichen Erklärungen!
BlackJack hat geschrieben:Wenn Du merkst, dass Du mit `submit()` selbst `map()` implementierst, dann solltest Du lieber `map()` nehmen. Man hat nichts mit den `Future`-Exemplaren zu tun und die Reihenfolge bleibt erhalten.
Du meinst, wenn ich das so mache, kann ich auch gleich `map()` nehmen, richtig?

Code: Alles auswählen

tasks = "".join([chr(97+i) for i in range(26)]) # string.lowercase returns more than 26 alphabet characters (Win7x64)
    results = []
    def work(character):
        return character.upper() 
    with concurrent.futures.ThreadPoolExecutor(4) as executor:
        for character in tasks:
            future = executor.submit(work, character)
            results.append(future.result())
    print results
BlackJack hat geschrieben:Warum willst Du mehrere `ThreadPoolExecutor`-Exemplare hintereinander schalten? Statt Funktionen zu kombinieren und „hintereinander zu schalten” und die von *einem* Executor ausführen zu lassen. Das ist wesentlich einfacher. Wenn mehrere parallel laufen sollen und Du die erledigten Aufgaben weitergeben möchtest, dann müsstest Du den `Future`-Exemplaren Rückruffunktionen mitgeben, die dann den nächsten Schritt anstossen.
Konkretes Beispiel für mein Python-Programm:
  • Prozess 1: Ich habe ein Verzeichnis wo viele Daten zusammenlaufen. 1x am Tag geht mein Python Programm durch dieses Verzeichnis, liest alle neuen Dateien ein und schreibt diese in eine "Zu-Prüfen"-Queue. Während das Programm läuft, wird das Verzeichnis aller 10 Sekunden erneut gescannt, da auch zur Laufzeit neue Dateien reinkommen können. d.h. die Queue wird zur Laufzeit voller.
  • Prozess 2: Die Dateipfade aus der Queue werden geprüft, ob sie schon fertig geschrieben worden sind (`os.access(path, os.W_OK)`). Sobald die fertig sind, kommen die in die nächste "Zu-Bearbeiten"-Queue. Dieser Zwischenschritt ist notwendig, damit ich nicht auf die Daten zugreife, während sie noch kopiert/geschrieben werden (geht über's Netzwerk, eine gewisse Trägheit ist bei der Größe der Daten also zwingend zu berücksichtigen)
  • Prozess 3: Je nach Dateityp werden die Dateien zunächst automatisiert bearbeitet und anschließend verschoben.
  • Erst wenn über 5 Minuten lang keine neuen Dateien eingegangen sind oder wenn das gesamte Programm länger als 12 Stunden lief, dann wird ein Stopp-Signal gesendet. Das bewirkt, dass Prozess 1 aufhört das Verzeichnis neu einzulesen und neue Dateipfade in die "Zu-Prüfen"-Queue einreiht. Folglich haben die Prozesse 2 und 3 dann noch Zeit, um ihre laufenden Aufgaben abzuarbeiten.
Der Übersichtlichkeit halber und zu Gunsten der besseren Struktur möchte ich das in einzelne Prozess-Schritte aufteilen. So arbeiten im Prinzip ja auch "Business Process Management" Systeme (BPMs).
BlackJack

Freitag 14. Juni 2013, 10:37

@droptix: Die Dokumentation hat doch einige Beispiele die das typische Vorgehen zeigen. Vielleicht bin ich auch vorbelastet, weil ich das allgemeine Konzept von Futures schon aus anderen Programmiersprachen kenne. Siehe auch Wikipedia zum Begriff Future in der Programmierung. Ansonsten kommt man da vielleicht noch durch allgemeinen guten Programmentwurf drauf. Eine Funktion sollte nicht zu viel tun und wissen. Der Worker-Funktion sollte es egal sein von wem sie aufgerufen wird, und was mit ihrem Ergebnis passiert, denn dann kann man sie am flexibelsten verwenden.

Dein Quelltextbeispiel arbeitet schon wieder alles sequentiell ab. `result()` blockiert und wartet so lange bis das Ergebnis fertig ist! Und erst wenn das Ergebnis von einer Aufgabe fertig ist, gehst Du in der Schleife zur nächsten Aufgabe. So ist immer nur einer von den vier Threads in dem Beispiel beschäftigt. Wenn dort etwas parallel ausgeführt werden soll, müsste man schon so etwas hier machen:

Code: Alles auswählen

    tasks = string.ascii_lowercase
    def work(character):
        return character.upper() 
    with concurrent.futures.ThreadPoolExecutor(4) as executor:
        futures = [executor.submit(work, character) for character in tasks]
        results = [future.result() for future in futures]
        # 
        # oder wenn die Reihenfolge egal ist:
        # 
        results = [
            future.result()
            for future in concurrent.futures.as_completed(futures)
        ]
    print results
Und wenn man mit Ausnahmen rechnet, würde man das erstellen der Ergebnisliste noch in ein ``try``/``finally`` verpacken und im ``finally``-Block auf allen `Future`-Exemplaren `cancel()` aufrufen, damit wenigstens die Aufgaben die noch nicht gestartet wurden, nicht noch unnötig abgearbeitet werden.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Dienstag 18. Juni 2013, 14:15

Mein Quelltext-Beispiel war absichtlich "falsch", um zu zeigen dass `map()` hier sinnvoller ist. Trotzdem gut dein Hinweis: denn mein Code-Beispiel baut das Verhalten von `map()` nicht nach sondern blockiert nach jedem `result()`. Bei `map()` hingegen bleiben die Aufrufe asynchron. Ist mir schon klar dass das Blödsinn ist... diente der Veranschaulichung.

Mein Fazit: Leider komme ich zu dem Schluss, dass `concurrent.futures-ThreadPoolExecutor` nur dann sinnvoll ist, wenn die Liste mit Aufgaben im Vorhinein klar und endlich ist. Es ist nicht möglich, damit eine Aufgabenliste zu bewältigen, in der zur Laufzeit neue Aufgaben hinzukommen... so wie ich das vorhabe.

Richtig?
BlackJack

Dienstag 18. Juni 2013, 14:45

@droptix: Sehe ich nicht so. Man kann ja zum Beispiel die Rückruffunktionen verwenden. Exemplarisch:

Code: Alles auswählen

        for task in iter_endless_tasks():
            executor.submit(work, task).add_done_callback(work_done)
Man muss ein bisschen in der `work_done()`-Funktion aufpassen, weil die mehrfach gleichzeitig ausgeführt werden kann, man also dort die kritischen Abschnitte schützen müsste.

Man könnte sich eine Klasse schreiben, die eine Funktion und einen Executor kapselt und ein Nachfolger-Exemplar kennt, zu dem die `work_done()`-Methode die Ergebnisse weiterreicht. (Ich gehe jetzt mal von der Bedingung aus, dass die Reihenfolge der Bearbeitung keine Rolle spielt.)

Edit: Sinnfreies Beispiel:

Code: Alles auswählen

#!/usr/bin/env python
from __future__ import print_function

from concurrent.futures import ThreadPoolExecutor
from random import random
from threading import Lock
from time import sleep


class ProcessingStep(object):
    def __init__(self, work_callable, worker_count, next_step=None):
        self.work_callable = work_callable
        self.executor = ThreadPoolExecutor(worker_count)
        self.next_step = next_step
        self.lock = Lock()

    def __enter__(self):
        return self

    def __exit__(self, _type, _value, _traceback):
        self.wait()

    def submit(self, *arguments):
        (
            self.executor
                .submit(self.work_callable, *arguments)
                .add_done_callback(self.work_done)
        )

    def work_done(self, future):
        result = future.result()
        if self.next_step:
            with self.lock:
                self.next_step.submit(result)

    def wait(self):
        self.executor.shutdown()
        if self.next_step:
            self.next_step.wait()


def work_a(argument):
    sleep(random())
    return argument.upper()


def work_b(argument):
    sleep(random())
    return ' '.join(argument)


def main():
    with ProcessingStep(
        work_a, 2, ProcessingStep(work_b, 2, ProcessingStep(print, 1))
    ) as processor:
        for task in iter(raw_input, ''):
            processor.submit(task)


if __name__ == '__main__':
    main()
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Donnerstag 20. Juni 2013, 13:45

Netter Ansatz! Dann arbeite ich mich doch nochmal da rein...

Allgemeine Python-Syntax-Frage dazu: Was bezweckst du mit den runden Klammern in `submit()`? Ich verstehe die Syntax nicht.
BlackJack hat geschrieben:

Code: Alles auswählen

class ProcessingStep(object):
    # ...
    def submit(self, *arguments):
        (
            self.executor
                .submit(self.work_callable, *arguments)
                .add_done_callback(self.work_done)
        )
Benutzeravatar
cofi
Moderator
Beiträge: 4432
Registriert: Sonntag 30. März 2008, 04:16
Wohnort: RGFybXN0YWR0

Donnerstag 20. Juni 2013, 13:50

Code: Alles auswählen

class ProcessingStep(object):
    # ...
    def submit(self, *arguments):
        (
            self.executor
                .submit(self.work_callable, *arguments)
                .add_done_callback(self.work_done)
        )
entspricht

Code: Alles auswählen

class ProcessingStep(object):
    # ...
    def submit(self, *arguments):
        self.executor \
            .submit(self.work_callable, *arguments) \
            .add_done_callback(self.work_done)
oder

Code: Alles auswählen

class ProcessingStep(object):
    # ...
    def submit(self, *arguments):
        self.executor.submit(self.work_callable, *arguments).add_done_callback(self.work_done)
Kurzum: BlackJack will einen Ausdruck ueber mehrere Zeilen verteilen und benutzt dazu die Tatsache, dass der Parser in Klammern ueber Zeilengrenzen hinweg liest.
droptix
User
Beiträge: 521
Registriert: Donnerstag 13. Oktober 2005, 21:27

Donnerstag 20. Juni 2013, 13:52

Danke. Hat also nichts mit Tupeln zu tun... :D

Man könnte dasselbe auch so schreiben, richtig?

Code: Alles auswählen

class ProcessingStep(object):
    # ...
    def submit(self, *arguments):
        future = self.executor.submit(self.work_callable, *arguments)
        future.add_done_callback(self.work_done)
Nachtrag: Wofür brauche ich `__enter__` und `__exit__`? Sind die `object`-spezifisch? Danke!
EyDu
User
Beiträge: 4872
Registriert: Donnerstag 20. Juli 2006, 23:06
Wohnort: Berlin

Donnerstag 20. Juni 2013, 14:17

droptix hat geschrieben:Nachtrag: Wofür brauche ich `__enter__` und `__exit__`? Sind die `object`-spezifisch? Danke!
Für das with-Statement.
Das Leben ist wie ein Tennisball.
Antworten