Dataframes aus verschiedenen Prozessen zusammenfügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Danke dir für deine wertvollen Tips.
Umgesetzt hab ich es bisher so:

Code: Alles auswählen

async def main(ticker):
    df_ibi = pd.DataFrame(columns=['Ticker', 'IV', 'Earnings'])
    amount = len(ticker)
    parts = 10
    for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
        df_results = pd.DataFrame(results, columns=['Ticker', 'IV', 'Earnings'])
        df_ibi = df_ibi.append(df_results, ignore_index=True)
    df_ibi.to_csv('ibi.csv', header=True, index=False)
Wie bindet man in asyncio.gather weiter Funktionen ein?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Code: Alles auswählen

results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Dieser etwas kompakte Aufruf kann auch auseinander gezogen werden:

Code: Alles auswählen

ticker_coroutines = []

for num, data in enumerate(ticker[start:start+parts], 100):
    ticker_coroutines.append(ibi(data, num))


# weitere async Funktionen in die Liste einfügen
ticker_couroutines.append(another_async_function1)
ticker_couroutines.append(another_async_function2)


results = await asyncio.gather(*ticker_coroutines)
Denn die gather() Funktion nimmt als Eingangsparameter einzelne Coroutinen, was eigentlich normale Funktionen mit einem 'async' davor sind.

zum Beispiel eine einzelne Funktion (Dafür bräuchte man natürlich kein 'gather'):

Code: Alles auswählen

asnyc def func():
    # irgend ein asynchroner code
    ...

result = await gather(func())
Oder mehrere einzelne Funktionen, die dann quasi parallel abgearbeitet werden:

Code: Alles auswählen

result = await gather(func1(), func2(), ...)
oder mehrere Funktionen in einer Liste:

Code: Alles auswählen

coroutine_list = [func1(), func(), ...]

result = await gather(*coroutine_list )
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Habs mal getestet, bekomme den Fehler

Code: Alles auswählen

API connection failed: OSError(24, 'Too many open files')
.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Vielleicht gibt die komplette Fehlermeldung mehr Aufschluss? So kann das viele Gründe haben.
Wie schon so oft gesagt, du verwendest die Library wahrscheinlich nicht so wie es eigentlich gedacht ist, ...
Dabei hebelst du wahrscheinlich Schutzmechanismen aus, die normalerweise abgefangen würden, bzw. die Performance optimieren würden.
Kann ich aber auch nur vermuten.

Vielleicht must die Anzahl der parallelen Aufrufe reduzieren.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

wird wohl an meinem falschen Aufruf liegen:

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Das sieht erstmal nicht falsch aus.
In der Fehlermeldung steht ja, dass zu viele Dateien geöffnet sind.
Vielleicht werden die nicht wieder geschlossen, oder es sind einfach zu viele geöffnet worden.
Du müsstest mal schauen wo und wann der Fehler genau auftritt.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

das komische ist ja dabei, dass ich überhaupt keine Dateien öffne, ich übergebe ja nur Listen und Dataframes
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Du oeffnest Sockets, und sockets sind Dateiobjekte. Du machst einfach zu viele davon auf.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

wahrscheinlich öffnet er jetzt alle auf einmal.

bei diesem Aufruf klappt es aber:

Code: Alles auswählen

for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107

das ist der gleiche Code wie vorher, nur anders geschrieben.
Warum sollte das einen Unterschied machen?

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)

Code: Alles auswählen

for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Wie bei Multiprocessing oder Threads auch, muss man bei async die Ressourcen richtig verwalten. Man muss eine gewisse Anzahl an Workern erzeugen, die über eine Queue mit Aufträgen versorgt werden und diese wieder per Queue an das Hauptprogramm zurückgeben, das die Ergebnisse dann zusammensammelt. Bei Dir kommt noch als Schwierigkeit dazu, dass Du je nach Quelle dynamisch die maximale Anzahl an parallelen Verbindungen ermitteln musst.
Aber dessen ungeachtet solltest Du erst mit dem Grundgerüst überhaupt erst einmal anfangen.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@Sirius3,
das Grundgerüst ist schon fertig. mirko3107 verwendet die ib_insync Library.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

