Dataframes aus verschiedenen Prozessen zusammenfügen
Contracts kann ich auch alle auf einmal abfragen, das klappt. Aber danach kann ich scheinbar nur pro Client eine Datenabfrage machen, nicht mehrere auf einmal.
Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.
Wie kann ich sowas am besten umsetzen? eine Function für die Datenabfrage und pro Function-Aufruf nur eine andere Clientnummer übergeben?
Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.
Wie kann ich sowas am besten umsetzen? eine Function für die Datenabfrage und pro Function-Aufruf nur eine andere Clientnummer übergeben?
-
- User
- Beiträge: 65
- Registriert: Samstag 27. Februar 2021, 12:18
Hallo zusammen,
vielleicht kann ich mit einem async Beispiel helfen... ich habe während den letzten Monaten & lockdown ein eigenes kleines script geschrieben, welches selbständig traden kann.
Ich bekomme von einem Anbieter streaming Daten, die dann vom script asynchron verarbeitet werden. Da ich kein professioneller Programmierer bin, habe ich lange "gekämpft" bis ich asynchron verstanden habe bzw. hat mir jemand die entscheidenden Tips gegeben, unten mehr dazu.
Hier in Auszügen der Code, der den asynchronen Rahmen setzt:
Da ich das alleine nicht hinbekommen habe und ich kurz davor war, mein Projekt aufzugeben, habe ich den Autor von ib_insync angeschrieben, ob er mir hilft und gegen einen kleinen Obulus hat er mir meinen Code korrigiert und weitere Tipps gegeben.
Nun läuft das script täglich und wenn ich Zeit habe, arbeite ich an Verbesserungen.
Viel Erfolg... Andy
vielleicht kann ich mit einem async Beispiel helfen... ich habe während den letzten Monaten & lockdown ein eigenes kleines script geschrieben, welches selbständig traden kann.
Ich bekomme von einem Anbieter streaming Daten, die dann vom script asynchron verarbeitet werden. Da ich kein professioneller Programmierer bin, habe ich lange "gekämpft" bis ich asynchron verstanden habe bzw. hat mir jemand die entscheidenden Tips gegeben, unten mehr dazu.
Hier in Auszügen der Code, der den asynchronen Rahmen setzt:
Code: Alles auswählen
import asyncio
import ib_insync as ibi
import socketio
...
ib = ibi.IB()
sio = socketio.AsyncClient()
@sio.on('connect')
def on_connect():
print("\nConnected to streaming API")
@sio.on('data')
async def on_data(data):
...
async def trade(tickersymbol, regel):
...
async def main_loop():
await ib.connectAsync()
await sio.connect('http://...')
await sio.wait()
try:
asyncio.run(main_loop())
...
Nun läuft das script täglich und wenn ich Zeit habe, arbeite ich an Verbesserungen.
Viel Erfolg... Andy
Wie hast du das denn bisher versucht? Der Beispielcode macht nur eine Abfrage, den must du anpassen um mehrere zu bekommen.Aber danach kann ich scheinbar nur pro Client eine Datenabfrage machen, nicht mehrere auf einmal
Mehrere Clients zu verwenden ist sicher nicht Sinn der Sache, aber wenn es funktioniert und dich nicht stört, ...Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.
Wahrscheinlich mit asyncio gather.Wie kann ich sowas am besten umsetzen? eine Funktion für die Datenabfrage und pro Funktion-Aufruf nur eine andere Clientnummer übergeben?
Ein Beispiel:
https://docs.python.org/3/library/async ... ncurrently
Die Funktion "factorial" im Beispiel, wäre dann deine Funktion für die Datenabfrage.
Hab eine Mehrfachverbindung hinbekommen:
Erstmal 10x die gleiche Aktie, nun muss ich sehen, wie ich die Liste aus 700 Werten aufgeteilt bekomme und nachher wieder zusammengebaut.
Ergebnis:
Code: Alles auswählen
import nest_asyncio
import asyncio
import time
from ib_insync import *
nest_asyncio.apply()
util.logToFile('connect.log', 'DEBUG')
start = time.time()
async def connection(id):
ib = IB()
ib.connect('127.0.0.1', 7496, clientId=id, timeout=10.0)
ib.reqMarketDataType(1)
contract = Stock('AAPL', 'SMART', 'USD')
data = ib.reqMktData(contract, "106,100", False, False)
await asyncio.sleep(2)
iv = data.impliedVolatility*100
print('ID:',id, iv)
return iv
async def main():
await asyncio.gather(
connection(100),
connection(101),
connection(102),
connection(103),
connection(104),
connection(105),
connection(106),
connection(107),
connection(108),
connection(109),
connection(110)
)
end = time.time()
print('Dauer: ',int(end-start),'s')
asyncio.run(main())
Ergebnis:
Code: Alles auswählen
ID: 110 21.21831800725646
ID: 109 21.21831800725646
ID: 108 21.21831800725646
ID: 107 21.21831800725646
ID: 106 21.21831800725646
ID: 105 21.21831800725646
ID: 104 21.21831800725646
ID: 103 21.21831800725646
ID: 102 21.21831800725646
ID: 101 21.21831800725646
ID: 100 21.21831800725646
Dauer: 2 s
@mirco3107,
einfach einer Variablen zuweisen und darüber iterieren:
einfach einer Variablen zuweisen und darüber iterieren:
Code: Alles auswählen
all_data = await asyncio.gather(
connection(100),
connection(101),
connection(102),
connection(103),
connection(104),
connection(105),
connection(106),
connection(107),
connection(108),
connection(109),
connection(110)
)
for data in all_data:
...
Hier ist nochmal ein Beispiel, was ich vorher in einem anderen Thread geposted hatte:
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.
Falls die beiden Coroutines nacheinander laufen würden, würde es 1,3 Sekunden dauern. Wie du aber siehst, dauert es nur so lange, wie die langsamere der beiden Routinen läuft.
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.
Code: Alles auswählen
import asyncio
import time
async def aaaa():
await asyncio.sleep(1)
return "AAAA"
async def bbbb():
await asyncio.sleep(0.3)
return "BBBB"
async def main():
while True:
start = time.perf_counter()
responses = await asyncio.gather(*[aaaa(), bbbb()])
for response in responses:
print(response)
print(f"Dauer: {time.perf_counter()-start:.2f}s")
asyncio.run(main())
"""
Ausgabe:
AAAA
BBBB
Dauer: 1.01s
AAAA
BBBB
Dauer: 0.99s
...
"""
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
Danke, sieht auch gut aus, teste ich morgen mal.
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
@mirko3107,
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.
Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.
Du bekommst die einzelnen Rückgabewerte jeder einzelnen Funktion, die du an asyncio.gather() übergeben hast.Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.
Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.
Meine Frage ist, wie übergebe ich jeder einzelnen Funktionen einen anderen Wert aus meiner Liste?
So wie hier wirds ja nicht funktionieren, da würde ich ja 10x den selben Wert übergeben.
So wie hier wirds ja nicht funktionieren, da würde ich ja 10x den selben Wert übergeben.
Code: Alles auswählen
yf_df = pd.read_csv('short.csv')
stocks = yf_df.Ticker.tolist()
async def main(ticker):
for data in ticker:
await asyncio.gather(
connection(data,100),
connection(data,101),
connection(data,102),
connection(data,103),
connection(data,104),
connection(data,105),
connection(data,106),
connection(data,107),
connection(data,108),
connection(data,109)
)
end = time.time()
print('Dauer: ',int(end-start),'s')
asyncio.run(main(stocks))
@mirko3107,
ja da hatte ich dich missverstanden.
Ich denke du meinst dann das:
(ungetestet)
Nur zur Info:
time.time() ist nicht die beste Wahl für so eine Zeitmessung. Besser ist time.perf_counter()
Bei print() lieber f-strings verwenden:
ja da hatte ich dich missverstanden.
Ich denke du meinst dann das:
(ungetestet)
Code: Alles auswählen
yf_df = pd.read_csv('short.csv')
stocks = yf_df.Ticker.tolist()
async def main(ticker):
await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker, 100)])
end = time.time()
print('Dauer: ',int(end-start),'s')
asyncio.run(main(stocks))
time.time() ist nicht die beste Wahl für so eine Zeitmessung. Besser ist time.perf_counter()
Bei print() lieber f-strings verwenden:
Code: Alles auswählen
print(f"Dauer: {end-start}s")
@mirko3107,
Du kannst bei der ticker-Liste zum Beispiel 5 als Limit angeben:
Du kannst bei der ticker-Liste zum Beispiel 5 als Limit angeben:
Code: Alles auswählen
await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[:5], 100)])
Dann kannst du das auch flexibel gestalten:
start = 10: ab dem 11. Ticker
count = 5 : die nächsten 5
Das must du dann in eine Schleife packen, wobei start immer um 5 (oder 10 oder 20, oder 'count') hochgezählt wird.
Wie ich ja schon sagte, dass man dafür mehrere connections öffnet ist sicher nicht so gedacht. Es mag funktionieren, dann wird es aber an anderer Stelle wieder umständlich.
Code: Alles auswählen
await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+count], 100)])
count = 5 : die nächsten 5
Das must du dann in eine Schleife packen, wobei start immer um 5 (oder 10 oder 20, oder 'count') hochgezählt wird.
Wie ich ja schon sagte, dass man dafür mehrere connections öffnet ist sicher nicht so gedacht. Es mag funktionieren, dann wird es aber an anderer Stelle wieder umständlich.
Ich würde es mit einem gleitenden Index machen.
Dann werden bei jedem Schleifendurchlauf 20 ticker /stocks an die connections übergeben. Dann die nächsten 20, usw.
Vorher Aufsplitten ist nicht so gut, da du ja auch diese Ids von 100 weiter raufzählen willst.
Aber das hat jetzt nichts mehr mit asyncio zu tun und das könnte man auf zig verschiedene Weg gehen.
Dann werden bei jedem Schleifendurchlauf 20 ticker /stocks an die connections übergeben. Dann die nächsten 20, usw.
Code: Alles auswählen
total_number_of_stocks = 100
stocks_per_part = 20
for start in range(0, total_number_of_stocks, stocks_per_part):
results = await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+stocks_per_part], 100)])
# hier müssen die results zwischengespeichert werden, sonst werden sie im nächsten Durchlauf überschrieben
Aber das hat jetzt nichts mehr mit asyncio zu tun und das könnte man auf zig verschiedene Weg gehen.
Dein Weg funktioniert, nur ab und mal leider nicht, liegt aber eher an der API:
Code: Alles auswählen
ID: 104 MGM 48.11 8/4/2021
ID: 105 MHK 33.66 10/28/2021
ID: 106 MHO 34.56 10/27/2021
ID: 107 MKC 17.66 9/28/2021
ID: 108 MKTX 25.67 10/27/2021
ID: 108 MRCY 40.71 8/3/2021
ID: 109 MRK 16.0 10/28/2021
Traceback (most recent call last):
File "/usr/lib/python3.9/asyncio/tasks.py", line 258, in __step
result = coro.throw(exc)
File "/usr/lib/python3.9/asyncio/tasks.py", line 690, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/usr/lib/python3.9/asyncio/futures.py", line 284, in __await__
yield self # This tells Task to wait for completion.
File "/usr/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
future.result()
File "/usr/lib/python3.9/asyncio/futures.py", line 196, in result
raise exc
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
fut.result()
File "/usr/lib/python3.9/asyncio/futures.py", line 196, in result
raise exc
asyncio.exceptions.CancelledError
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/home/user/stocks/streamlit/async/lll.py", line 54, in <module>
asyncio.run(main(stocks))
File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 32, in run
return loop.run_until_complete(future)
File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
return f.result()
File "/usr/lib/python3.9/asyncio/futures.py", line 201, in result
raise self._exception
File "/usr/lib/python3.9/asyncio/tasks.py", line 258, in __step
result = coro.throw(exc)
File "/home/user/stocks/streamlit/async/lll.py", line 47, in main
results = await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
File "/usr/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
future.result()
File "/usr/lib/python3.9/asyncio/tasks.py", line 256, in __step
result = coro.send(None)
File "/home/user/stocks/streamlit/async/lll.py", line 17, in connection
ib.connect('127.0.0.1', 4001, clientId=id, timeout=5.0)
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 271, in connect
return self._run(self.connectAsync(
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 310, in _run
return util.run(*awaitables, timeout=self.RequestTimeout)
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/util.py", line 322, in run
result = loop.run_until_complete(task)
File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
return f.result()
File "/usr/lib/python3.9/asyncio/futures.py", line 201, in result
raise self._exception
File "/usr/lib/python3.9/asyncio/tasks.py", line 256, in __step
result = coro.send(None)
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 1626, in connectAsync
await self.client.connectAsync(host, port, clientId, timeout)
File "/home/user/.local/lib/python3.9/site-packages/ib_insync/client.py", line 218, in connectAsync
await asyncio.wait_for(self.apiStart, timeout)
File "/usr/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Naja, du scheints da ja auch eine Menge an Symbolen zu konsumieren.Dein Weg funktioniert, nur ab und mal leider nicht, liegt aber eher an der API:
Vielleicht kannst du die Exception abfangen.
Da muss ich allerdings jetzt passen, da ich den Fehler nicht nachstellen kann.