Seite 5 von 8

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Samstag 17. Juli 2021, 14:56
von rogerb
... ib_insync, mir reicht es, wenn der Teil so mal mit durchläuft. Die Werte, die da ausgespuckt werden, ändern sich nicht stündlich.
Du hattest doch vorher gesagt, die Daten, die du durch ib_insync erhältst, brauchen gar nicht so oft angefragt werden. Warum sagst du jetzt, dass du das unbedingt parallel zum Aufruf von Yahoo machen willst?

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Samstag 17. Juli 2021, 15:12
von mirko3107
Nicht unbedingt, wäre nur schön, wenn es gänge. Der nächste Schritt wäre, die einzelnen Funktionen per Scheduler regelmäßig aufzurufen, egal ob eine andere noch läuft oder nicht.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Sonntag 18. Juli 2021, 00:21
von rogerb
@mirko3107,

hier ist ein minimales Beispiel um mittels einer Queue Daten aus einem Prozess zu erhalten:

Code: Alles auswählen

from multiprocessing import Process, Queue, set_start_method
import pandas as pd


def get_sepal_length(queue):
    iris = pd.read_csv(
        "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
    )
    filered_data = iris[iris.sepal_length > 6]
    queue.put(filered_data)


if __name__ == "__main__":
    set_start_method("spawn")
    queue = Queue()

    process = Process(target=get_sepal_length, args=(queue,))
    process.start()

    print(queue.get())

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Sonntag 18. Juli 2021, 08:22
von mirko3107
Danke, schau ich mir mal an.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 13:34
von mirko3107
Ich versuch mit gerade an asyncio zusammen mit ib_insync. Folgender Code kam bisher zusammen:

Code: Alles auswählen

import asyncio
import ib_insync as ibi
import pandas as pd
import time

yf_df = pd.read_csv('yahoo.csv')
stocks = yf_df.Ticker.tolist()

class App:

    async def run(self):
        self.ib = ibi.IB()
        with await self.ib.connectAsync('127.0.0.1', 7496, clientId=16):
            self.ib.reqMarketDataType(1)
            contracts = [ibi.Stock(symbol, 'SMART', 'USD') for symbol in stocks]
            ticks = [self.ib.reqMktData(s, "106,165,221,100", False, False) for s in contracts]
        data = pd.DataFrame(ticks, columns =['bid', 'ask'])
        data.to_csv('test.csv')
        print(data)

    def stop(self):
        self.ib.disconnect()

app = App()
try:
    asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
    app.stop()
Scheinbar ist der Zugriff auf die API von IB zu schnell, ich bekomme nur Daten für die ersten 10-15 Anfragen, nachher nur noch NaN.

Wo und wie bau ich hier eine Bremse ein?

Danke

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 14:27
von __deets__
await asyncio.sleep(seconds)

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 14:53
von mirko3107
Ich hatte ja gehofft, mit asyncio mehrere Anfragen parallel laufen zu lassen, das scheint aber irgendwie nicht zu funktionieren oder ich bin zu blöd dafür.
Asyncio.sleep macht ja nur bei gleichzeitigen Anfragen Sinn, sonst kann ich auch alles ohne asyncio abfragen.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 15:25
von __deets__
Na es hindert dich doch keiner daran, mehrere Tasks aufzusetzen. Du lässt im Moment ja nur einen laufen.

Nur klingt das was du da beschreibst eher nach einem Problem auf der Server Seite - der erlaubt nur so viele Anfragen. Oder missverstehe ich da was?

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 15:40
von mirko3107
Soweit ich weiß, dauert die Antwort der API ca. 1s, bis Daten enthalten sind.

Ok, dann muss ich mal überlegen, wie ich das umsetze mit mehreren Threads.

Dann müsste ich ja die Liste der Ticker-Symbole wieder aufteilen und die jeweiligen Teile auf die Threads verteilen und die Ergebnisse nachher wieder zusammenbauen, oder?

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 16:17
von __deets__
Ja, zb einfach mit einer Queue und jeder Task (nicht thread!) holt sich das nächste.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 17:54
von rogerb
@mirko3107,

"mehrere Tasks" heißt bei asyncio nicht mehrere Threads. Bei Asyncio übergibst du Tasks (Arbeitspakete) an die Eventloop. (Zum Beispiel mit asyncio.gather()) Die führt diese dann - grob gesagt - so aus, dass immer der Task an die Reihe kommt der gerade etwas sinnvolles zu tun hat. Ein Task der gerade auf Serverantwort wartet wird zugunsten eines anderen zurückgestellt.
In deinem Fall brauchst du aber nicht mehrere Tasks.

Ich denke, du verwendest die ib.reqMktData() Methode falsch. Jedenfalls scheint es mir so wenn ich in die Dokumentation schaue.
https://ib-insync.readthedocs.io/api.ht ... reqMktData

Das ist eine Subscription. Wie gesagt, ich habe keine Praxiserfahrung mit ib-insync. Aber ich vermute, nachdem du das einmal aufgerufen hast, wirst du dort in bestimmten Zeitabständen die Daten wie bei einem Generator abholen können.
Du holst in einer Schleife einmal alle aufgelaufenen Daten ab. Dann ist der Generator leer und deine Schleife und das Programm wird beendet. Statt dessen solltest du in bestimmten Zeitabständen immer wieder die aufgelaufenen Daten abholen.

Es sind nun einmal Kursdaten. Die werden ja wohl nur alle paar Millisekunden erzeugt.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 18:15
von mirko3107
Ich hab das Script umgebaut, dass ich nun pro contract Werte bekomme, aller 1s.

Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.

Code: Alles auswählen

import asyncio
import ib_insync as ibi
import pandas as pd
import time