rogerb hat geschrieben: Samstag 7. August 2021, 21:58 @mirko3107

das ist der gleiche Code wie vorher, nur anders geschrieben.
Warum sollte das einen Unterschied machen?

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)

Code: Alles auswählen

for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Irgendwie klappts nicht, finds nicht.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,
Irgendwie klappts nicht, finds nicht.
Ich verstehe leider nicht was du meinst...
Um hier grundsätzlich mal weiter zu kommen, wäre es vielleicht hilfreich, wenn du nochmal beschreibst, was du eigentlich genau machen möchtest. Das betrifft auch allgemein das Ziel dieses Projekts.

Dann würde ich dir dringend empfehlen, dich mit der ib-insync und mit der Trader Workstation API Dokumentation auseinanderzusetzen.
Falls es da konkrete Verständnisprobleme gibt kannst du hier bestimmt Unterstützung bekommen.
Dabei gilt immer: Je konkreter und genauer du dein Problem beschreibst um so gezielter kann man dir auch helfen.
Aber, da man ja anscheinend ein Konto bei dem Broker haben muss um sinnvoll etwas mit der Libary anzufangen, wird die meiste Hilfestellung eher konzeptionell und theoretisch bleiben.
Das ist jedenfalls meine Einschätzung.

Übrigens, der Autor von ib_insync bietet auf seiner Git-Hub Seite auch Unterstützung an.
Falls du gar nicht weiter kommst, wäre das vielleicht eine Möglichkeit.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Der Aufruf per ib_insync funktioniert, der du yahoo ebenso, jedenfalls nacheinander. Das beide parallel laufen, klappt noch nicht.

Mein Aufruf der yahoo-Funktion in der main()

Code: Alles auswählen

yahoo_results = await asyncio.gather(*[yahoo(stocks) for stocks in ticker])
df_yahoo_results = pd.DataFrame(yahoo_results, columns=['Ticker' , 'Name', 'Price', 'Change', 'EPS', 'Cap', 'Volumen', '52Whigh', '52Wlow', 'Rating'])
df_yahoo = df_yahoo.append(df_yahoo_results, ignore_index=True)
df_yahoo.to_csv('yahoo.csv')
Wie müsste hier der Code aussehen, wenn ich die Funktion 5x parallel aufrufe?
jindalmnsh
User
Beiträge: 1
Registriert: Montag 9. August 2021, 11:44

Does the collection of multiple sources mean that the queries from the Internet take a long time? Then you have an IO-bound problem.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Code: Alles auswählen

Das beide parallel laufen, klappt noch nicht
... und warum nicht?
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Weil mir dabei immer die IB-API abstürzt, da scheinbar zu viele Anfrage auf einmal kommen.
Es wird scheinbar nicht auf die Ergebnisse der ersten 10 Anfragen gewartet, sondern gleich die nächsten 10
hinterher geschickt, und dann doppeln sich die Client-IDs und es kommt der Fehler der API.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

genau! Und da sind wir wieder an dem Punkt, wo du dich in die Dokumentation einlesen must, um es richtig zu machen. Denn solche Dinge würden bei richtiger Verwendung der Library wahrscheinlich berücksichtigt.
Mehr kann ich dazu nicht sagen.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Mit diesem Code klappt der Aufruf:

Code: Alles auswählen

for start in range(0, amount, parts):
	results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Damit nicht:

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)
Irgendwo muss doch der Grund sein, dass der erste funktioniert, der zweite nicht, aber da bin ich weit weniger Experte wie du.

Edit: Wenn ich beim zweiten Aufruf "amount" z.B. durch 20 ersetze, bekomme ich folgende Meldung:

Code: Alles auswählen

Peer closed connection. clientId 100 already in use?
D.h. er ist noch mit dem ersten Client verbunden, da kommt schon die nächste hinterher.
Antworten