Dataframes aus verschiedenen Prozessen zusammenfügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich habe jetzt einiges probiert, aber mit wenig Erfolg.
Ich hab mein Script vom Beginn mal wieder rausgekramt und wollte da Dinge einbauen, die in den letzten Tage denke gelernt zu haben, besser gesagt gelesen.

Das Script funktioniert im Grunde, nicht schön wie ihr durchaus bemerkt habt, aber läuft.

Wenn ich aus meiner CSV-Datei per

Code: Alles auswählen

yf_df = pd.read_csv(file, sep=',')
tickers = yf_df.Ticker.tolist()
eine Ticker-Liste erstelle und diese dann per

Code: Alles auswählen

p1 = multiprocessing.Process(target=rsi, args = (tickers, ))
an den jeweiligen Prozess schicke, läuft zb. dieser hier durch:

Code: Alles auswählen

def rsi(tickers):
    rsilist = []
    while True:
        for ticker in tickers:
            try:
                rsi = pdr.get_data_yahoo(ticker, dt.datetime(2021, 5, 1), dt.datetime.now(),session=session)
                delta = rsi['Close'].diff()
                up = delta.clip(lower=0)
                down = -1 * delta.clip(upper=0)
                ema_up = up.ewm(com=13, adjust=False).mean()
                ema_down = down.ewm(com=13, adjust=False).mean()
                rs = ema_up / ema_down
                rsi['RSI'] = 100 - (100 / (1 + rs))
                rsi_value = rsi.iloc[-1]['RSI']
                print(ticker, rsi_value)
                rsilist.append(rsi_value)
            except Exception as e:
                logging.exception('Something went terribly wrong')
                return [0.0] * 4
Wie bekomm ich die Ergebnisse dieses Prozesses wieder aus diesem heraus?

Vielen Dank
LukeNukem
User
Beiträge: 232
Registriert: Mittwoch 19. Mai 2021, 03:40

mirko3107 hat geschrieben: Freitag 16. Juli 2021, 18:39 Wie bekomm ich die Ergebnisse dieses Prozesses wieder aus diesem heraus?
Naja, wenn Du es mit Multiprocessing machst, dann müssen die Daten natürlich vom sendenden Prozeß serialisiert werden -- etwa mit pandas.DataFrame.to_pickle() und vom lesenden Prozeß wieder deserialisiert werden, beispielsweise mit pandas.read_pickle(). Aber die Kollegen haben natürlich Recht: wenn dieses ib-Dingsi asynchron arbeitet, solltest Du beim Rest ebenfalls asynchron arbeiten. Man kann die Ansätze zwar mischen, aber... dazu sollte man UNBEDINGT und sehr GENAU wissen, was man tut und warum man es tut. ;-)
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Also mit async und ib_insync hatte ich bisher keinen Erfolg, per multiprocessing läuft es aber, reicht mir völlig.

MIt multiprocessing.Pool ist es aber nicht möglich, Funktionen parallel auszuführen, oder?
Hab einen Pool für yahoo mit 4 Prozessen und einen Pool für ib_insync mit 1 Prozess, welche aber nacheinander laufen.
__deets__
User
Beiträge: 14494
Registriert: Mittwoch 14. Oktober 2015, 14:29

Sinn und Zweck von multiprocessing *ist* Parallelitaet. Wenn das bei dir nicht parallel laeuft, ist da was komisch.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

die Funktionen, die ich aufrufe, laufen auch parallel in mehreren Prozessen (ausser ib_insync), aber ich hab es nicht hinbekommen, während Funktion "Yahoo" läuft, parallel dazu die Funktion "IB" laufen zu lassen.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

... ib_insync, mir reicht es, wenn der Teil so mal mit durchläuft. Die Werte, die da ausgespuckt werden, ändern sich nicht stündlich.
Du hattest doch vorher gesagt, die Daten, die du durch ib_insync erhältst, brauchen gar nicht so oft angefragt werden. Warum sagst du jetzt, dass du das unbedingt parallel zum Aufruf von Yahoo machen willst?
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Nicht unbedingt, wäre nur schön, wenn es gänge. Der nächste Schritt wäre, die einzelnen Funktionen per Scheduler regelmäßig aufzurufen, egal ob eine andere noch läuft oder nicht.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

