Dataframes aus verschiedenen Prozessen zusammenfügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

dann macht der doch auch wieder jede Verbindung nach der anderen und nicht alle auf einmal, oder?
__deets__
User
Beiträge: 14494
Registriert: Mittwoch 14. Oktober 2015, 14:29

Nö. Gather wartet bis alle quasi-Parallelen Anfragen fertig sind.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Hier ist nochmal ein Beispiel, was ich vorher in einem anderen Thread geposted hatte:
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.

Code: Alles auswählen

import asyncio
import time

async def aaaa():
    await asyncio.sleep(1)
    return "AAAA"

async def bbbb():
    await asyncio.sleep(0.3)
    return "BBBB"

async def main():
    while True:
        start = time.perf_counter()
        responses = await asyncio.gather(*[aaaa(), bbbb()])
        for response in responses:
            print(response)
        print(f"Dauer: {time.perf_counter()-start:.2f}s")

asyncio.run(main())

"""
Ausgabe:
AAAA
BBBB
Dauer: 1.01s
AAAA
BBBB
Dauer: 0.99s
...
"""
Falls die beiden Coroutines nacheinander laufen würden, würde es 1,3 Sekunden dauern. Wie du aber siehst, dauert es nur so lange, wie die langsamere der beiden Routinen läuft.
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Danke, sieht auch gut aus, teste ich morgen mal.

Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Du bekommst die einzelnen Rückgabewerte jeder einzelnen Funktion, die du an asyncio.gather() übergeben hast.
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.

Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Meine Frage ist, wie übergebe ich jeder einzelnen Funktionen einen anderen Wert aus meiner Liste?

So wie hier wirds ja nicht funktionieren, da würde ich ja 10x den selben Wert übergeben.

Code: Alles auswählen

yf_df = pd.read_csv('short.csv')
stocks = yf_df.Ticker.tolist()

async def main(ticker):
    for data in ticker:
        await asyncio.gather(
            connection(data,100),
            connection(data,101),
            connection(data,102),
            connection(data,103),
            connection(data,104),
            connection(data,105),
            connection(data,106),
            connection(data,107),
            connection(data,108),
            connection(data,109)
            )
    end = time.time()
    print('Dauer: ',int(end-start),'s')

asyncio.run(main(stocks))
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

ja da hatte ich dich missverstanden.
Ich denke du meinst dann das:
(ungetestet)

Code: Alles auswählen

yf_df = pd.read_csv('short.csv')
stocks = yf_df.Ticker.tolist()

async def main(ticker):
    await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker, 100)])
    end = time.time()
    print('Dauer: ',int(end-start),'s')

asyncio.run(main(stocks))
Nur zur Info:
time.time() ist nicht die beste Wahl für so eine Zeitmessung. Besser ist time.perf_counter()
Bei print() lieber f-strings verwenden:

Code: Alles auswählen

print(f"Dauer: {end-start}s")  
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Danke für deine Hilfe, klasse.

Kann man "num" eingrenzen? also Obergrenze 20 oder so etwas? Sonst öffnet er mir so viele Verbindungen wie Elemente in der Liste, da meckert die API.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

Du kannst bei der ticker-Liste zum Beispiel 5 als Limit angeben:

Code: Alles auswählen

await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[:5], 100)])
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ok, aber da fragt er ja nur die ersten 5 aus der Liste ab, der Rest wird ignoriert.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Dann kannst du das auch flexibel gestalten:

Code: Alles auswählen

await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+count], 100)])
start = 10: ab dem 11. Ticker
count = 5 : die nächsten 5

Das must du dann in eine Schleife packen, wobei start immer um 5 (oder 10 oder 20, oder 'count') hochgezählt wird.

Wie ich ja schon sagte, dass man dafür mehrere connections öffnet ist sicher nicht so gedacht. Es mag funktionieren, dann wird es aber an anderer Stelle wieder umständlich.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Könnte man nicht auch die Liste splitten und die Teile an die "connections" übergeben?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Ich würde es mit einem gleitenden Index machen.
Dann werden bei jedem Schleifendurchlauf 20 ticker /stocks an die connections übergeben. Dann die nächsten 20, usw.

Code: Alles auswählen

total_number_of_stocks = 100
stocks_per_part = 20

for start in range(0, total_number_of_stocks, stocks_per_part):
    results = await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+stocks_per_part], 100)])
    # hier müssen die results zwischengespeichert werden, sonst werden sie im nächsten Durchlauf überschrieben
Vorher Aufsplitten ist nicht so gut, da du ja auch diese Ids von 100 weiter raufzählen willst.
Aber das hat jetzt nichts mehr mit asyncio zu tun und das könnte man auf zig verschiedene Weg gehen.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Dein Weg funktioniert, nur ab und mal leider nicht, liegt aber eher an der API:

