Seite 1 von 1
multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Donnerstag 24. Mai 2018, 08:35
von martinjo
Hallo
Schon seit langem versuche ich folgendes zu verwirklichen:
Es wird eine API Anfrage gesendet. Ich erhalte als Ergebnis eine Liste mit 100 Artikeln und der Info, dass noch 30 weitere Seiten vorhanden sind.
Also muss ich 30 weitere Aufrufe starten bei denen ich die Seitenanzahl mitliefere, um die Artikel der jeweiligen Seite zu bekommen.
Da so ein Aufruf etwas längern dauert und
sechs Aufrufe parallel möglich sind möchte ich das auch gerne nutzen. Ich bin mir nur nicht sicher welches Werkzeug dafür am geeignetsten ist und auf was ich zu achten habe:
Bisheriger Aufruf:
Code: Alles auswählen
import myapi
response = myapi.get_all_products()
products = response["items"]
pages = response["item_pages_available"]
for page in range(int(pages)):
response = myapi.get_all_products(page=page)
products.extend(response["items"])
Überlegung 1:
Ist multiprocessing hier überhaupt passend? Ist man hier nicht auf die Anzahl der Prozessoren beschränkt? Ist ja bei diesen Aufrufen gar nicht wichtig, denn selbst ein alter Einkerner kann ja 6 Anfragen senden und auf die Antworten warten.
Ich glaube das ist erstmal die wichtigste Frage.
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Donnerstag 24. Mai 2018, 08:44
von Sirius3
Da solche Anfragen IO-Limitiert sind, nimmt man dazu am besten AsyncIO.
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Donnerstag 24. Mai 2018, 09:59
von noisefloor
Hallo,
asyncio ist für so was gemacht. Ggf. möchtest du das Modul
aihttp benutzen, welches asynchrone (einfacher) Requests ermöglicht.
Gruß, noisefloor
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Donnerstag 24. Mai 2018, 15:13
von martinjo
Danke, ich habe mich die letzen Stunden damit beschäftigt, jetzt qualmt der Kopf und ich komme nicht weiter. Evtl. habt ihr mir ja noch n Tipp, Problem gerade, ich gebe über return eine Liste zurück, erhalte jedoch ein CoroWrapper Objekt. Dadurch wir die Liste mit den Items nie erweitert:
Code: Alles auswählen
items, pages = self.get_items_response_test(options=options)
import asyncio
loop = asyncio.get_event_loop()
loop.set_debug(True)
async def asy_func_base(items, pages, options):
for page_number in range(2, int(pages-1)):
yitems = asy_get_items(page_number, options)
logger.info("return from asy_get_items = %s", yitems)
items.extend(yitems)
print(len(items))
async def asy_get_items(page_number, options):
xitems, xpage = self.get_items_response_test(options=options, page_number=page_number)
return xitems
loop.run_until_complete(asy_func_base(items, pages, options))
loop.close()
Auch werden die Aufrufe nacheinander ausgeführt, ich war jedoch froh, das dass zumindest mal halbwegs funktioniert hat. Wenn ich die Liste "items" der Funktion "asy_get_items" durchreiche kann ich sie zumindest dort füllen, doch irgendwie scheint das nicht richtig.
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Donnerstag 24. Mai 2018, 16:10
von __deets__
Ich glaube dir fehlt ein await in "yitems = ...". Das ist ja wieder eine Co-Routine, und damit die im mainloop verarbeitet wird, gehoert das await davor.
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Donnerstag 24. Mai 2018, 16:28
von martinjo
Danke, inzwischen habe ich es jedoch umgeändert:
Code: Alles auswählen
import myapi
response = myapi.get_all_products()
items = response["items"]
pages = response["item_pages_available"]
import asyncio
loop = asyncio.get_event_loop()
async def asy_get_items(page_number, items, options):
logger.debug("asy_get_items")
response = myapi.get_all_products(page=page_number, options=options)
items.extend(response["items"])
logger.info("items stored now: %s", len(items))
async def main_routine():
tasks = []
for page_number in range(2, int(pages-1)):
logger.info("add task for page nr.: %s", page_number)
asyncio.ensure_future(asy_get_items(page_number, items, options))
await asyncio.gather(*tasks)
try:
loop.run_until_complete(main_routine())
finally:
loop.close()
Also bisher läuft es damit, aber nicht parallel, sieht aber besser aus als das Beispiel davor.
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Donnerstag 24. Mai 2018, 17:27
von martinjo
Nochmal ein kleines Update, dieses Mal auch direkt aus dem Skript kopiert, also ohne Anpassungen. Läuft sauber der Reihe nach durch. Aber nicht parallel.
Code: Alles auswählen
import asyncio
loop = asyncio.get_event_loop()
async def asy_get_items(page_number, items, options):
logger.debug("asy_get_items for page %s", page_number)
await asyncio.sleep(0.01)
newitems, pages = myapi.get_items_response_test(options=options, page_number=page_number)
items.extend(newitems)
logger.info("items stored now: %s", len(items))
async def main_routine():
tasks = []
for page_number in range(2, int(pages-1)):
if page_number > 5:
break
logger.info("add task for page nr.: %s", page_number)
tasks.append(loop.create_task(asy_get_items(page_number, items, options)))
#await asyncio.gather(*tasks)
await asyncio.wait(tasks)
try:
loop.run_until_complete(main_routine())
finally:
loop.close()
logger.info("finally items stored now: %s", len(items))
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Donnerstag 24. Mai 2018, 19:18
von noisefloor
Hallo,
das läuft wohl nicht parallel, weil so Code ja nicht "automagisch" asynchron wird. Zugegebener Maßen ist asyncio am Anfang etwas schwierig, weil man umdenken muss. Da kann schon mal der Kopf qualmen
Damit es asynchron läuft, muss etwas "awaitable" sein - dann kann der Mainloop weiter laufen, während eine Funktion auf ein Ergebnis wartet. Deine Funktion
newitems, pages = myapi.get_items_response_test(options=options, page_number=page_number)
ist aber nicht awaitable und damit synchron.
Wie gesagt, du müsstest für die Request die passende Klasse / Funktion aus dem Modul aiohttp nehmen. Da sind die Request dann nämlich "awaitble" und blockieren nicht.
BTW: wo asyncio und dessen Grundprinzipien deutlich besser erklärt sind als in der offiziellen Doku ist auf der "Python Module of the Week" Webseite und auf der Projektseite von Trio. Das ist ein Modul, welches eine "high level" API für asyncio bereit stellt.
Gruß, noisefloor
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Freitag 25. Mai 2018, 16:43
von martinjo
Hallo
ich nutze für die API ein fertiges Modul, daher kann ich dort nicht so einfach auf aiohttp umswitchen. Da ich mit dem asyncio-Modul nicht weiter komme habe ich nun doch erstmal multiprocessing mit Pool verwendet.
Damit benötige ich nun für den ersten, einzelnen Call 5.6 Sekunden, für weitere 10 Calls 9.3 Sekunden.
Ist alles noch nicht optimiert oder schön, funktioniert jedoch schon mal. Ich hoffe ich finde noch die Zeit, mich mit asyncio besser auseinander zu setzen.
Code: Alles auswählen
items, pages = self.get_items_response_test(options=options)
from numpy import array_split
pages_range = range(2, int(pages)+1)
pair = 10
pages_range_pairs = [pages_range[i:i+pair] for i in range(0, len(pages_range), pair) ]
for prp in pages_range_pairs:
items_lists = self.get_items_with_multiprocessing(options, prp)
end = time.time()
for items_list in items_lists:
items.extend(items_list)
def get_items_with_multiprocessing(self, options, range_pair):
from multiprocessing import Pool
pool = Pool(processes=len(range_pair))
myargs = [[options, pn] for pn in range_pair]
return pool.map(API().get_items_response_test_wrapper, myargs)
def get_items_response_test_wrapper(self, myargs):
options, page_number = myargs
return self.get_items_response_test(options=options, page_number=page_number)
Re: multiprocessing für parallele API Aufrufe mit Wert für maximal
Verfasst: Freitag 25. Mai 2018, 20:45
von noisefloor
Hallo,
wie __deets__ in der ersten Antwort erwähnt ist dein "Problem" I/O bound, heißt: I/O ist der limitierende Faktor. Dann ist Multiprocessing nicht unbedingt erste Wahl, sondern wenn "klassisch" parallelisieren dann mittels Threading. Grund: einen neuen Thread starten braucht i.d.R. weniger Overhead als einen neuen Prozess zu starten. Wenn du das `concurrent.futures` Modul statt Multiprocessing oder Threading benutzt kannst du sogar ganz einfach zwischen Threads und Prozessen auswählen.
Was ich eben gesehen habe, was aus auch gibt:
grequests, dass ist das requests-Modul kombiniert mit Gevent. Damit gehen auch asynchrone Request. Ist vielleicht / wahrscheinlich einfacher einzubauen als asyncio.
Gruß, noisefloor