yf_df = pd.read_csv('yahoo.csv')
stocks = yf_df.Ticker.tolist()

class App:

    async def run(self):
        self.ib = ibi.IB()
        with await self.ib.connectAsync('127.0.0.1', 7496, clientId=16):
            self.ib.reqMarketDataType(1)
            contracts = [ibi.Stock(symbol, 'SMART', 'USD') for symbol in stocks]
            for s in contracts:
                iv = self.ib.reqMktData(s, "106,100", False, False)
                await asyncio.sleep(1)
                print(round(iv.impliedVolatility*100, 2))

    def stop(self):
        self.ib.disconnect()


app = App()
try:
    asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
    app.stop()

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 19:14
von rogerb
@mirk03107,
Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Mehrere Anfragen in asyncio, ist kein Problem. Ich kann dir dazu gerne ein Beispiel posten.
Nur hatte ich ja schon geschrieben, dass das wahrscheinlich gar nicht notwendig ist.

Hast du dir mal den Link zur Dokumentation angeschaut?
da wird doch sogar erklärt, dass es anfangs mehrere Sekunden dauern kann bis die Daten zur Verfügung stehen. Bei einer Subscription must du die Daten periodisch abholen.
Der Server stellt sie einfach nicht schneller zur Verfügung, egal wieviele Threads, Prozesse oder Tasks parallel laufen.

Du brauchst wahrscheinlich nur eine Endlosschleife in der run()-Funktion die einmal alle 100 Millisekunden alle aufgelaufenen Ticker-Werte abholt und in deine lokale Datenbank zwischenspeichert.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 19:41
von mirko3107
Du meinst, wenn ich 100 Werte hintereinander anfrage, kommen irgendwann Daten zurück, wenn ich dann nochmal hintereinander abfrage?

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Dienstag 27. Juli 2021, 20:04
von rogerb
In der Dokumentation steht, dass du einen Ticker als Rückgabewert der Funktion reqMktData() bekommst. (Bei dir ist das wohl 'iv') Dieser sollte dann nach eine bestimmten Zeit (1 Sekunde oder so) die letzten aufgelaufenen Kursdaten enthalten die du auslesen und zwischenspeichern kannst. Dann must du wahrscheinlich wieder etwas warten, bis weitere Kursdaten aufgelaufen sind, die du wieder auslesen kannst.
Das sollte also in einer Schleife passieren.

Zur Zeit holst du aber nur ein Datenpaket ab:

Code: Alles auswählen

print(round(iv.impliedVolatility*100, 2))
... und das für jeden Contract

Außerdem hat die Funktion reqMktData() noch einige optionale Eingangsparameter. Es kann sein dass du die auch entsprechend setzen must. (Z.B.: mktDataOptions)

Ich habe mir aber nur Bruchstücke der Dokumentation angeschaut. Ich würde dir empfehlen das mal genauer durchzulesen, sonst bleibt das Ganze eine Ratespiel.

Mit welcher Frequenz stellt der Broker die Daten eigentlich bereit? Normalerweise gibt es doch Informationen dazu.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Mittwoch 28. Juli 2021, 19:46
von mirko3107
Du meinst sicher den Code wie der folgende:

Code: Alles auswählen

from ib_insync import *

ib = IB()
ib.connect('127.0.0.1', 7496, clientId=10)

stock = Stock('AMD', 'SMART', 'USD')

market_data = ib.reqMktData(stock, '106,100', False, False)

def onPendingTicker(ticker):
    print("pending ticker event received")
    print(ticker)

ib.pendingTickersEvent += onPendingTicker

ib.run()
Hier spuckt mir die API in Dauerschleife die Werte einer Aktie aus.

Code: Alles auswählen

{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 28620, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=18, ask=98.14, askSize=50, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=4, prevAsk=98.12, prevAskSize=7, prevLast=98.1, prevLastSize=2, volume=1146819, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 28620, tzinfo=datetime.timezone.utc), tickType=30, price=-1.0, size=323402)])}
pending ticker event received
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 30634, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=18, ask=98.14, askSize=50, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=4, prevAsk=98.12, prevAskSize=7, prevLast=98.1, prevLastSize=2, volume=1146822, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 30634, tzinfo=datetime.timezone.utc), tickType=8, price=-1.0, size=1146822)])}
pending ticker event received
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=15, ask=98.13, askSize=3, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=18, prevAsk=98.14, prevAskSize=50, prevLast=98.1, prevLastSize=2, volume=1146822, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), tickType=2, price=98.13, size=3), TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), tickType=0, price=98.12, size=15)])}
Wie bau ich das um, damit ich mehrere Abfragen parallel machen kann? Steig da nicht durch.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Mittwoch 28. Juli 2021, 21:53
von rogerb
@mirko3107,

ja, das ist doch schonmal gut. Wenn ich das richtig sehe, bekommst du hier ca. alle 20ms den Ticker von AMD.
Was soll jetzt parallel laufen?

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Donnerstag 29. Juli 2021, 08:36
von mirko3107
Ich brauch nicht aller paar ms eine Aktualisierung einer Aktie, sondern die Werte mehrerer Aktien gleichzeitig, aber nicht als Ticker im ms-Takt.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Donnerstag 29. Juli 2021, 09:52
von mirko3107
Ich versuche mal. mehrere Verbindungen zur API aufzubauen und damit parallel Daten abzugreifen, vielleicht klappt das ja.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Donnerstag 29. Juli 2021, 11:38
von rogerb
Du hattest doch in deinem vorherigen Beispielcode schon die Möglichkeit mehrere "Contracts" abzufragen. Die must du nur wiederholt durchführen, in einer Schleife und zwischen zwei Anfragen etwas warten.