Multhithreaded Job anhalten, bisher fertige results holen.

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Hallo,

ich befasse mich gerade mit Multlthreading. Der Anwendsungsfall ist: Ich möchte gleichzeitig Daten von verschiedenen Quellen holen. Manchmal reagiert beispielsweise eine der Quellen sehr langsam und hält den gesamten Prozess auf. Für diesen Fall möchte ich gerne das Abholen der Daten anhalten und nur die bereits fertig abgeholten Daten weiterreichen. Diese Zeitspanne für die das Sammeln der Daten habe ich TIMEOUT getauft.
Unten habe ich ein Beispiel vorbereitet, das macht jedoch noch nicht ganz das, was es soll.

Code: Alles auswählen

import time
import sys
from datetime import datetime
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError

# fetching data takes time, depending on the source
REQUIRED_TIME = {'source1': 5,
                 'source2': 2,
                 'source3': 3,
                 'source4': 4,
                 'source5': 7}

# fetching data should be aborted after this time
TIMEOUT = 4

def fetch_data_threaded():
    t0 = datetime.utcnow()

    data_sources = {'source1': ['d1', 'd2', 'd3'],
                    'source2': ['d4', 'd5', 'd6', 'd7'],
                    'source3': ['d8'],
                    'source4': ['d9', 'd10'],
                    'source5': ['d11', 'd12']}

    # Fetch data multi-threaded
    thread_pool = ThreadPool(processes=2)
    data = {}

    # -------------------------------------------------------------------------------- #
    # This block should be aborted after TIMEOUT seconds, keep whatever data was fetched
    for source, ids in data_sources.iteritems():
        data[source] = thread_pool.apply_async(
            get_data,
            [source, ids],
        )
    result = []
    for source in data:
        try:
            result += data[source].get(TIMEOUT)
            print "  Got data for source: {0}".format(source)
        except TimeoutError as e:
            print "  ERROR: Timeout for key {0}: {1}".format(source, repr(e))
    # --------------------------------------------------------------------------------- #

    print "I got {0} results: {1}.".format(len(result), ", ".join(result))
    t1 = datetime.utcnow()
    print "took {0} seconds total".format((t1-t0).total_seconds())
    return result

def get_data(source, ids):
    # This method gets the data, it takes some time... and returns some identifyable data
    sleep = REQUIRED_TIME[source]
    print "source: {0}, ids: {1}, sleeptime: {2}s.".format(source, ids, sleep)
    time.sleep(sleep)
    return [i + 'data' for i in ids]


if __name__ == '__main__':
    sys.exit(fetch_data_threaded())
Dieses Beispiel erzeugt bei mir folgenden Output:

Code: Alles auswählen

source: source2, ids: ['d4', 'd5', 'd6', 'd7'], sleeptime: 2s.
source: source3, ids: ['d8'], sleeptime: 3s.
source: source1, ids: ['d1', 'd2', 'd3'], sleeptime: 5s.
  Got data for source: source2
source: source4, ids: ['d9', 'd10'], sleeptime: 4s.
  Got data for source: source3
source: source5, ids: ['d11', 'd12'], sleeptime: 7s.
  Got data for source: source1
  Got data for source: source4
  ERROR: Timeout for key source5: TimeoutError()
I got 10 results: d4data, d5data, d6data, d7data, d8data, d1data, d2data, d3data, d9data, d10data.
took 11.010673 seconds total
['d4data', 'd5data', 'd6data', 'd7data', 'd8data', 'd1data', 'd2data', 'd3data', 'd9data', 'd10data']
Dabei verwundern mich ein paar Dinge:
  • Ein TimeoutError kommt nur für source5, ich hätte dies auch bei source1 erwartet
  • Der ganze Code läuft in 11 Sekunden durch. Ich hätte erwartet, es würde nur 4 Sekunden dauern.
  • Der Timeout scheint nur pro data[source].get() zu gelten, statt für alle data[source].get() gleichzeitig.
