Dataframes aus verschiedenen Prozessen zusammenfügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

"mehrere Tasks" heißt bei asyncio nicht mehrere Threads. Bei Asyncio übergibst du Tasks (Arbeitspakete) an die Eventloop. (Zum Beispiel mit asyncio.gather()) Die führt diese dann - grob gesagt - so aus, dass immer der Task an die Reihe kommt der gerade etwas sinnvolles zu tun hat. Ein Task der gerade auf Serverantwort wartet wird zugunsten eines anderen zurückgestellt.
In deinem Fall brauchst du aber nicht mehrere Tasks.

Ich denke, du verwendest die ib.reqMktData() Methode falsch. Jedenfalls scheint es mir so wenn ich in die Dokumentation schaue.
https://ib-insync.readthedocs.io/api.ht ... reqMktData

Das ist eine Subscription. Wie gesagt, ich habe keine Praxiserfahrung mit ib-insync. Aber ich vermute, nachdem du das einmal aufgerufen hast, wirst du dort in bestimmten Zeitabständen die Daten wie bei einem Generator abholen können.
Du holst in einer Schleife einmal alle aufgelaufenen Daten ab. Dann ist der Generator leer und deine Schleife und das Programm wird beendet. Statt dessen solltest du in bestimmten Zeitabständen immer wieder die aufgelaufenen Daten abholen.

Es sind nun einmal Kursdaten. Die werden ja wohl nur alle paar Millisekunden erzeugt.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich hab das Script umgebaut, dass ich nun pro contract Werte bekomme, aller 1s.

Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.

Code: Alles auswählen

import asyncio
import ib_insync as ibi
import pandas as pd
import time

yf_df = pd.read_csv('yahoo.csv')
stocks = yf_df.Ticker.tolist()

class App:

    async def run(self):
        self.ib = ibi.IB()
        with await self.ib.connectAsync('127.0.0.1', 7496, clientId=16):
            self.ib.reqMarketDataType(1)
            contracts = [ibi.Stock(symbol, 'SMART', 'USD') for symbol in stocks]
            for s in contracts:
                iv = self.ib.reqMktData(s, "106,100", False, False)
                await asyncio.sleep(1)
                print(round(iv.impliedVolatility*100, 2))

    def stop(self):
        self.ib.disconnect()


app = App()
try:
    asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
    app.stop()
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirk03107,
Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Mehrere Anfragen in asyncio, ist kein Problem. Ich kann dir dazu gerne ein Beispiel posten.
Nur hatte ich ja schon geschrieben, dass das wahrscheinlich gar nicht notwendig ist.

Hast du dir mal den Link zur Dokumentation angeschaut?
da wird doch sogar erklärt, dass es anfangs mehrere Sekunden dauern kann bis die Daten zur Verfügung stehen. Bei einer Subscription must du die Daten periodisch abholen.
Der Server stellt sie einfach nicht schneller zur Verfügung, egal wieviele Threads, Prozesse oder Tasks parallel laufen.

Du brauchst wahrscheinlich nur eine Endlosschleife in der run()-Funktion die einmal alle 100 Millisekunden alle aufgelaufenen Ticker-Werte abholt und in deine lokale Datenbank zwischenspeichert.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Du meinst, wenn ich 100 Werte hintereinander anfrage, kommen irgendwann Daten zurück, wenn ich dann nochmal hintereinander abfrage?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

In der Dokumentation steht, dass du einen Ticker als Rückgabewert der Funktion reqMktData() bekommst. (Bei dir ist das wohl 'iv') Dieser sollte dann nach eine bestimmten Zeit (1 Sekunde oder so) die letzten aufgelaufenen Kursdaten enthalten die du auslesen und zwischenspeichern kannst. Dann must du wahrscheinlich wieder etwas warten, bis weitere Kursdaten aufgelaufen sind, die du wieder auslesen kannst.
Das sollte also in einer Schleife passieren.

Zur Zeit holst du aber nur ein Datenpaket ab:

Code: Alles auswählen

print(round(iv.impliedVolatility*100, 2))
... und das für jeden Contract

Außerdem hat die Funktion reqMktData() noch einige optionale Eingangsparameter. Es kann sein dass du die auch entsprechend setzen must. (Z.B.: mktDataOptions)

Ich habe mir aber nur Bruchstücke der Dokumentation angeschaut. Ich würde dir empfehlen das mal genauer durchzulesen, sonst bleibt das Ganze eine Ratespiel.

