multiprocessing für parallele API Aufrufe mit Wert für maximal

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Benutzeravatar
martinjo
User
Beiträge: 186
Registriert: Dienstag 14. Juni 2011, 20:03

Hallo
Schon seit langem versuche ich folgendes zu verwirklichen:

Es wird eine API Anfrage gesendet. Ich erhalte als Ergebnis eine Liste mit 100 Artikeln und der Info, dass noch 30 weitere Seiten vorhanden sind.
Also muss ich 30 weitere Aufrufe starten bei denen ich die Seitenanzahl mitliefere, um die Artikel der jeweiligen Seite zu bekommen.

Da so ein Aufruf etwas längern dauert und sechs Aufrufe parallel möglich sind möchte ich das auch gerne nutzen. Ich bin mir nur nicht sicher welches Werkzeug dafür am geeignetsten ist und auf was ich zu achten habe:

Bisheriger Aufruf:

Code: Alles auswählen

import myapi
response = myapi.get_all_products()
products = response["items"]
pages = response["item_pages_available"]
for page in range(int(pages)):
    response = myapi.get_all_products(page=page)
    products.extend(response["items"])
Überlegung 1:
Ist multiprocessing hier überhaupt passend? Ist man hier nicht auf die Anzahl der Prozessoren beschränkt? Ist ja bei diesen Aufrufen gar nicht wichtig, denn selbst ein alter Einkerner kann ja 6 Anfragen senden und auf die Antworten warten.

Ich glaube das ist erstmal die wichtigste Frage.
Sirius3
User
Beiträge: 17737
Registriert: Sonntag 21. Oktober 2012, 17:20

Da solche Anfragen IO-Limitiert sind, nimmt man dazu am besten AsyncIO.
Benutzeravatar
noisefloor
User
Beiträge: 3853
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

asyncio ist für so was gemacht. Ggf. möchtest du das Modul aihttp benutzen, welches asynchrone (einfacher) Requests ermöglicht.

Gruß, noisefloor
Benutzeravatar
martinjo
User
Beiträge: 186
Registriert: Dienstag 14. Juni 2011, 20:03

Danke, ich habe mich die letzen Stunden damit beschäftigt, jetzt qualmt der Kopf und ich komme nicht weiter. Evtl. habt ihr mir ja noch n Tipp, Problem gerade, ich gebe über return eine Liste zurück, erhalte jedoch ein CoroWrapper Objekt. Dadurch wir die Liste mit den Items nie erweitert:


Code: Alles auswählen

        items, pages = self.get_items_response_test(options=options)
      
        import asyncio
        loop = asyncio.get_event_loop()
        loop.set_debug(True)
        
        async def asy_func_base(items, pages, options):
            for page_number in range(2, int(pages-1)):
                yitems = asy_get_items(page_number, options)
                logger.info("return from asy_get_items = %s", yitems)
                items.extend(yitems)
                print(len(items))
        
        async def asy_get_items(page_number, options):
            xitems, xpage = self.get_items_response_test(options=options, page_number=page_number)
            return xitems
        
        loop.run_until_complete(asy_func_base(items, pages, options))
        loop.close()
Auch werden die Aufrufe nacheinander ausgeführt, ich war jedoch froh, das dass zumindest mal halbwegs funktioniert hat. Wenn ich die Liste "items" der Funktion "asy_get_items" durchreiche kann ich sie zumindest dort füllen, doch irgendwie scheint das nicht richtig.
__deets__
User
Beiträge: 14522
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ich glaube dir fehlt ein await in "yitems = ...". Das ist ja wieder eine Co-Routine, und damit die im mainloop verarbeitet wird, gehoert das await davor.
Benutzeravatar
martinjo
User
Beiträge: 186
Registriert: Dienstag 14. Juni 2011, 20:03

Danke, inzwischen habe ich es jedoch umgeändert:

Code: Alles auswählen

import myapi
response = myapi.get_all_products()
items = response["items"]
pages = response["item_pages_available"]

import asyncio
loop = asyncio.get_event_loop()


async def asy_get_items(page_number, items, options):
            logger.debug("asy_get_items")
            response = myapi.get_all_products(page=page_number, options=options)
            items.extend(response["items"])
            logger.info("items stored now: %s", len(items))
    
async def main_routine():
            tasks = []
            for page_number in range(2, int(pages-1)):
                logger.info("add task for page nr.: %s", page_number)
                asyncio.ensure_future(asy_get_items(page_number, items, options))
            await asyncio.gather(*tasks)

try:
            loop.run_until_complete(main_routine())
finally:
            loop.close()
Also bisher läuft es damit, aber nicht parallel, sieht aber besser aus als das Beispiel davor.
Benutzeravatar
martinjo
User
Beiträge: 186
Registriert: Dienstag 14. Juni 2011, 20:03