Code: Alles auswählen

ID: 104 MGM 48.11 8/4/2021
ID: 105 MHK 33.66 10/28/2021
ID: 106 MHO 34.56 10/27/2021
ID: 107 MKC 17.66 9/28/2021
ID: 108 MKTX 25.67 10/27/2021
ID: 108 MRCY 40.71 8/3/2021
ID: 109 MRK 16.0 10/28/2021
Traceback (most recent call last):
  File "/usr/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 690, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/usr/lib/python3.9/asyncio/futures.py", line 196, in result
    raise exc
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    fut.result()
  File "/usr/lib/python3.9/asyncio/futures.py", line 196, in result
    raise exc
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/stocks/streamlit/async/lll.py", line 54, in <module>
    asyncio.run(main(stocks))
  File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 32, in run
    return loop.run_until_complete(future)
  File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
    return f.result()
  File "/usr/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/usr/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/home/user/stocks/streamlit/async/lll.py", line 47, in main
    results = await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
  File "/usr/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/usr/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/user/stocks/streamlit/async/lll.py", line 17, in connection
    ib.connect('127.0.0.1', 4001, clientId=id, timeout=5.0)
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 271, in connect
    return self._run(self.connectAsync(
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 310, in _run
    return util.run(*awaitables, timeout=self.RequestTimeout)
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/util.py", line 322, in run
    result = loop.run_until_complete(task)
  File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
    return f.result()
  File "/usr/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/usr/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 1626, in connectAsync
    await self.client.connectAsync(host, port, clientId, timeout)
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/client.py", line 218, in connectAsync
    await asyncio.wait_for(self.apiStart, timeout)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Dein Weg funktioniert, nur ab und mal leider nicht, liegt aber eher an der API:
Naja, du scheints da ja auch eine Menge an Symbolen zu konsumieren.
Vielleicht kannst du die Exception abfangen.

Da muss ich allerdings jetzt passen, da ich den Fehler nicht nachstellen kann.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Danke dir für deine wertvollen Tips.
Umgesetzt hab ich es bisher so:

Code: Alles auswählen

async def main(ticker):
    df_ibi = pd.DataFrame(columns=['Ticker', 'IV', 'Earnings'])
    amount = len(ticker)
    parts = 10
    for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
        df_results = pd.DataFrame(results, columns=['Ticker', 'IV', 'Earnings'])
        df_ibi = df_ibi.append(df_results, ignore_index=True)
    df_ibi.to_csv('ibi.csv', header=True, index=False)
Wie bindet man in asyncio.gather weiter Funktionen ein?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Code: Alles auswählen

results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Dieser etwas kompakte Aufruf kann auch auseinander gezogen werden:

Code: Alles auswählen

ticker_coroutines = []

for num, data in enumerate(ticker[start:start+parts], 100):
    ticker_coroutines.append(ibi(data, num))


# weitere async Funktionen in die Liste einfügen
ticker_couroutines.append(another_async_function1)
ticker_couroutines.append(another_async_function2)


results = await asyncio.gather(*ticker_coroutines)
Denn die gather() Funktion nimmt als Eingangsparameter einzelne Coroutinen, was eigentlich normale Funktionen mit einem 'async' davor sind.

zum Beispiel eine einzelne Funktion (Dafür bräuchte man natürlich kein 'gather'):

Code: Alles auswählen

asnyc def func():
    # irgend ein asynchroner code
    ...

result = await gather(func())
Oder mehrere einzelne Funktionen, die dann quasi parallel abgearbeitet werden:

Code: Alles auswählen

result = await gather(func1(), func2(), ...)
oder mehrere Funktionen in einer Liste:

Code: Alles auswählen

coroutine_list = [func1(), func(), ...]

result = await gather(*coroutine_list )
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Habs mal getestet, bekomme den Fehler

Code: Alles auswählen

API connection failed: OSError(24, 'Too many open files')
.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Vielleicht gibt die komplette Fehlermeldung mehr Aufschluss? So kann das viele Gründe haben.
Wie schon so oft gesagt, du verwendest die Library wahrscheinlich nicht so wie es eigentlich gedacht ist, ...
Dabei hebelst du wahrscheinlich Schutzmechanismen aus, die normalerweise abgefangen würden, bzw. die Performance optimieren würden.
Kann ich aber auch nur vermuten.

Vielleicht must die Anzahl der parallelen Aufrufe reduzieren.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

wird wohl an meinem falschen Aufruf liegen:

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)
Antworten