@mirko3107,
"mehrere Tasks" heißt bei asyncio nicht mehrere Threads. Bei Asyncio übergibst du Tasks (Arbeitspakete) an die Eventloop. (Zum Beispiel mit asyncio.gather()) Die führt diese dann - grob gesagt - so aus, dass immer der Task an die Reihe kommt der gerade etwas sinnvolles zu tun hat. Ein Task der gerade auf Serverantwort wartet wird zugunsten eines anderen zurückgestellt.
In deinem Fall brauchst du aber nicht mehrere Tasks.
Ich denke, du verwendest die ib.reqMktData() Methode falsch. Jedenfalls scheint es mir so wenn ich in die Dokumentation schaue.
https://ib-insync.readthedocs.io/api.ht ... reqMktData
Das ist eine Subscription. Wie gesagt, ich habe keine Praxiserfahrung mit ib-insync. Aber ich vermute, nachdem du das einmal aufgerufen hast, wirst du dort in bestimmten Zeitabständen die Daten wie bei einem Generator abholen können.
Du holst in einer Schleife einmal alle aufgelaufenen Daten ab. Dann ist der Generator leer und deine Schleife und das Programm wird beendet. Statt dessen solltest du in bestimmten Zeitabständen immer wieder die aufgelaufenen Daten abholen.
Es sind nun einmal Kursdaten. Die werden ja wohl nur alle paar Millisekunden erzeugt.
Dataframes aus verschiedenen Prozessen zusammenfügen
Ich hab das Script umgebaut, dass ich nun pro contract Werte bekomme, aller 1s.
Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Code: Alles auswählen
import asyncio
import ib_insync as ibi
import pandas as pd
import time
yf_df = pd.read_csv('yahoo.csv')
stocks = yf_df.Ticker.tolist()
class App:
async def run(self):
self.ib = ibi.IB()
with await self.ib.connectAsync('127.0.0.1', 7496, clientId=16):
self.ib.reqMarketDataType(1)
contracts = [ibi.Stock(symbol, 'SMART', 'USD') for symbol in stocks]
for s in contracts:
iv = self.ib.reqMktData(s, "106,100", False, False)
await asyncio.sleep(1)
print(round(iv.impliedVolatility*100, 2))
def stop(self):
self.ib.disconnect()
app = App()
try:
asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
app.stop()
@mirk03107,
Nur hatte ich ja schon geschrieben, dass das wahrscheinlich gar nicht notwendig ist.
Hast du dir mal den Link zur Dokumentation angeschaut?
da wird doch sogar erklärt, dass es anfangs mehrere Sekunden dauern kann bis die Daten zur Verfügung stehen. Bei einer Subscription must du die Daten periodisch abholen.
Der Server stellt sie einfach nicht schneller zur Verfügung, egal wieviele Threads, Prozesse oder Tasks parallel laufen.
Du brauchst wahrscheinlich nur eine Endlosschleife in der run()-Funktion die einmal alle 100 Millisekunden alle aufgelaufenen Ticker-Werte abholt und in deine lokale Datenbank zwischenspeichert.
Mehrere Anfragen in asyncio, ist kein Problem. Ich kann dir dazu gerne ein Beispiel posten.Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Nur hatte ich ja schon geschrieben, dass das wahrscheinlich gar nicht notwendig ist.
Hast du dir mal den Link zur Dokumentation angeschaut?
da wird doch sogar erklärt, dass es anfangs mehrere Sekunden dauern kann bis die Daten zur Verfügung stehen. Bei einer Subscription must du die Daten periodisch abholen.
Der Server stellt sie einfach nicht schneller zur Verfügung, egal wieviele Threads, Prozesse oder Tasks parallel laufen.
Du brauchst wahrscheinlich nur eine Endlosschleife in der run()-Funktion die einmal alle 100 Millisekunden alle aufgelaufenen Ticker-Werte abholt und in deine lokale Datenbank zwischenspeichert.
In der Dokumentation steht, dass du einen Ticker als Rückgabewert der Funktion reqMktData() bekommst. (Bei dir ist das wohl 'iv') Dieser sollte dann nach eine bestimmten Zeit (1 Sekunde oder so) die letzten aufgelaufenen Kursdaten enthalten die du auslesen und zwischenspeichern kannst. Dann must du wahrscheinlich wieder etwas warten, bis weitere Kursdaten aufgelaufen sind, die du wieder auslesen kannst.
Das sollte also in einer Schleife passieren.
Zur Zeit holst du aber nur ein Datenpaket ab:
... und das für jeden Contract
Außerdem hat die Funktion reqMktData() noch einige optionale Eingangsparameter. Es kann sein dass du die auch entsprechend setzen must. (Z.B.: mktDataOptions)
Ich habe mir aber nur Bruchstücke der Dokumentation angeschaut. Ich würde dir empfehlen das mal genauer durchzulesen, sonst bleibt das Ganze eine Ratespiel.
Mit welcher Frequenz stellt der Broker die Daten eigentlich bereit? Normalerweise gibt es doch Informationen dazu.
Das sollte also in einer Schleife passieren.
Zur Zeit holst du aber nur ein Datenpaket ab:
Code: Alles auswählen
print(round(iv.impliedVolatility*100, 2))
Außerdem hat die Funktion reqMktData() noch einige optionale Eingangsparameter. Es kann sein dass du die auch entsprechend setzen must. (Z.B.: mktDataOptions)
Ich habe mir aber nur Bruchstücke der Dokumentation angeschaut. Ich würde dir empfehlen das mal genauer durchzulesen, sonst bleibt das Ganze eine Ratespiel.
Mit welcher Frequenz stellt der Broker die Daten eigentlich bereit? Normalerweise gibt es doch Informationen dazu.
Du meinst sicher den Code wie der folgende:
Hier spuckt mir die API in Dauerschleife die Werte einer Aktie aus.
Wie bau ich das um, damit ich mehrere Abfragen parallel machen kann? Steig da nicht durch.
Code: Alles auswählen
from ib_insync import *
ib = IB()
ib.connect('127.0.0.1', 7496, clientId=10)
stock = Stock('AMD', 'SMART', 'USD')
market_data = ib.reqMktData(stock, '106,100', False, False)
def onPendingTicker(ticker):
print("pending ticker event received")
print(ticker)
ib.pendingTickersEvent += onPendingTicker
ib.run()
Code: Alles auswählen
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 28620, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=18, ask=98.14, askSize=50, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=4, prevAsk=98.12, prevAskSize=7, prevLast=98.1, prevLastSize=2, volume=1146819, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 28620, tzinfo=datetime.timezone.utc), tickType=30, price=-1.0, size=323402)])}
pending ticker event received
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 30634, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=18, ask=98.14, askSize=50, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=4, prevAsk=98.12, prevAskSize=7, prevLast=98.1, prevLastSize=2, volume=1146822, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 30634, tzinfo=datetime.timezone.utc), tickType=8, price=-1.0, size=1146822)])}
pending ticker event received
{Ticker(contract=Stock(symbol='AMD', exchange='SMART', currency='USD'), time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), bid=98.12, bidSize=15, ask=98.13, askSize=3, last=98.12, lastSize=3, prevBid=98.1, prevBidSize=18, prevAsk=98.14, prevAskSize=50, prevLast=98.1, prevLastSize=2, volume=1146822, open=93.35, high=98.14, low=89.65, close=91.03, putVolume=323402, callVolume=1010232, impliedVolatility=0.37976469524517303, ticks=[TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), tickType=2, price=98.13, size=3), TickData(time=datetime.datetime(2021, 7, 28, 18, 42, 20, 32995, tzinfo=datetime.timezone.utc), tickType=0, price=98.12, size=15)])}
Contracts kann ich auch alle auf einmal abfragen, das klappt. Aber danach kann ich scheinbar nur pro Client eine Datenabfrage machen, nicht mehrere auf einmal.
Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.
Wie kann ich sowas am besten umsetzen? eine Function für die Datenabfrage und pro Function-Aufruf nur eine andere Clientnummer übergeben?
Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.
Wie kann ich sowas am besten umsetzen? eine Function für die Datenabfrage und pro Function-Aufruf nur eine andere Clientnummer übergeben?
-
- User
- Beiträge: 65
- Registriert: Samstag 27. Februar 2021, 12:18
Hallo zusammen,
vielleicht kann ich mit einem async Beispiel helfen... ich habe während den letzten Monaten & lockdown ein eigenes kleines script geschrieben, welches selbständig traden kann.
Ich bekomme von einem Anbieter streaming Daten, die dann vom script asynchron verarbeitet werden. Da ich kein professioneller Programmierer bin, habe ich lange "gekämpft" bis ich asynchron verstanden habe bzw. hat mir jemand die entscheidenden Tips gegeben, unten mehr dazu.
Hier in Auszügen der Code, der den asynchronen Rahmen setzt:
Da ich das alleine nicht hinbekommen habe und ich kurz davor war, mein Projekt aufzugeben, habe ich den Autor von ib_insync angeschrieben, ob er mir hilft und gegen einen kleinen Obulus hat er mir meinen Code korrigiert und weitere Tipps gegeben.
Nun läuft das script täglich und wenn ich Zeit habe, arbeite ich an Verbesserungen.
Viel Erfolg... Andy
vielleicht kann ich mit einem async Beispiel helfen... ich habe während den letzten Monaten & lockdown ein eigenes kleines script geschrieben, welches selbständig traden kann.
Ich bekomme von einem Anbieter streaming Daten, die dann vom script asynchron verarbeitet werden. Da ich kein professioneller Programmierer bin, habe ich lange "gekämpft" bis ich asynchron verstanden habe bzw. hat mir jemand die entscheidenden Tips gegeben, unten mehr dazu.
Hier in Auszügen der Code, der den asynchronen Rahmen setzt:
Code: Alles auswählen
import asyncio
import ib_insync as ibi
import socketio
...
ib = ibi.IB()
sio = socketio.AsyncClient()
@sio.on('connect')
def on_connect():
print("\nConnected to streaming API")
@sio.on('data')
async def on_data(data):
...
async def trade(tickersymbol, regel):
...
async def main_loop():
await ib.connectAsync()
await sio.connect('http://...')
await sio.wait()
try:
asyncio.run(main_loop())
...
Nun läuft das script täglich und wenn ich Zeit habe, arbeite ich an Verbesserungen.
Viel Erfolg... Andy
Wie hast du das denn bisher versucht? Der Beispielcode macht nur eine Abfrage, den must du anpassen um mehrere zu bekommen.Aber danach kann ich scheinbar nur pro Client eine Datenabfrage machen, nicht mehrere auf einmal
Mehrere Clients zu verwenden ist sicher nicht Sinn der Sache, aber wenn es funktioniert und dich nicht stört, ...Ich versuche gerade, mehrere Clients auf einmal laufen zu lassen, muss dabei nur die Clientnummer ändern. Angeblich wäre die Obergrenze 32, ich wäre schon über 2 happy.
Wahrscheinlich mit asyncio gather.Wie kann ich sowas am besten umsetzen? eine Funktion für die Datenabfrage und pro Funktion-Aufruf nur eine andere Clientnummer übergeben?
Ein Beispiel:
https://docs.python.org/3/library/async ... ncurrently
Die Funktion "factorial" im Beispiel, wäre dann deine Funktion für die Datenabfrage.
Hab eine Mehrfachverbindung hinbekommen:
Erstmal 10x die gleiche Aktie, nun muss ich sehen, wie ich die Liste aus 700 Werten aufgeteilt bekomme und nachher wieder zusammengebaut.
Ergebnis:
Code: Alles auswählen
import nest_asyncio
import asyncio
import time
from ib_insync import *
nest_asyncio.apply()
util.logToFile('connect.log', 'DEBUG')
start = time.time()
async def connection(id):
ib = IB()
ib.connect('127.0.0.1', 7496, clientId=id, timeout=10.0)
ib.reqMarketDataType(1)
contract = Stock('AAPL', 'SMART', 'USD')
data = ib.reqMktData(contract, "106,100", False, False)
await asyncio.sleep(2)
iv = data.impliedVolatility*100
print('ID:',id, iv)
return iv
async def main():
await asyncio.gather(
connection(100),
connection(101),
connection(102),
connection(103),
connection(104),
connection(105),
connection(106),
connection(107),
connection(108),
connection(109),
connection(110)
)
end = time.time()
print('Dauer: ',int(end-start),'s')
asyncio.run(main())
Ergebnis:
Code: Alles auswählen
ID: 110 21.21831800725646
ID: 109 21.21831800725646
ID: 108 21.21831800725646
ID: 107 21.21831800725646
ID: 106 21.21831800725646
ID: 105 21.21831800725646
ID: 104 21.21831800725646
ID: 103 21.21831800725646
ID: 102 21.21831800725646
ID: 101 21.21831800725646
ID: 100 21.21831800725646
Dauer: 2 s
@mirco3107,
einfach einer Variablen zuweisen und darüber iterieren:
einfach einer Variablen zuweisen und darüber iterieren:
Code: Alles auswählen
all_data = await asyncio.gather(
connection(100),
connection(101),
connection(102),
connection(103),
connection(104),
connection(105),
connection(106),
connection(107),
connection(108),
connection(109),
connection(110)
)
for data in all_data:
...
Hier ist nochmal ein Beispiel, was ich vorher in einem anderen Thread geposted hatte:
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.
Falls die beiden Coroutines nacheinander laufen würden, würde es 1,3 Sekunden dauern. Wie du aber siehst, dauert es nur so lange, wie die langsamere der beiden Routinen läuft.
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
Funktion aaaa und bbbb werden asynchron ausgeführt. Das heißt immer wenn die eine gerade Pause macht kann wird die andere für die Ausführung priorisiert und arbeitet in der Zwischenzeit.
Bei deinen Anfragen an den Broker heißt das, wenn gerade eine der Verbindungen auf Antwort vom Server wartet, wird die Wartezeit genutzt um mit einer anderen Anfrage weiter zu arbeiten.
Das Priorisieren wird durch asyncio im Hintergrund gesteuert.
Code: Alles auswählen
import asyncio
import time
async def aaaa():
await asyncio.sleep(1)
return "AAAA"
async def bbbb():
await asyncio.sleep(0.3)
return "BBBB"
async def main():
while True:
start = time.perf_counter()
responses = await asyncio.gather(*[aaaa(), bbbb()])
for response in responses:
print(response)
print(f"Dauer: {time.perf_counter()-start:.2f}s")
asyncio.run(main())
"""
Ausgabe:
AAAA
BBBB
Dauer: 1.01s
AAAA
BBBB
Dauer: 0.99s
...
"""
Hier kann man auch sehen wie man einfach über die "responses" iterieren und ausgeben kann.
Danke, sieht auch gut aus, teste ich morgen mal.
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
@mirko3107,
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.
Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.
Du bekommst die einzelnen Rückgabewerte jeder einzelnen Funktion, die du an asyncio.gather() übergeben hast.Wie bekomm ich denn aber nun jedem Prozess einen anderen Wert aus meiner Liste zugewiesen? Wenn ich über den asyncio.gather iteriere, macht der doch 10x den selben Wert, oder?
Da du die Variable 'iv' aus jeder Funktion zurück gibst, wirst du eine Liste aller 'iv' bekommen. Und zwar in der Reihenfolge der Funktionen.
Und: Ich hoffe "Prozess" war nur ein Schreibfehler und kein Verständnisproblem. Denn unterschiedliche Prozesse sind dies nicht.