Was muss ich verändern, damit der gesamte eingerahmte Prozess des multithreaded-Datenabholens nach der Zeit TIMEOUT vorüber ist?

Gruß,
Nras.

P.S.: Ich habe die Frage auch auf StackOverflow gestellt, jedoch leider komplette ohne Reaktion
Zuletzt geändert von Anonymous am Freitag 27. November 2015, 20:35, insgesamt 1-mal geändert.
Grund: Quelltext in Python-Codebox-Tags gesetzt.
BlackJack

@Nras: Die ersten beiden Dinge die Dich wundern dürften vom letzten Punkt abhängen der Dich wundert. Was mich wundert, denn wie sollte das auch anders funktionieren? Woher sollen `get()`-Aufrufe für andere Ergebnisse wissen das Du gerne ein TIMEOUT für alles haben willst wo Du den TIMEOUT-Wert doch bei jedem Aufruf auf den *unabhängigen* Ergebnis-Objekten jedes mal angibst?

Streng genommen geht das gar nicht was Du willst, denn es wird weiter in den noch laufenden Threads abgeholt. Die Zeitüberschreitung beim `get()` sagt ja nur das Du nicht länger bei dem Aufruf auf das Ergebnis warten möchtest, nicht das der Thread dann aufhört zu laufen. Also mindestens die Threads die nach der gewünschten Zeitspanne noch laufen werden das auch bis an ihr ”natürliches” Ende tun. Threads kann man nicht abbrechen. Wenn der Thread allerdings noch nicht gestartet ist, kann man auf dem Ergebnis-Objekt verhindern das er losläuft, das solltest Du also vielleicht auch einbauen wenn da nicht unnötig etwas nebenher noch laufen soll.

Du könntest Rückruffunktionen an die Ergebnisobjekte binden und dann die gewünschte Zeit auf einem `multiprocessing.Event` mit Timeout warten das entweder gesetzt wird weil alles abgearbeitet wird oder eben die Zeitüberschreitung zuschlägt.
Benutzeravatar
snafu
User
Beiträge: 6831
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Nras hat geschrieben:

Code: Alles auswählen

source: source2, ids: ['d4', 'd5', 'd6', 'd7'], sleeptime: 2s.
source: source3, ids: ['d8'], sleeptime: 3s.
source: source1, ids: ['d1', 'd2', 'd3'], sleeptime: 5s.
  Got data for source: source2
source: source4, ids: ['d9', 'd10'], sleeptime: 4s.
  Got data for source: source3
source: source5, ids: ['d11', 'd12'], sleeptime: 7s.
  Got data for source: source1
  Got data for source: source4
  ERROR: Timeout for key source5: TimeoutError()
I got 10 results: d4data, d5data, d6data, d7data, d8data, d1data, d2data, d3data, d9data, d10data.
took 11.010673 seconds total
['d4data', 'd5data', 'd6data', 'd7data', 'd8data', 'd1data', 'd2data', 'd3data', 'd9data', 'd10data']
Dabei verwundern mich ein paar Dinge:
  • Ein TimeoutError kommt nur für source5, ich hätte dies auch bei source1 erwartet
  • Der ganze Code läuft in 11 Sekunden durch. Ich hätte erwartet, es würde nur 4 Sekunden dauern.
  • Der Timeout scheint nur pro data[source].get() zu gelten, statt für alle data[source].get() gleichzeitig.
Das Timeout besagt nur, wie lange auf den Abschluss eines Threads (bzw einer Aufgabe) gewartet werden soll, nachdem sein Ergebnis angefordert wurde.