hier ist ein minimales Beispiel um mittels einer Queue Daten aus einem Prozess zu erhalten:

Code: Alles auswählen

from multiprocessing import Process, Queue, set_start_method
import pandas as pd


def get_sepal_length(queue):
    iris = pd.read_csv(
        "https://raw.githubusercontent.com/mwaskom/seaborn-data/master/iris.csv"
    )
    filered_data = iris[iris.sepal_length > 6]
    queue.put(filered_data)


if __name__ == "__main__":
    set_start_method("spawn")
    queue = Queue()

    process = Process(target=get_sepal_length, args=(queue,))
    process.start()

    print(queue.get())
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Danke, schau ich mir mal an.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich versuch mit gerade an asyncio zusammen mit ib_insync. Folgender Code kam bisher zusammen:

Code: Alles auswählen

import asyncio
import ib_insync as ibi
import pandas as pd
import time

yf_df = pd.read_csv('yahoo.csv')
stocks = yf_df.Ticker.tolist()

class App:

    async def run(self):
        self.ib = ibi.IB()
        with await self.ib.connectAsync('127.0.0.1', 7496, clientId=16):
            self.ib.reqMarketDataType(1)
            contracts = [ibi.Stock(symbol, 'SMART', 'USD') for symbol in stocks]
            ticks = [self.ib.reqMktData(s, "106,165,221,100", False, False) for s in contracts]
        data = pd.DataFrame(ticks, columns =['bid', 'ask'])
        data.to_csv('test.csv')
        print(data)

    def stop(self):
        self.ib.disconnect()

app = App()
try:
    asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
    app.stop()
Scheinbar ist der Zugriff auf die API von IB zu schnell, ich bekomme nur Daten für die ersten 10-15 Anfragen, nachher nur noch NaN.

Wo und wie bau ich hier eine Bremse ein?

Danke
__deets__
User
Beiträge: 14494
Registriert: Mittwoch 14. Oktober 2015, 14:29

await asyncio.sleep(seconds)
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich hatte ja gehofft, mit asyncio mehrere Anfragen parallel laufen zu lassen, das scheint aber irgendwie nicht zu funktionieren oder ich bin zu blöd dafür.
Asyncio.sleep macht ja nur bei gleichzeitigen Anfragen Sinn, sonst kann ich auch alles ohne asyncio abfragen.
__deets__
User
Beiträge: 14494
Registriert: Mittwoch 14. Oktober 2015, 14:29

Na es hindert dich doch keiner daran, mehrere Tasks aufzusetzen. Du lässt im Moment ja nur einen laufen.

Nur klingt das was du da beschreibst eher nach einem Problem auf der Server Seite - der erlaubt nur so viele Anfragen. Oder missverstehe ich da was?
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Soweit ich weiß, dauert die Antwort der API ca. 1s, bis Daten enthalten sind.

Ok, dann muss ich mal überlegen, wie ich das umsetze mit mehreren Threads.

Dann müsste ich ja die Liste der Ticker-Symbole wieder aufteilen und die jeweiligen Teile auf die Threads verteilen und die Ergebnisse nachher wieder zusammenbauen, oder?
__deets__
User
Beiträge: 14494
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ja, zb einfach mit einer Queue und jeder Task (nicht thread!) holt sich das nächste.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

"mehrere Tasks" heißt bei asyncio nicht mehrere Threads. Bei Asyncio übergibst du Tasks (Arbeitspakete) an die Eventloop. (Zum Beispiel mit asyncio.gather()) Die führt diese dann - grob gesagt - so aus, dass immer der Task an die Reihe kommt der gerade etwas sinnvolles zu tun hat. Ein Task der gerade auf Serverantwort wartet wird zugunsten eines anderen zurückgestellt.
In deinem Fall brauchst du aber nicht mehrere Tasks.

