ich befasse mich gerade mit Multlthreading. Der Anwendsungsfall ist: Ich möchte gleichzeitig Daten von verschiedenen Quellen holen. Manchmal reagiert beispielsweise eine der Quellen sehr langsam und hält den gesamten Prozess auf. Für diesen Fall möchte ich gerne das Abholen der Daten anhalten und nur die bereits fertig abgeholten Daten weiterreichen. Diese Zeitspanne für die das Sammeln der Daten habe ich TIMEOUT getauft.
Unten habe ich ein Beispiel vorbereitet, das macht jedoch noch nicht ganz das, was es soll.
Code: Alles auswählen
import time
import sys
from datetime import datetime
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError
# fetching data takes time, depending on the source
REQUIRED_TIME = {'source1': 5,
'source2': 2,
'source3': 3,
'source4': 4,
'source5': 7}
# fetching data should be aborted after this time
TIMEOUT = 4
def fetch_data_threaded():
t0 = datetime.utcnow()
data_sources = {'source1': ['d1', 'd2', 'd3'],
'source2': ['d4', 'd5', 'd6', 'd7'],
'source3': ['d8'],
'source4': ['d9', 'd10'],
'source5': ['d11', 'd12']}
# Fetch data multi-threaded
thread_pool = ThreadPool(processes=2)
data = {}
# -------------------------------------------------------------------------------- #
# This block should be aborted after TIMEOUT seconds, keep whatever data was fetched
for source, ids in data_sources.iteritems():
data[source] = thread_pool.apply_async(
get_data,
[source, ids],
)
result = []
for source in data:
try:
result += data[source].get(TIMEOUT)
print " Got data for source: {0}".format(source)
except TimeoutError as e:
print " ERROR: Timeout for key {0}: {1}".format(source, repr(e))
# --------------------------------------------------------------------------------- #
print "I got {0} results: {1}.".format(len(result), ", ".join(result))
t1 = datetime.utcnow()
print "took {0} seconds total".format((t1-t0).total_seconds())
return result
def get_data(source, ids):
# This method gets the data, it takes some time... and returns some identifyable data
sleep = REQUIRED_TIME[source]
print "source: {0}, ids: {1}, sleeptime: {2}s.".format(source, ids, sleep)
time.sleep(sleep)
return [i + 'data' for i in ids]
if __name__ == '__main__':
sys.exit(fetch_data_threaded())
Code: Alles auswählen
source: source2, ids: ['d4', 'd5', 'd6', 'd7'], sleeptime: 2s.
source: source3, ids: ['d8'], sleeptime: 3s.
source: source1, ids: ['d1', 'd2', 'd3'], sleeptime: 5s.
Got data for source: source2
source: source4, ids: ['d9', 'd10'], sleeptime: 4s.
Got data for source: source3
source: source5, ids: ['d11', 'd12'], sleeptime: 7s.
Got data for source: source1
Got data for source: source4
ERROR: Timeout for key source5: TimeoutError()
I got 10 results: d4data, d5data, d6data, d7data, d8data, d1data, d2data, d3data, d9data, d10data.
took 11.010673 seconds total
['d4data', 'd5data', 'd6data', 'd7data', 'd8data', 'd1data', 'd2data', 'd3data', 'd9data', 'd10data']
- Ein TimeoutError kommt nur für source5, ich hätte dies auch bei source1 erwartet
- Der ganze Code läuft in 11 Sekunden durch. Ich hätte erwartet, es würde nur 4 Sekunden dauern.
- Der Timeout scheint nur pro data[source].get() zu gelten, statt für alle data[source].get() gleichzeitig.
Gruß,
Nras.
P.S.: Ich habe die Frage auch auf StackOverflow gestellt, jedoch leider komplette ohne Reaktion