Mit welcher Frequenz stellt der Broker die Daten eigentlich bereit? Normalerweise gibt es doch Informationen dazu.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Du meinst sicher den Code wie der folgende:

Code: Alles auswählen

from ib_insync import *

ib = IB()
ib.connect('127.0.0.1', 7496, clientId=10)

stock = Stock('AMD', 'SMART', 'USD')

market_data = ib.reqMktData(stock, '106,100', False, False)

def onPendingTicker(ticker):
    print("pending ticker event received")
    print(ticker)

ib.pendingTickersEvent += onPendingTicker

ib.run()
Hier spuckt mir die API in Dauerschleife die Werte einer Aktie aus.

Code: Alles auswählen

{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 28620, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=18, ask=98.14, askSize=50, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=4, prevAsk=98.12, prevAskSize=7, prevLast=98.1, prevLastSize=2, volume=1146819, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 28620, tzinfo=datetime.timezone.utc), tickType=30, price=-1.0, size=323402)])}
pending ticker event received
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 30634, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=18, ask=98.14, askSize=50, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=4, prevAsk=98.12, prevAskSize=7, prevLast=98.1, prevLastSize=2, volume=1146822, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 30634, tzinfo=datetime.timezone.utc), tickType=8, price=-1.0, size=1146822)])}
pending ticker event received
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=15, ask=98.13, askSize=3, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=18, prevAsk=98.14, prevAskSize=50, prevLast=98.1, prevLastSize=2, volume=1146822, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), tickType=2, price=98.13, size=3), TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), tickType=0, price=98.12, size=15)])}
Wie bau ich das um, damit ich mehrere Abfragen parallel machen kann? Steig da nicht durch.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

ja, das ist doch schonmal gut. Wenn ich das richtig sehe, bekommst du hier ca. alle 20ms den Ticker von AMD.
Was soll jetzt parallel laufen?
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich brauch nicht aller paar ms eine Aktualisierung einer Aktie, sondern die Werte mehrerer Aktien gleichzeitig, aber nicht als Ticker im ms-Takt.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich versuche mal. mehrere Verbindungen zur API aufzubauen und damit parallel Daten abzugreifen, vielleicht klappt das ja.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Du hattest doch in deinem vorherigen Beispielcode schon die Möglichkeit mehrere "Contracts" abzufragen. Die must du nur wiederholt durchführen, in einer Schleife und zwischen zwei Anfragen etwas warten.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Contracts kann ich auch alle auf einmal abfragen, das klappt. Aber danach kann ich scheinbar nur pro Client eine Datenabfrage machen, nicht mehrere auf einmal.
Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.

Wie kann ich sowas am besten umsetzen? eine Function für die Datenabfrage und pro Function-Aufruf nur eine andere Clientnummer übergeben?
August1328
User
Beiträge: 65
Registriert: Samstag 27. Februar 2021, 12:18

Hallo zusammen,

vielleicht kann ich mit einem async Beispiel helfen... ich habe während den letzten Monaten & lockdown ein eigenes kleines script geschrieben, welches selbständig traden kann.

Ich bekomme von einem Anbieter streaming Daten, die dann vom script asynchron verarbeitet werden. Da ich kein professioneller Programmierer bin, habe ich lange "gekämpft" bis ich asynchron verstanden habe bzw. hat mir jemand die entscheidenden Tips gegeben, unten mehr dazu.

Hier in Auszügen der Code, der den asynchronen Rahmen setzt:

Code: Alles auswählen

import asyncio
import ib_insync as ibi
import socketio
...

ib = ibi.IB()
sio = socketio.AsyncClient()

@sio.on('connect')
def on_connect():
    print("\nConnected to streaming API")

@sio.on('data')
async def on_data(data):
... 

async def trade(tickersymbol, regel):
...

async def main_loop():
    await ib.connectAsync()
    await sio.connect('http://...')
    await sio.wait()

try:
    asyncio.run(main_loop())
...
Da ich das alleine nicht hinbekommen habe und ich kurz davor war, mein Projekt aufzugeben, habe ich den Autor von ib_insync angeschrieben, ob er mir hilft und gegen einen kleinen Obulus hat er mir meinen Code korrigiert und weitere Tipps gegeben.