Ich denke, du verwendest die ib.reqMktData() Methode falsch. Jedenfalls scheint es mir so wenn ich in die Dokumentation schaue.
https://ib-insync.readthedocs.io/api.ht ... reqMktData

Das ist eine Subscription. Wie gesagt, ich habe keine Praxiserfahrung mit ib-insync. Aber ich vermute, nachdem du das einmal aufgerufen hast, wirst du dort in bestimmten Zeitabständen die Daten wie bei einem Generator abholen können.
Du holst in einer Schleife einmal alle aufgelaufenen Daten ab. Dann ist der Generator leer und deine Schleife und das Programm wird beendet. Statt dessen solltest du in bestimmten Zeitabständen immer wieder die aufgelaufenen Daten abholen.

Es sind nun einmal Kursdaten. Die werden ja wohl nur alle paar Millisekunden erzeugt.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich hab das Script umgebaut, dass ich nun pro contract Werte bekomme, aller 1s.

Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.

Code: Alles auswählen

import asyncio
import ib_insync as ibi
import pandas as pd
import time

yf_df = pd.read_csv('yahoo.csv')
stocks = yf_df.Ticker.tolist()

class App:

    async def run(self):
        self.ib = ibi.IB()
        with await self.ib.connectAsync('127.0.0.1', 7496, clientId=16):
            self.ib.reqMarketDataType(1)
            contracts = [ibi.Stock(symbol, 'SMART', 'USD') for symbol in stocks]
            for s in contracts:
                iv = self.ib.reqMktData(s, "106,100", False, False)
                await asyncio.sleep(1)
                print(round(iv.impliedVolatility*100, 2))

    def stop(self):
        self.ib.disconnect()


app = App()
try:
    asyncio.run(app.run())
except (KeyboardInterrupt, SystemExit):
    app.stop()
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirk03107,
Das Script müsste ich nun so umbauen, dass ich mehrere Anfragen auf einmal senden kann, wird nicht leicht.
Mehrere Anfragen in asyncio, ist kein Problem. Ich kann dir dazu gerne ein Beispiel posten.
Nur hatte ich ja schon geschrieben, dass das wahrscheinlich gar nicht notwendig ist.

Hast du dir mal den Link zur Dokumentation angeschaut?
da wird doch sogar erklärt, dass es anfangs mehrere Sekunden dauern kann bis die Daten zur Verfügung stehen. Bei einer Subscription must du die Daten periodisch abholen.
Der Server stellt sie einfach nicht schneller zur Verfügung, egal wieviele Threads, Prozesse oder Tasks parallel laufen.

Du brauchst wahrscheinlich nur eine Endlosschleife in der run()-Funktion die einmal alle 100 Millisekunden alle aufgelaufenen Ticker-Werte abholt und in deine lokale Datenbank zwischenspeichert.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Du meinst, wenn ich 100 Werte hintereinander anfrage, kommen irgendwann Daten zurück, wenn ich dann nochmal hintereinander abfrage?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

In der Dokumentation steht, dass du einen Ticker als Rückgabewert der Funktion reqMktData() bekommst. (Bei dir ist das wohl 'iv') Dieser sollte dann nach eine bestimmten Zeit (1 Sekunde oder so) die letzten aufgelaufenen Kursdaten enthalten die du auslesen und zwischenspeichern kannst. Dann must du wahrscheinlich wieder etwas warten, bis weitere Kursdaten aufgelaufen sind, die du wieder auslesen kannst.
Das sollte also in einer Schleife passieren.

Zur Zeit holst du aber nur ein Datenpaket ab:

Code: Alles auswählen

print(round(iv.impliedVolatility*100, 2))
... und das für jeden Contract

Außerdem hat die Funktion reqMktData() noch einige optionale Eingangsparameter. Es kann sein dass du die auch entsprechend setzen must. (Z.B.: mktDataOptions)

Ich habe mir aber nur Bruchstücke der Dokumentation angeschaut. Ich würde dir empfehlen das mal genauer durchzulesen, sonst bleibt das Ganze eine Ratespiel.

Mit welcher Frequenz stellt der Broker die Daten eigentlich bereit? Normalerweise gibt es doch Informationen dazu.
Antworten