Nochmal ein kleines Update, dieses Mal auch direkt aus dem Skript kopiert, also ohne Anpassungen. Läuft sauber der Reihe nach durch. Aber nicht parallel.

Code: Alles auswählen

import asyncio
loop = asyncio.get_event_loop()

async def asy_get_items(page_number, items, options):
    logger.debug("asy_get_items for page %s", page_number)
    await asyncio.sleep(0.01)
    newitems, pages = myapi.get_items_response_test(options=options, page_number=page_number)
    items.extend(newitems)
    logger.info("items stored now: %s", len(items))
    
async def main_routine():
    tasks = []
    for page_number in range(2, int(pages-1)):
        if page_number > 5:
            break
        logger.info("add task for page nr.: %s", page_number)
        tasks.append(loop.create_task(asy_get_items(page_number, items, options)))
        
    #await asyncio.gather(*tasks)
    await asyncio.wait(tasks)
try:
    loop.run_until_complete(main_routine())
finally:
    loop.close()
    logger.info("finally items stored now: %s", len(items))
Benutzeravatar
noisefloor
User
Beiträge: 3853
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

das läuft wohl nicht parallel, weil so Code ja nicht "automagisch" asynchron wird. Zugegebener Maßen ist asyncio am Anfang etwas schwierig, weil man umdenken muss. Da kann schon mal der Kopf qualmen ;-)

Damit es asynchron läuft, muss etwas "awaitable" sein - dann kann der Mainloop weiter laufen, während eine Funktion auf ein Ergebnis wartet. Deine Funktion

newitems, pages = myapi.get_items_response_test(options=options, page_number=page_number)

ist aber nicht awaitable und damit synchron.

Wie gesagt, du müsstest für die Request die passende Klasse / Funktion aus dem Modul aiohttp nehmen. Da sind die Request dann nämlich "awaitble" und blockieren nicht.

BTW: wo asyncio und dessen Grundprinzipien deutlich besser erklärt sind als in der offiziellen Doku ist auf der "Python Module of the Week" Webseite und auf der Projektseite von Trio. Das ist ein Modul, welches eine "high level" API für asyncio bereit stellt.

Gruß, noisefloor
Benutzeravatar
martinjo
User
Beiträge: 186
Registriert: Dienstag 14. Juni 2011, 20:03

Hallo
ich nutze für die API ein fertiges Modul, daher kann ich dort nicht so einfach auf aiohttp umswitchen. Da ich mit dem asyncio-Modul nicht weiter komme habe ich nun doch erstmal multiprocessing mit Pool verwendet.

Damit benötige ich nun für den ersten, einzelnen Call 5.6 Sekunden, für weitere 10 Calls 9.3 Sekunden.

Ist alles noch nicht optimiert oder schön, funktioniert jedoch schon mal. Ich hoffe ich finde noch die Zeit, mich mit asyncio besser auseinander zu setzen.

Code: Alles auswählen

        items, pages = self.get_items_response_test(options=options)

        from numpy import array_split
        pages_range = range(2, int(pages)+1)
        pair = 10
        pages_range_pairs = [pages_range[i:i+pair] for i in range(0, len(pages_range), pair) ]

        for prp in pages_range_pairs:
            items_lists = self.get_items_with_multiprocessing(options, prp)
            end = time.time()
            for items_list in items_lists:
                items.extend(items_list)
            
    def get_items_with_multiprocessing(self, options, range_pair):
        from multiprocessing import Pool
        pool = Pool(processes=len(range_pair))
        myargs = [[options, pn] for pn in range_pair]
        return pool.map(API().get_items_response_test_wrapper, myargs)

    def get_items_response_test_wrapper(self, myargs):
        options, page_number = myargs
        return self.get_items_response_test(options=options, page_number=page_number)
Benutzeravatar
noisefloor
User
Beiträge: 3853
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

wie __deets__ in der ersten Antwort erwähnt ist dein "Problem" I/O bound, heißt: I/O ist der limitierende Faktor. Dann ist Multiprocessing nicht unbedingt erste Wahl, sondern wenn "klassisch" parallelisieren dann mittels Threading. Grund: einen neuen Thread starten braucht i.d.R. weniger Overhead als einen neuen Prozess zu starten. Wenn du das `concurrent.futures` Modul statt Multiprocessing oder Threading benutzt kannst du sogar ganz einfach zwischen Threads und Prozessen auswählen.

Was ich eben gesehen habe, was aus auch gibt: grequests, dass ist das requests-Modul kombiniert mit Gevent. Damit gehen auch asynchrone Request. Ist vielleicht / wahrscheinlich einfacher einzubauen als asyncio.

Gruß, noisefloor
Antworten