Dataframes aus verschiedenen Prozessen zusammenfügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Contracts kann ich auch alle auf einmal abfragen, das klappt. Aber danach kann ich scheinbar nur pro Client eine Datenabfrage machen, nicht mehrere auf einmal.
Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.

Wie kann ich sowas am besten umsetzen? eine Function für die Datenabfrage und pro Function-Aufruf nur eine andere Clientnummer übergeben?
August1328
User
Beiträge: 65
Registriert: Samstag 27. Februar 2021, 12:18

Hallo zusammen,

vielleicht kann ich mit einem async Beispiel helfen... ich habe während den letzten Monaten & lockdown ein eigenes kleines script geschrieben, welches selbständig traden kann.

Ich bekomme von einem Anbieter streaming Daten, die dann vom script asynchron verarbeitet werden. Da ich kein professioneller Programmierer bin, habe ich lange "gekämpft" bis ich asynchron verstanden habe bzw. hat mir jemand die entscheidenden Tips gegeben, unten mehr dazu.

Hier in Auszügen der Code, der den asynchronen Rahmen setzt:

Code: Alles auswählen

import asyncio
import ib_insync as ibi
import socketio
...

ib = ibi.IB()
sio = socketio.AsyncClient()

@sio.on('connect')
def on_connect():
    print("\nConnected to streaming API")

@sio.on('data')
async def on_data(data):
... 

async def trade(tickersymbol, regel):
...

async def main_loop():
    await ib.connectAsync()
    await sio.connect('http://...')
    await sio.wait()

try:
    asyncio.run(main_loop())
...
Da ich das alleine nicht hinbekommen habe und ich kurz davor war, mein Projekt aufzugeben, habe ich den Autor von ib_insync angeschrieben, ob er mir hilft und gegen einen kleinen Obulus hat er mir meinen Code korrigiert und weitere Tipps gegeben.

Nun läuft das script täglich und wenn ich Zeit habe, arbeite ich an Verbesserungen.

Viel Erfolg... Andy
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Aber danach kann ich scheinbar nur pro Client eine Datenabfrage machen, nicht mehrere auf einmal
Wie hast du das denn bisher versucht? Der Beispielcode macht nur eine Abfrage, den must du anpassen um mehrere zu bekommen.
Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.
Mehrere Clients zu verwenden ist sicher nicht Sinn der Sache, aber wenn es funktioniert und dich nicht stört, ...
Wie kann ich sowas am besten umsetzen? eine Funktion für die Datenabfrage und pro Funktion-Aufruf nur eine andere Clientnummer übergeben?
Wahrscheinlich mit asyncio gather.
Ein Beispiel:
https://docs.python.org/3/library/async ... ncurrently

Die Funktion "factorial" im Beispiel, wäre dann deine Funktion für die Datenabfrage.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hab eine Mehrfachverbindung hinbekommen:

Code: Alles auswählen

import nest_asyncio
import asyncio
import time
from ib_insync import *

nest_asyncio.apply()
util.logToFile('connect.log', 'DEBUG')
start = time.time()

async def connection(id):
    ib = IB()
    ib.connect('127.0.0.1', 7496, clientId=id, timeout=10.0)
    ib.reqMarketDataType(1)
    contract = Stock('AAPL', 'SMART', 'USD')
    data = ib.reqMktData(contract, "106,100", False, False)
    await asyncio.sleep(2)
    iv = data.impliedVolatility*100
    print('ID:',id, iv)
    return iv

async def main():
    await asyncio.gather(
        connection(100),
        connection(101),
        connection(102),
        connection(103),
        connection(104),
        connection(105),
        connection(106),
        connection(107),
        connection(108),
        connection(109),
        connection(110)
        )
    end = time.time()
    print('Dauer: ',int(end-start),'s')

asyncio.run(main())
Erstmal 10x die gleiche Aktie, nun muss ich sehen, wie ich die Liste aus 700 Werten aufgeteilt bekomme und nachher wieder zusammengebaut.

Ergebnis:

Code: Alles auswählen