In deinem Fall wird zuerst "source2" befragt und braucht nur 2 Sekunden, d.h. das Timeout wird nicht überschritten. In dieser Zeit laufen aber auch die anderen Threads weiter, weil nicht erst das `get()` den Thread anstößt, sondern das `apply_async()`, welches du ja in der vorherigen Schleife bereits auf alle abzuarbeitenden Objekte angewendet hast. Nun wird "source2" befragt, der aus den zuvor beschriebenen Gründen bereits 2 Sekunden "Vorsprung" hat und nach `get()` nur noch 1 Sekunde benötigt. Und den Rest kannst du dir jetzt wahrscheinlich selber denken...

Dir leuchtet insofern jetzt hoffentlich ein, dass das von dir beobachtete Verhalten nicht überraschend ist, sondern dass es dem Verhalten entspricht, welches man erwarten würde: Eine *gleichzeitige* Abarbeitung aller Aufgaben. Andernfalls könnte man sich den ThreadPool ja sparen, wenn man die Aufgaben ohnehin nacheinander ausführen möchte.

EDIT: Das mit `apply_async()` war nicht ganz richtig. Dadurch, dass du nur 2 Threads gleichzeitig erlaubt hast, starten nicht wirklich alle Threads gleichzeitig, sondern eben nur 2 gleichzeitig. Der dritte Thread beginnt, sobald der erste beendet wurde, kann aber den Vorsprung mitnehmen, der durch die Ausführungszeit des zweiten Threads entsteht. So kommen dann letztlich die 11 Sekunden Gesamtzeit zustande.
Benutzeravatar
snafu
User
Beiträge: 6831
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Um das nochmal detailliert auszuführen (durfte den Beitrag nicht mehr editieren) :
Bezogen auf das Beispiel bedeutet das, dass "source1" gestartet wird, wenn "source2" fertig ist, d.h. wenn "source3" noch 1 Sekunde Laufzeit hat. Sobald "source3" fertig ist, kommt es zum `get()` für "source1", welches dann schon 1 Sekunde lang laufen konnte. Daher schafft es aufgrund der sich ergebenen Restlaufzeit von 4 Sekunden ganz knapp die Zeit innerhalb des Timeouts. Insgesamt sind bis hierher 7 Sekunden vergangen (2 Sek von "source2" + 1 Sek Rest von "source3" + 4 Sek Rest von "source1"). "source4" durfte beginnen als "source3" beendet war und konnte wegen der vollen Ausnutzung des Timeouts von "source1" bereits 4 Sekunden laufen und ist damit bereits fertig, wenn es durch `get()` befragt wird. Wir haben also immer noch 7 Sekunden Gesamtlaufzeit. Da "source5" erst starten konnte, nachdem "source1" fertig war, startet es also in Sekunde 7 und läuft die vollen 4 Sekunden bis zum Timeout ab. Da es in dieser Zeit nicht fertig wird, kommt es zum Fehler und unter Einbeziehung dieser 4 Sekunden haben wir die 11 Sekunden Gesamtzeit und damit die komplette Erklärung.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Hallo,

danke für die vielen Antworten. Ich habe noch mal über das Problem nachgedacht, und bin der Meinung, dass ich gerne ein pool.join() mit Timeout haben möchte, was allerdings wohl nicht möglich ist. Daher dachte ich, ich baue mir mein eigenes pool.join() (ohne zu wissen, wie das im originalen implementiert ist). Ich frage einfach, ob alle Prozesse fertig sind, wenn nicht, kurz warten und erneut Fragen. Solange, bis alle Prozesse fertig sind, oder die Wartezeit überschritten ist. Dann alle Ergebnisse holen.

Ich frage einfach immer wieder ab, ob noch Prozesse nicht fertig sind.

Code: Alles auswählen

