Dataframes aus verschiedenen Prozessen zusammenfügen
Hier ist nochmal ein Beispiel, was ich vorher in einem anderen Thread geposted hatte:
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.
Falls die beiden Coroutines nacheinander laufen würden, würde es 1,3 Sekunden dauern. Wie du aber siehst, dauert es nur so lange, wie die langsamere der beiden Routinen läuft.
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.
Code: Alles auswählen
import asyncio
import time
async def aaaa():
await asyncio.sleep(1)
return "AAAA"
async def bbbb():
await asyncio.sleep(0.3)
return "BBBB"
async def main():
while True:
start = time.perf_counter()
responses = await asyncio.gather(*[aaaa(), bbbb()])
for response in responses:
print(response)
print(f"Dauer: {time.perf_counter()-start:.2f}s")
asyncio.run(main())
"""
Ausgabe:
AAAA
BBBB
Dauer: 1.01s
AAAA
BBBB
Dauer: 0.99s
...
"""
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
Danke, sieht auch gut aus, teste ich morgen mal.
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
@mirko3107,
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.
Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.
Du bekommst die einzelnen Rückgabewerte jeder einzelnen Funktion, die du an asyncio.gather() übergeben hast.Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.
Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.
Meine Frage ist, wie übergebe ich jeder einzelnen Funktionen einen anderen Wert aus meiner Liste?
So wie hier wirds ja nicht funktionieren, da würde ich ja 10x den selben Wert übergeben.
So wie hier wirds ja nicht funktionieren, da würde ich ja 10x den selben Wert übergeben.
Code: Alles auswählen
yf_df = pd.read_csv('short.csv')
stocks = yf_df.Ticker.tolist()
async def main(ticker):
for data in ticker:
await asyncio.gather(
connection(data,100),
connection(data,101),
connection(data,102),
connection(data,103),
connection(data,104),
connection(data,105),
connection(data,106),
connection(data,107),
connection(data,108),
connection(data,109)
)
end = time.time()
print('Dauer: ',int(end-start),'s')
asyncio.run(main(stocks))
@mirko3107,
ja da hatte ich dich missverstanden.
Ich denke du meinst dann das:
(ungetestet)
Nur zur Info:
time.time() ist nicht die beste Wahl für so eine Zeitmessung. Besser ist time.perf_counter()
Bei print() lieber f-strings verwenden:
ja da hatte ich dich missverstanden.
Ich denke du meinst dann das:
(ungetestet)
Code: Alles auswählen
yf_df = pd.read_csv('short.csv')
stocks = yf_df.Ticker.tolist()
async def main(ticker):
await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker, 100)])
end = time.time()
print('Dauer: ',int(end-start),'s')
asyncio.run(main(stocks))
time.time() ist nicht die beste Wahl für so eine Zeitmessung. Besser ist time.perf_counter()
Bei print() lieber f-strings verwenden:
Code: Alles auswählen
print(f"Dauer: {end-start}s")
@mirko3107,
Du kannst bei der ticker-Liste zum Beispiel 5 als Limit angeben:
Du kannst bei der ticker-Liste zum Beispiel 5 als Limit angeben:
Code: Alles auswählen
await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[:5], 100)])
Dann kannst du das auch flexibel gestalten:
start = 10: ab dem 11. Ticker
count = 5 : die nächsten 5
Das must du dann in eine Schleife packen, wobei start immer um 5 (oder 10 oder 20, oder 'count') hochgezählt wird.
Wie ich ja schon sagte, dass man dafür mehrere connections öffnet ist sicher nicht so gedacht. Es mag funktionieren, dann wird es aber an anderer Stelle wieder umständlich.
Code: Alles auswählen
await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+count], 100)])
count = 5 : die nächsten 5
Das must du dann in eine Schleife packen, wobei start immer um 5 (oder 10 oder 20, oder 'count') hochgezählt wird.
Wie ich ja schon sagte, dass man dafür mehrere connections öffnet ist sicher nicht so gedacht. Es mag funktionieren, dann wird es aber an anderer Stelle wieder umständlich.
Ich würde es mit einem gleitenden Index machen.
Dann werden bei jedem Schleifendurchlauf 20 ticker /stocks an die connections übergeben. Dann die nächsten 20, usw.
Vorher Aufsplitten ist nicht so gut, da du ja auch diese Ids von 100 weiter raufzählen willst.
Aber das hat jetzt nichts mehr mit asyncio zu tun und das könnte man auf zig verschiedene Weg gehen.
Dann werden bei jedem Schleifendurchlauf 20 ticker /stocks an die connections übergeben. Dann die nächsten 20, usw.
Code: Alles auswählen
total_number_of_stocks = 100
stocks_per_part = 20
for start in range(0, total_number_of_stocks, stocks_per_part):
results = await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+stocks_per_part], 100)])
# hier müssen die results zwischengespeichert werden, sonst werden sie im nächsten Durchlauf überschrieben
Aber das hat jetzt nichts mehr mit asyncio zu tun und das könnte man auf zig verschiedene Weg gehen.
Dein Weg funktioniert, nur ab und mal leider nicht, liegt aber eher an der API:
Code: Alles auswählen
ID: 104 MGM 48.11 8/4/2021
ID: 105 MHK 33.66 10/28/2021
ID: 106 MHO 34.56 10/27/2021
ID: 107 MKC 17.66 9/28/2021
ID: 108 MKTX 25.67 10/27/2021
ID: 108 MRCY 40.71 8/3/2021
ID: 109 MRK 16.0 10/28/2021
Traceback (most recent call last):
File "/usr/lib/python3.9/asyncio/tasks.py", line 258, in __step
result = coro.throw(exc)
File "/usr/lib/python3.9/asyncio/tasks.py", line 690, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/lib/python3.9/asyncio/futures.py", line 284, in __await__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
future.result()
File "/usr/lib/python3.9/asyncio/futures.py", line 196, in result
raise exc
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
fut.result()
File "/usr/lib/python3.9/asyncio/futures.py", line 196, in result
raise exc
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/stocks/streamlit/async/lll.py", line 54, in <module>
asyncio.run(main(stocks))
File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 32, in run
return loop.run_until_complete(future)
File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
return f.result()
File "/usr/lib/python3.9/asyncio/futures.py", line 201, in result
raise self._exception
File "/usr/lib/python3.9/asyncio/tasks.py", line 258, in __step
result = coro.throw(exc)
File "/home/user/stocks/streamlit/async/lll.py", line 47, in main
results = await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
File "/usr/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
future.result()
File "/usr/lib/python3.9/asyncio/tasks.py", line 256, in __step
result = coro.send(None)
File "/home/user/stocks/streamlit/async/lll.py", line 17, in connection
ib.connect('127.0.0.1', 4001, clientId=id, timeout=5.0)
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 271, in connect
return self._run(self.connectAsync(
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 310, in _run
return util.run(*awaitables, timeout=self.RequestTimeout)
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/util.py", line 322, in run
result = loop.run_until_complete(task)
File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
return f.result()
File "/usr/lib/python3.9/asyncio/futures.py", line 201, in result
raise self._exception
File "/usr/lib/python3.9/asyncio/tasks.py", line 256, in __step
result = coro.send(None)
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 1626, in connectAsync
await self.client.connectAsync(host, port, clientId, timeout)
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/client.py", line 218, in connectAsync
await asyncio.wait_for(self.apiStart, timeout)
File "/usr/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Naja, du scheints da ja auch eine Menge an Symbolen zu konsumieren.Dein Weg funktioniert, nur ab und mal leider nicht, liegt aber eher an der API:
Vielleicht kannst du die Exception abfangen.
Da muss ich allerdings jetzt passen, da ich den Fehler nicht nachstellen kann.
Danke dir für deine wertvollen Tips.
Umgesetzt hab ich es bisher so:
Wie bindet man in asyncio.gather weiter Funktionen ein?
Umgesetzt hab ich es bisher so:
Code: Alles auswählen
async def main(ticker):
df_ibi = pd.DataFrame(columns=['Ticker', 'IV', 'Earnings'])
amount = len(ticker)
parts = 10
for start in range(0, amount, parts):
results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
df_results = pd.DataFrame(results, columns=['Ticker', 'IV', 'Earnings'])
df_ibi = df_ibi.append(df_results, ignore_index=True)
df_ibi.to_csv('ibi.csv', header=True, index=False)
Code: Alles auswählen
results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Code: Alles auswählen
ticker_coroutines = []
for num, data in enumerate(ticker[start:start+parts], 100):
ticker_coroutines.append(ibi(data, num))
# weitere async Funktionen in die Liste einfügen
ticker_couroutines.append(another_async_function1)
ticker_couroutines.append(another_async_function2)
results = await asyncio.gather(*ticker_coroutines)
zum Beispiel eine einzelne Funktion (Dafür bräuchte man natürlich kein 'gather'):
Code: Alles auswählen
asnyc def func():
# irgend ein asynchroner code
...
result = await gather(func())
Code: Alles auswählen
result = await gather(func1(), func2(), ...)
Code: Alles auswählen
coroutine_list = [func1(), func(), ...]
result = await gather(*coroutine_list )
Habs mal getestet, bekomme den Fehler .
Code: Alles auswählen
API connection failed: OSError(24, 'Too many open files')
Vielleicht gibt die komplette Fehlermeldung mehr Aufschluss? So kann das viele Gründe haben.
Wie schon so oft gesagt, du verwendest die Library wahrscheinlich nicht so wie es eigentlich gedacht ist, ...
Dabei hebelst du wahrscheinlich Schutzmechanismen aus, die normalerweise abgefangen würden, bzw. die Performance optimieren würden.
Kann ich aber auch nur vermuten.
Vielleicht must die Anzahl der parallelen Aufrufe reduzieren.
Wie schon so oft gesagt, du verwendest die Library wahrscheinlich nicht so wie es eigentlich gedacht ist, ...
Dabei hebelst du wahrscheinlich Schutzmechanismen aus, die normalerweise abgefangen würden, bzw. die Performance optimieren würden.
Kann ich aber auch nur vermuten.
Vielleicht must die Anzahl der parallelen Aufrufe reduzieren.
wird wohl an meinem falschen Aufruf liegen:
Code: Alles auswählen
for start in range(0, amount, parts):
for num, data in enumerate(ticker[start:start+parts], 100):
ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)