Du hattest doch vorher gesagt, die Daten, die du durch ib_insync erhältst, brauchen gar nicht so oft angefragt werden. Warum sagst du jetzt, dass du das unbedingt parallel zum Aufruf von Yahoo machen willst?... ib_insync, mir reicht es, wenn der Teil so mal mit durchläuft. Die Werte, die da ausgespuckt werden, ändern sich nicht stündlich.
Dataframes aus verschiedenen Prozessen zusammenfügen
@mirko3107,
hier ist ein minimales Beispiel um mittels einer Queue Daten aus einem Prozess zu erhalten:
hier ist ein minimales Beispiel um mittels einer Queue Daten aus einem Prozess zu erhalten:
Code: Alles auswählen
from multiprocessing import Process, Queue, set_start_method
import pandas as pd
def get_sepal_length(queue):
iris = pd.read_csv(
"https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
)
filered_data = iris[iris.sepal_length > 6]
queue.put(filered_data)
if __name__ == "__main__":
set_start_method("spawn")
queue = Queue()
process = Process(target=get_sepal_length, args=(queue,))
process.start()
print(queue.get())
Ich versuch mit gerade an asyncio zusammen mit ib_insync. Folgender Code kam bisher zusammen:
Scheinbar ist der Zugriff auf die API von IB zu schnell, ich bekomme nur Daten für die ersten 10-15 Anfragen, nachher nur noch NaN.
Wo und wie bau ich hier eine Bremse ein?
Danke
Code: Alles auswählen
import asyncio
import ib_insync as ibi
import pandas as pd
import time
yf_df = pd.read_csv('yahoo.csv')
stocks = yf_df.Ticker.tolist()
class App:
async def run(self):
self.ib = ibi.IB()
with await self.ib.connectAsync('127.0.0.1', 7496, clientId=16):
self.ib.reqMarketDataType(1)
contracts = [ibi.Stock(symbol, 'SMART', 'USD') for symbol in stocks]
ticks = [self.ib.reqMktData(s, "106,165,221,100", False, False) for s in contracts]
data = pd.DataFrame(ticks, columns =['bid', 'ask'])
data.to_csv('test.csv')
print(data)
def stop(self):
self.ib.disconnect()
app = App()
try:
asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
app.stop()
Wo und wie bau ich hier eine Bremse ein?
Danke
Ich hatte ja gehofft, mit asyncio mehrere Anfragen parallel laufen zu lassen, das scheint aber irgendwie nicht zu funktionieren oder ich bin zu blöd dafür.
Asyncio.sleep macht ja nur bei gleichzeitigen Anfragen Sinn, sonst kann ich auch alles ohne asyncio abfragen.
Asyncio.sleep macht ja nur bei gleichzeitigen Anfragen Sinn, sonst kann ich auch alles ohne asyncio abfragen.
Na es hindert dich doch keiner daran, mehrere Tasks aufzusetzen. Du lässt im Moment ja nur einen laufen.
Nur klingt das was du da beschreibst eher nach einem Problem auf der Server Seite - der erlaubt nur so viele Anfragen. Oder missverstehe ich da was?
Nur klingt das was du da beschreibst eher nach einem Problem auf der Server Seite - der erlaubt nur so viele Anfragen. Oder missverstehe ich da was?
Soweit ich weiß, dauert die Antwort der API ca. 1s, bis Daten enthalten sind.
Ok, dann muss ich mal überlegen, wie ich das umsetze mit mehreren Threads.
Dann müsste ich ja die Liste der Ticker-Symbole wieder aufteilen und die jeweiligen Teile auf die Threads verteilen und die Ergebnisse nachher wieder zusammenbauen, oder?
Ok, dann muss ich mal überlegen, wie ich das umsetze mit mehreren Threads.
Dann müsste ich ja die Liste der Ticker-Symbole wieder aufteilen und die jeweiligen Teile auf die Threads verteilen und die Ergebnisse nachher wieder zusammenbauen, oder?
@mirko3107,
"mehrere Tasks" heißt bei asyncio nicht mehrere Threads. Bei Asyncio übergibst du Tasks (Arbeitspakete) an die Eventloop. (Zum Beispiel mit asyncio.gather()) Die führt diese dann - grob gesagt - so aus, dass immer der Task an die Reihe kommt der gerade etwas sinnvolles zu tun hat. Ein Task der gerade auf Serverantwort wartet wird zugunsten eines anderen zurückgestellt.
In deinem Fall brauchst du aber nicht mehrere Tasks.
Ich denke, du verwendest die ib.reqMktData() Methode falsch. Jedenfalls scheint es mir so wenn ich in die Dokumentation schaue.
https://ib-insync.readthedocs.io/api.ht ... reqMktData
Das ist eine Subscription. Wie gesagt, ich habe keine Praxiserfahrung mit ib-insync. Aber ich vermute, nachdem du das einmal aufgerufen hast, wirst du dort in bestimmten Zeitabständen die Daten wie bei einem Generator abholen können.
Du holst in einer Schleife einmal alle aufgelaufenen Daten ab. Dann ist der Generator leer und deine Schleife und das Programm wird beendet. Statt dessen solltest du in bestimmten Zeitabständen immer wieder die aufgelaufenen Daten abholen.
Es sind nun einmal Kursdaten. Die werden ja wohl nur alle paar Millisekunden erzeugt.
"mehrere Tasks" heißt bei asyncio nicht mehrere Threads. Bei Asyncio übergibst du Tasks (Arbeitspakete) an die Eventloop. (Zum Beispiel mit asyncio.gather()) Die führt diese dann - grob gesagt - so aus, dass immer der Task an die Reihe kommt der gerade etwas sinnvolles zu tun hat. Ein Task der gerade auf Serverantwort wartet wird zugunsten eines anderen zurückgestellt.
In deinem Fall brauchst du aber nicht mehrere Tasks.
Ich denke, du verwendest die ib.reqMktData() Methode falsch. Jedenfalls scheint es mir so wenn ich in die Dokumentation schaue.
https://ib-insync.readthedocs.io/api.ht ... reqMktData
Das ist eine Subscription. Wie gesagt, ich habe keine Praxiserfahrung mit ib-insync. Aber ich vermute, nachdem du das einmal aufgerufen hast, wirst du dort in bestimmten Zeitabständen die Daten wie bei einem Generator abholen können.
Du holst in einer Schleife einmal alle aufgelaufenen Daten ab. Dann ist der Generator leer und deine Schleife und das Programm wird beendet. Statt dessen solltest du in bestimmten Zeitabständen immer wieder die aufgelaufenen Daten abholen.
Es sind nun einmal Kursdaten. Die werden ja wohl nur alle paar Millisekunden erzeugt.
Ich hab das Script umgebaut, dass ich nun pro contract Werte bekomme, aller 1s.
Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Code: Alles auswählen
import asyncio
import ib_insync as ibi
import pandas as pd
import time
yf_df = pd.read_csv('yahoo.csv')
stocks = yf_df.Ticker.tolist()
class App:
async def run(self):
self.ib = ibi.IB()
with await self.ib.connectAsync('127.0.0.1', 7496, clientId=16):
self.ib.reqMarketDataType(1)
contracts = [ibi.Stock(symbol, 'SMART', 'USD') for symbol in stocks]
for s in contracts:
iv = self.ib.reqMktData(s, "106,100", False, False)
await asyncio.sleep(1)
print(round(iv.impliedVolatility*100, 2))
def stop(self):
self.ib.disconnect()
app = App()
try:
asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
app.stop()
@mirk03107,
Nur hatte ich ja schon geschrieben, dass das wahrscheinlich gar nicht notwendig ist.
Hast du dir mal den Link zur Dokumentation angeschaut?
da wird doch sogar erklärt, dass es anfangs mehrere Sekunden dauern kann bis die Daten zur Verfügung stehen. Bei einer Subscription must du die Daten periodisch abholen.
Der Server stellt sie einfach nicht schneller zur Verfügung, egal wieviele Threads, Prozesse oder Tasks parallel laufen.
Du brauchst wahrscheinlich nur eine Endlosschleife in der run()-Funktion die einmal alle 100 Millisekunden alle aufgelaufenen Ticker-Werte abholt und in deine lokale Datenbank zwischenspeichert.
Mehrere Anfragen in asyncio, ist kein Problem. Ich kann dir dazu gerne ein Beispiel posten.Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Nur hatte ich ja schon geschrieben, dass das wahrscheinlich gar nicht notwendig ist.
Hast du dir mal den Link zur Dokumentation angeschaut?
da wird doch sogar erklärt, dass es anfangs mehrere Sekunden dauern kann bis die Daten zur Verfügung stehen. Bei einer Subscription must du die Daten periodisch abholen.
Der Server stellt sie einfach nicht schneller zur Verfügung, egal wieviele Threads, Prozesse oder Tasks parallel laufen.
Du brauchst wahrscheinlich nur eine Endlosschleife in der run()-Funktion die einmal alle 100 Millisekunden alle aufgelaufenen Ticker-Werte abholt und in deine lokale Datenbank zwischenspeichert.
In der Dokumentation steht, dass du einen Ticker als Rückgabewert der Funktion reqMktData() bekommst. (Bei dir ist das wohl 'iv') Dieser sollte dann nach eine bestimmten Zeit (1 Sekunde oder so) die letzten aufgelaufenen Kursdaten enthalten die du auslesen und zwischenspeichern kannst. Dann must du wahrscheinlich wieder etwas warten, bis weitere Kursdaten aufgelaufen sind, die du wieder auslesen kannst.
Das sollte also in einer Schleife passieren.
Zur Zeit holst du aber nur ein Datenpaket ab:
... und das für jeden Contract
Außerdem hat die Funktion reqMktData() noch einige optionale Eingangsparameter. Es kann sein dass du die auch entsprechend setzen must. (Z.B.: mktDataOptions)
Ich habe mir aber nur Bruchstücke der Dokumentation angeschaut. Ich würde dir empfehlen das mal genauer durchzulesen, sonst bleibt das Ganze eine Ratespiel.
Mit welcher Frequenz stellt der Broker die Daten eigentlich bereit? Normalerweise gibt es doch Informationen dazu.
Das sollte also in einer Schleife passieren.
Zur Zeit holst du aber nur ein Datenpaket ab:
Code: Alles auswählen
print(round(iv.impliedVolatility*100, 2))
Außerdem hat die Funktion reqMktData() noch einige optionale Eingangsparameter. Es kann sein dass du die auch entsprechend setzen must. (Z.B.: mktDataOptions)
Ich habe mir aber nur Bruchstücke der Dokumentation angeschaut. Ich würde dir empfehlen das mal genauer durchzulesen, sonst bleibt das Ganze eine Ratespiel.
Mit welcher Frequenz stellt der Broker die Daten eigentlich bereit? Normalerweise gibt es doch Informationen dazu.
Du meinst sicher den Code wie der folgende:
Hier spuckt mir die API in Dauerschleife die Werte einer Aktie aus.
Wie bau ich das um, damit ich mehrere Abfragen parallel machen kann? Steig da nicht durch.
Code: Alles auswählen
from ib_insync import *
ib = IB()
ib.connect('127.0.0.1', 7496, clientId=10)
stock = Stock('AMD', 'SMART', 'USD')
market_data = ib.reqMktData(stock, '106,100', False, False)
def onPendingTicker(ticker):
print("pending ticker event received")
print(ticker)
ib.pendingTickersEvent += onPendingTicker
ib.run()
Code: Alles auswählen
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 28620, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=18, ask=98.14, askSize=50, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=4, prevAsk=98.12, prevAskSize=7, prevLast=98.1, prevLastSize=2, volume=1146819, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 28620, tzinfo=datetime.timezone.utc), tickType=30, price=-1.0, size=323402)])}
pending ticker event received
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 30634, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=18, ask=98.14, askSize=50, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=4, prevAsk=98.12, prevAskSize=7, prevLast=98.1, prevLastSize=2, volume=1146822, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 30634, tzinfo=datetime.timezone.utc), tickType=8, price=-1.0, size=1146822)])}
pending ticker event received
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=15, ask=98.13, askSize=3, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=18, prevAsk=98.14, prevAskSize=50, prevLast=98.1, prevLastSize=2, volume=1146822, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), tickType=2, price=98.13, size=3), TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), tickType=0, price=98.12, size=15)])}