ID: 110 21.21831800725646
ID: 109 21.21831800725646
ID: 108 21.21831800725646
ID: 107 21.21831800725646
ID: 106 21.21831800725646
ID: 105 21.21831800725646
ID: 104 21.21831800725646
ID: 103 21.21831800725646
ID: 102 21.21831800725646
ID: 101 21.21831800725646
ID: 100 21.21831800725646
Dauer:  2 s
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirco3107,

einfach einer Variablen zuweisen und darüber iterieren:

Code: Alles auswählen

all_data = await asyncio.gather(
        connection(100),
        connection(101),
        connection(102),
        connection(103),
        connection(104),
        connection(105),
        connection(106),
        connection(107),
        connection(108),
        connection(109),
        connection(110)
        )

for data in all_data:
    ... 
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

dann macht der doch auch wieder jede Verbindung nach der anderen und nicht alle auf einmal, oder?
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Nö. Gather wartet bis alle quasi-Parallelen Anfragen fertig sind.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Hier ist nochmal ein Beispiel, was ich vorher in einem anderen Thread geposted hatte:
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.

Code: Alles auswählen

import asyncio
import time

async def aaaa():
    await asyncio.sleep(1)
    return "AAAA"

async def bbbb():
    await asyncio.sleep(0.3)
    return "BBBB"

async def main():
    while True:
        start = time.perf_counter()
        responses = await asyncio.gather(*[aaaa(), bbbb()])
        for response in responses:
            print(response)
        print(f"Dauer: {time.perf_counter()-start:.2f}s")

asyncio.run(main())

"""
Ausgabe:
AAAA
BBBB
Dauer: 1.01s
AAAA
BBBB
Dauer: 0.99s
...
"""
Falls die beiden Coroutines nacheinander laufen würden, würde es 1,3 Sekunden dauern. Wie du aber siehst, dauert es nur so lange, wie die langsamere der beiden Routinen läuft.
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Danke, sieht auch gut aus, teste ich morgen mal.

Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Du bekommst die einzelnen Rückgabewerte jeder einzelnen Funktion, die du an asyncio.gather() übergeben hast.
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.

Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Meine Frage ist, wie übergebe ich jeder einzelnen Funktionen einen anderen Wert aus meiner Liste?

So wie hier wirds ja nicht funktionieren, da würde ich ja 10x den selben Wert übergeben.

Code: Alles auswählen

yf_df = pd.read_csv('short.csv')
stocks = yf_df.Ticker.tolist()

async def main(ticker):
    for data in ticker:
        await asyncio.gather(
            connection(data,100),
            connection(data,101),
            connection(data,102),
            connection(data,103),
            connection(data,104),
            connection(data,105),
            connection(data,106),
            connection(data,107),
            connection(data,108),
            connection(data,109)
            )
    end = time.time()
    print('Dauer: ',int(end-start),'s')

asyncio.run(main(stocks))
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

ja da hatte ich dich missverstanden.
Ich denke du meinst dann das:
(ungetestet)

Code: Alles auswählen

yf_df = pd.read_csv('short.csv')
stocks = yf_df.Ticker.tolist()

async def main(ticker):
    await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker, 100)])
    end = time.time()
    print('Dauer: ',int(end-start),'s')

asyncio.run(main(stocks))
Nur zur Info:
time.time() ist nicht die beste Wahl für so eine Zeitmessung. Besser ist time.perf_counter()
Bei print() lieber f-strings verwenden:

Code: Alles auswählen

print(f"Dauer: {end-start}s")  
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Danke für deine Hilfe, klasse.

Kann man "num" eingrenzen? also Obergrenze 20 oder so etwas? Sonst öffnet er mir so viele Verbindungen wie Elemente in der Liste, da meckert die API.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

Du kannst bei der ticker-Liste zum Beispiel 5 als Limit angeben:

Code: Alles auswählen

await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[:5], 100)])
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ok, aber da fragt er ja nur die ersten 5 aus der Liste ab, der Rest wird ignoriert.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Dann kannst du das auch flexibel gestalten:

Code: Alles auswählen

await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+count], 100)])
start = 10: ab dem 11. Ticker
count = 5 : die nächsten 5