for source, ids in data_sources.iteritems():
        data[source] = thread_pool.apply_async(
            get_data,
            [source, ids],
        )

    # regulary check for finished processes within given TIMEOUT-time
    processes_ready = {}
    all_ready = False
    while not all_ready and (datetime.utcnow() - t0).total_seconds() < TIMEOUT:
        for source in data:
            processes_ready[source] = data[source].ready()
        all_ready = all(processes_ready.values())
        if all_ready:
            print "all processes ready after {0} seconds".format((datetime.utcnow() - t0).total_seconds())
            break
        print "{0}/{1} processes finished, sleeping shortly... ".format(np.sum(processes_ready.values()),
                                                                        len(processes_ready))
        time.sleep(0.1)  # Zzzzz

    # fetching whatever data is there
    print processes_ready
    result = []
    for source in data:
        try:
            result += data[source].get(timeout=0.0)
            print "  Got data for source: {0}".format(source)
        except TimeoutError as e:
            print "  ERROR: Timeout for key {0}: {1}".format(source, repr(e))

    print "I got {0} results: {1}.".format(len(result), ", ".join(result))
    t1 = datetime.utcnow()
    print "took {0} seconds total".format((t1-t0).total_seconds())
    return result
Das holt (mit den Zeiten von vom Eröffnungspost) folgenden output:

Code: Alles auswählen

2/5 processes finished, sleeping shortly... 
{'source2': True, 'source3': True, 'source1': False, 'source4': False, 'source5': False}
  Got data for source: source2
  Got data for source: source3
  ERROR: Timeout for key source1: TimeoutError()
  ERROR: Timeout for key source4: TimeoutError()
  ERROR: Timeout for key source5: TimeoutError()
I got 5 results: d4data, d5data, d6data, d7data, d8data.
took 4.013774 seconds total
['d4data', 'd5data', 'd6data', 'd7data', 'd8data']
Das heißt, es sieht so aus, als würde das passieren, was ich möchte. Einige results werden geholt, der Rest wird ignoriert. Die Laufzeit ist entsprechend der eingestellten TIMEOUT-Variable erwartungsgemäß.

Nun stellt sich mir bloß noch eine Frage, bezüglich der Antwort von @BlackJack
Laufen die Threads nun noch weiter? Was passiert mit den Threads, wenn die Methode ihr return erreicht? Werden die sauber entsorgt, oder geistern die dann noch irgendwo rum?
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Ich kann obigen post leider nicht mehr editieren. Die erste Zeile ist falsch eingerückt, sie muss 4 Zeichen weiter eingerückt werden.
Nras
User
Beiträge: 24
Registriert: Dienstag 25. März 2014, 10:38

Falls es noch jemanden interessiert. Mein poor man's pool.join() mit timeout funktioniert wunderbar. Es gab noch einen Bug, bei dem mir noch später Exceptions um die Ohren flogen, wenn nachträglich noch Jobs fertig wurden, bevor die Methode anständig zuende war. Das habe ich gelöst, in dem ich nach der while-schleife ein thrad_pool.terminate() ein normals thread_pool.join() gemacht habe.

Vielen Dank für die Anregungen.

Gruß,
Nras.
Alfons Mittelmeyer
User
Beiträge: 1715
Registriert: Freitag 31. Juli 2015, 13:34

Ich würde sagen, völlig falscher Ansatz. Man macht die Aktionen ereignisgesteuert verbunden mit einer Event Queue und wenn man nichts mehr zu tun hat, dann wartet man nicht mit sleep, sondern mit event.wait.

Wenn es etwas zu tun gibt, legt ein anderer Prozess den Auftrag in die Eventqueue und weckt mit event.set für den Fall, dass der Prozess noch schläft.

Oder würdest Du das auch so machen?

Die Tochter wünscht sich ein Tablet zu Weihnachten. Die Frau bittet ihren Mann eines bei Amazon zu bestellen. Da es bereits Nachts ist, legt sich die Frau zu Bett und wartet dann auf ihren Mann. Aber der kommt nicht und kommt nicht. Die Frau ruft, warum er nicht zu Bett kommt, hat er denn nichts passendes gefunden. Nein, sagt der Mann, ich habe es bereits bestellt. Und warum kommst Du dann nichts ins Bett? Geht nicht, sagt der Mann. Ich habe das Tablet bestellt, aber es ist noch nicht da. Geht Du schon mal zu Bett. Wenn es dann da ist, komme ich nach.
Benutzeravatar
snafu
User
Beiträge: 6831
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