Nun läuft das script täglich und wenn ich Zeit habe, arbeite ich an Verbesserungen.

Viel Erfolg... Andy
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Aber danach kann ich scheinbar nur pro Client eine Datenabfrage machen, nicht mehrere auf einmal
Wie hast du das denn bisher versucht? Der Beispielcode macht nur eine Abfrage, den must du anpassen um mehrere zu bekommen.
Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.
Mehrere Clients zu verwenden ist sicher nicht Sinn der Sache, aber wenn es funktioniert und dich nicht stört, ...
Wie kann ich sowas am besten umsetzen? eine Funktion für die Datenabfrage und pro Funktion-Aufruf nur eine andere Clientnummer übergeben?
Wahrscheinlich mit asyncio gather.
Ein Beispiel:
https://docs.python.org/3/library/async ... ncurrently

Die Funktion "factorial" im Beispiel, wäre dann deine Funktion für die Datenabfrage.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hab eine Mehrfachverbindung hinbekommen:

Code: Alles auswählen

import nest_asyncio
import asyncio
import time
from ib_insync import *

nest_asyncio.apply()
util.logToFile('connect.log', 'DEBUG')
start = time.time()

async def connection(id):
    ib = IB()
    ib.connect('127.0.0.1', 7496, clientId=id, timeout=10.0)
    ib.reqMarketDataType(1)
    contract = Stock('AAPL', 'SMART', 'USD')
    data = ib.reqMktData(contract, "106,100", False, False)
    await asyncio.sleep(2)
    iv = data.impliedVolatility*100
    print('ID:',id, iv)
    return iv

async def main():
    await asyncio.gather(
        connection(100),
        connection(101),
        connection(102),
        connection(103),
        connection(104),
        connection(105),
        connection(106),
        connection(107),
        connection(108),
        connection(109),
        connection(110)
        )
    end = time.time()
    print('Dauer: ',int(end-start),'s')

asyncio.run(main())
Erstmal 10x die gleiche Aktie, nun muss ich sehen, wie ich die Liste aus 700 Werten aufgeteilt bekomme und nachher wieder zusammengebaut.

Ergebnis:

Code: Alles auswählen

ID: 110 21.21831800725646
ID: 109 21.21831800725646
ID: 108 21.21831800725646
ID: 107 21.21831800725646
ID: 106 21.21831800725646
ID: 105 21.21831800725646
ID: 104 21.21831800725646
ID: 103 21.21831800725646
ID: 102 21.21831800725646
ID: 101 21.21831800725646
ID: 100 21.21831800725646
Dauer:  2 s
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirco3107,

einfach einer Variablen zuweisen und darüber iterieren:

Code: Alles auswählen

all_data = await asyncio.gather(
        connection(100),
        connection(101),
        connection(102),
        connection(103),
        connection(104),
        connection(105),
        connection(106),
        connection(107),
        connection(108),
        connection(109),
        connection(110)
        )

for data in all_data:
    ... 
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

dann macht der doch auch wieder jede Verbindung nach der anderen und nicht alle auf einmal, oder?
__deets__
User
Beiträge: 14522
Registriert: Mittwoch 14. Oktober 2015, 14:29

Nö. Gather wartet bis alle quasi-Parallelen Anfragen fertig sind.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Hier ist nochmal ein Beispiel, was ich vorher in einem anderen Thread geposted hatte:
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.

Code: Alles auswählen

import asyncio
import time

async def aaaa():
    await asyncio.sleep(1)
    return "AAAA"

async def bbbb():
    await asyncio.sleep(0.3)
    return "BBBB"

async def main():
    while True:
        start = time.perf_counter()
        responses = await asyncio.gather(*[aaaa(), bbbb()])
        for response in responses:
            print(response)
        print(f"Dauer: {time.perf_counter()-start:.2f}s")

asyncio.run(main())

"""
Ausgabe:
AAAA
BBBB
Dauer: 1.01s
AAAA
BBBB
Dauer: 0.99s
...
"""
Falls die beiden Coroutines nacheinander laufen würden, würde es 1,3 Sekunden dauern. Wie du aber siehst, dauert es nur so lange, wie die langsamere der beiden Routinen läuft.
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Danke, sieht auch gut aus, teste ich morgen mal.

Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Du bekommst die einzelnen Rückgabewerte jeder einzelnen Funktion, die du an asyncio.gather() übergeben hast.
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.

Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.
Antworten