Das must du dann in eine Schleife packen, wobei start immer um 5 (oder 10 oder 20, oder 'count') hochgezählt wird.

Wie ich ja schon sagte, dass man dafür mehrere connections öffnet ist sicher nicht so gedacht. Es mag funktionieren, dann wird es aber an anderer Stelle wieder umständlich.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Könnte man nicht auch die Liste splitten und die Teile an die "connections" übergeben?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Ich würde es mit einem gleitenden Index machen.
Dann werden bei jedem Schleifendurchlauf 20 ticker /stocks an die connections übergeben. Dann die nächsten 20, usw.

Code: Alles auswählen

total_number_of_stocks = 100
stocks_per_part = 20

for start in range(0, total_number_of_stocks, stocks_per_part):
    results = await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+stocks_per_part], 100)])
    # hier müssen die results zwischengespeichert werden, sonst werden sie im nächsten Durchlauf überschrieben
Vorher Aufsplitten ist nicht so gut, da du ja auch diese Ids von 100 weiter raufzählen willst.
Aber das hat jetzt nichts mehr mit asyncio zu tun und das könnte man auf zig verschiedene Weg gehen.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Dein Weg funktioniert, nur ab und mal leider nicht, liegt aber eher an der API:

Code: Alles auswählen

ID: 104 MGM 48.11 8/4/2021
ID: 105 MHK 33.66 10/28/2021
ID: 106 MHO 34.56 10/27/2021
ID: 107 MKC 17.66 9/28/2021
ID: 108 MKTX 25.67 10/27/2021
ID: 108 MRCY 40.71 8/3/2021
ID: 109 MRK 16.0 10/28/2021
Traceback (most recent call last):
  File "/usr/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 690, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/usr/lib/python3.9/asyncio/futures.py", line 284, in __await__
    yield self  # This tells Task to wait for completion.
  File "/usr/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/usr/lib/python3.9/asyncio/futures.py", line 196, in result
    raise exc
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.9/asyncio/tasks.py", line 492, in wait_for
    fut.result()
  File "/usr/lib/python3.9/asyncio/futures.py", line 196, in result
    raise exc
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/stocks/streamlit/async/lll.py", line 54, in <module>
    asyncio.run(main(stocks))
  File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 32, in run
    return loop.run_until_complete(future)
  File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
    return f.result()
  File "/usr/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/usr/lib/python3.9/asyncio/tasks.py", line 258, in __step
    result = coro.throw(exc)
  File "/home/user/stocks/streamlit/async/lll.py", line 47, in main
    results = await asyncio.gather(*[connection(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
  File "/usr/lib/python3.9/asyncio/tasks.py", line 328, in __wakeup
    future.result()
  File "/usr/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/user/stocks/streamlit/async/lll.py", line 17, in connection
    ib.connect('127.0.0.1', 4001, clientId=id, timeout=5.0)
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 271, in connect
    return self._run(self.connectAsync(
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 310, in _run
    return util.run(*awaitables, timeout=self.RequestTimeout)
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/util.py", line 322, in run
    result = loop.run_until_complete(task)
  File "/home/user/.local/lib/python3.9/site-packages/nest_asyncio.py", line 70, in run_until_complete
    return f.result()
  File "/usr/lib/python3.9/asyncio/futures.py", line 201, in result
    raise self._exception
  File "/usr/lib/python3.9/asyncio/tasks.py", line 256, in __step
    result = coro.send(None)
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 1626, in connectAsync
    await self.client.connectAsync(host, port, clientId, timeout)
  File "/home/user/.local/lib/python3.9/site-packages/ib_insync/client.py", line 218, in connectAsync
    await asyncio.wait_for(self.apiStart, timeout)
  File "/usr/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Dein Weg funktioniert, nur ab und mal leider nicht, liegt aber eher an der API:
Naja, du scheints da ja auch eine Menge an Symbolen zu konsumieren.
Vielleicht kannst du die Exception abfangen.

Da muss ich allerdings jetzt passen, da ich den Fehler nicht nachstellen kann.
Antworten