@Alfons Mittelmeyer
Und wenn das Tablet nach 2 Wochen immer noch nicht angekommen ist, dann würdest du brav weiter warten, anstatt die Bestellung irgendwann zu stornieren...?
BlackJack

Das Beispiel hinkt auch ein wenig weil ich mehrere Leute kenne (mich eingeschlossen) die nach einer Bestellung regelmässig nachsehen ob die Sendung schon raus ist, beziehungsweise wo sie denn gerade unterwegs ist. Also reales „busy waiting“. :-)
Benutzeravatar
snafu
User
Beiträge: 6831
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

snafu hat geschrieben:@Alfons Mittelmeyer
Und wenn das Tablet nach 2 Wochen immer noch nicht angekommen ist, dann würdest du brav weiter warten, anstatt die Bestellung irgendwann zu stornieren...?
Oder besser gesagt: Würdest du nicht irgendwann kontrollieren wollen, was aus der Bestellung geworden ist und abhängig vom Status der Bestellung den Vorgang ggf "abbrechen" wollen?
Alfons Mittelmeyer hat geschrieben:und wenn man nichts mehr zu tun hat, dann wartet man nicht mit sleep, sondern mit event.wait.
Das stimmt. Die Verwendung von `sleep()` könnte hier aber durchaus bloß beispielhaft bzw zum Testen / zur Simulierung einer längeren Wartezeit gedacht sein.
Alfons Mittelmeyer
User
Beiträge: 1715
Registriert: Freitag 31. Juli 2015, 13:34

Das Beispiel hat wenig mit richtig multithreaded zu tun. Nehmen wir als anderes Beispiel einen Internetserver. Jemand saugt sich da eine 2 GB Datei mit einen Spielfilm herunter. Dem alles schicken und andere inzwischen blockieren wäre grundverkehrt. Man teilt die Daten in sinnvolle Blöcke, schickt sie blockweise und jeder kommt dran, auch wenn es dann bei vielen Usern entsprechend langsamer wird.

Und auf dem Computer, der den Film auf die Festplatte schreibt, wird auch nicht gewartet, bis alles da ist, sondern schon wenn der erste Blockk eintrifft, mit dem Schreiben begonnen.

Die Daten sollten also nicht alle auf einmal sondern in sinnvollen Paketgrößen gesendet w4erden, die man, wenn sie eintreffen, schon weiterverabeiten kann.

Und dazwischen sollten auch die Daten von den anderen Sourcen eintreffen. Wäre also vielleicht auch sinnvoll, pro Source einen Thread zu machen, je nachdem, wie es gedacht ist.

Dann ist auch darauf zu achten, dass die Prozesse, weche die Daten senden, den Prozeß, der sie weiter verarbeitet, nicht vollmüllen, sodass sie sich bei diesem stauen - wäre natürlich bei genügend großem Hauptspeicher kein Problem. Ansonsten, sollte der weiterverarbeitende Prozeß die Geschwindigkeit steuern, indem dieser dann Daten nachfordert, wenn bei ihm seine Dateneingansqueue unter einen bestimmten Level sinkt.
BlackJack

@Alfons Mittelmeyer: Und das hat jetzt alles genau *was* mit der Fragestellung des OP zu tun? Der, mal bildlich gesprochen, mehrere unabhängige Bestellungen aufgibt und nach einer festgelegten Wartezeit die Pakete verwendet die innerhalb dieser Zeit angekommen sind und alle anderen wieder abbestellen/verwerfen möchte. Dafür hat er jetzt eine Lösung, die mit einem `sleep()` zwar nicht optimal ist, aber wirklich blockiert wird da ja nichts. Also nicht länger als maximal eine Zehntelsekunde zumindest.
Antworten