Seite 7 von 8

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Freitag 6. August 2021, 16:21
von mirko3107
Danke dir für deine wertvollen Tips.
Umgesetzt hab ich es bisher so:

Code: Alles auswählen

async def main(ticker):
    df_ibi = pd.DataFrame(columns=['Ticker', 'IV', 'Earnings'])
    amount = len(ticker)
    parts = 10
    for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
        df_results = pd.DataFrame(results, columns=['Ticker', 'IV', 'Earnings'])
        df_ibi = df_ibi.append(df_results, ignore_index=True)
    df_ibi.to_csv('ibi.csv', header=True, index=False)
Wie bindet man in asyncio.gather weiter Funktionen ein?

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Freitag 6. August 2021, 17:22
von rogerb

Code: Alles auswählen

results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Dieser etwas kompakte Aufruf kann auch auseinander gezogen werden:

Code: Alles auswählen

ticker_coroutines = []

for num, data in enumerate(ticker[start:start+parts], 100):
    ticker_coroutines.append(ibi(data, num))


# weitere async Funktionen in die Liste einfügen
ticker_couroutines.append(another_async_function1)
ticker_couroutines.append(another_async_function2)


results = await asyncio.gather(*ticker_coroutines)
Denn die gather() Funktion nimmt als Eingangsparameter einzelne Coroutinen, was eigentlich normale Funktionen mit einem 'async' davor sind.

zum Beispiel eine einzelne Funktion (Dafür bräuchte man natürlich kein 'gather'):

Code: Alles auswählen

asnyc def func():
    # irgend ein asynchroner code
    ...

result = await gather(func())
Oder mehrere einzelne Funktionen, die dann quasi parallel abgearbeitet werden:

Code: Alles auswählen

result = await gather(func1(), func2(), ...)
oder mehrere Funktionen in einer Liste:

Code: Alles auswählen

coroutine_list = [func1(), func(), ...]

result = await gather(*coroutine_list )

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Freitag 6. August 2021, 19:28
von mirko3107
Habs mal getestet, bekomme den Fehler

Code: Alles auswählen

API connection failed: OSError(24, 'Too many open files')
.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Freitag 6. August 2021, 19:46
von rogerb
Vielleicht gibt die komplette Fehlermeldung mehr Aufschluss? So kann das viele Gründe haben.
Wie schon so oft gesagt, du verwendest die Library wahrscheinlich nicht so wie es eigentlich gedacht ist, ...
Dabei hebelst du wahrscheinlich Schutzmechanismen aus, die normalerweise abgefangen würden, bzw. die Performance optimieren würden.
Kann ich aber auch nur vermuten.

Vielleicht must die Anzahl der parallelen Aufrufe reduzieren.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Freitag 6. August 2021, 20:54
von mirko3107
wird wohl an meinem falschen Aufruf liegen:

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Samstag 7. August 2021, 09:37
von rogerb
Das sieht erstmal nicht falsch aus.
In der Fehlermeldung steht ja, dass zu viele Dateien geöffnet sind.
Vielleicht werden die nicht wieder geschlossen, oder es sind einfach zu viele geöffnet worden.
Du müsstest mal schauen wo und wann der Fehler genau auftritt.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Samstag 7. August 2021, 10:04
von mirko3107
das komische ist ja dabei, dass ich überhaupt keine Dateien öffne, ich übergebe ja nur Listen und Dataframes

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Samstag 7. August 2021, 10:49
von __deets__
Du oeffnest Sockets, und sockets sind Dateiobjekte. Du machst einfach zu viele davon auf.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Samstag 7. August 2021, 18:42
von mirko3107
wahrscheinlich öffnet er jetzt alle auf einmal.

bei diesem Aufruf klappt es aber:

Code: Alles auswählen

for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Samstag 7. August 2021, 21:58
von rogerb
@mirko3107

das ist der gleiche Code wie vorher, nur anders geschrieben.
Warum sollte das einen Unterschied machen?

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)

Code: Alles auswählen

for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Sonntag 8. August 2021, 06:55
von Sirius3
Wie bei Multiprocessing oder Threads auch, muss man bei async die Ressourcen richtig verwalten. Man muss eine gewisse Anzahl an Workern erzeugen, die über eine Queue mit Aufträgen versorgt werden und diese wieder per Queue an das Hauptprogramm zurückgeben, das die Ergebnisse dann zusammensammelt. Bei Dir kommt noch als Schwierigkeit dazu, dass Du je nach Quelle dynamisch die maximale Anzahl an parallelen Verbindungen ermitteln musst.
Aber dessen ungeachtet solltest Du erst mit dem Grundgerüst überhaupt erst einmal anfangen.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Sonntag 8. August 2021, 13:32
von rogerb
@Sirius3,
das Grundgerüst ist schon fertig. mirko3107 verwendet die ib_insync Library.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Montag 9. August 2021, 09:43
von mirko3107
rogerb hat geschrieben: Samstag 7. August 2021, 21:58 @mirko3107

das ist der gleiche Code wie vorher, nur anders geschrieben.
Warum sollte das einen Unterschied machen?

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)

Code: Alles auswählen

for start in range(0, amount, parts):
        results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Irgendwie klappts nicht, finds nicht.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Montag 9. August 2021, 10:19
von rogerb
@mirko3107,
Irgendwie klappts nicht, finds nicht.
Ich verstehe leider nicht was du meinst...
Um hier grundsätzlich mal weiter zu kommen, wäre es vielleicht hilfreich, wenn du nochmal beschreibst, was du eigentlich genau machen möchtest. Das betrifft auch allgemein das Ziel dieses Projekts.

Dann würde ich dir dringend empfehlen, dich mit der ib-insync und mit der Trader Workstation API Dokumentation auseinanderzusetzen.
Falls es da konkrete Verständnisprobleme gibt kannst du hier bestimmt Unterstützung bekommen.
Dabei gilt immer: Je konkreter und genauer du dein Problem beschreibst um so gezielter kann man dir auch helfen.
Aber, da man ja anscheinend ein Konto bei dem Broker haben muss um sinnvoll etwas mit der Libary anzufangen, wird die meiste Hilfestellung eher konzeptionell und theoretisch bleiben.
Das ist jedenfalls meine Einschätzung.

Übrigens, der Autor von ib_insync bietet auf seiner Git-Hub Seite auch Unterstützung an.
Falls du gar nicht weiter kommst, wäre das vielleicht eine Möglichkeit.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Montag 9. August 2021, 12:20
von mirko3107
Der Aufruf per ib_insync funktioniert, der du yahoo ebenso, jedenfalls nacheinander. Das beide parallel laufen, klappt noch nicht.

Mein Aufruf der yahoo-Funktion in der main()

Code: Alles auswählen

yahoo_results = await asyncio.gather(*[yahoo(stocks) for stocks in ticker])
df_yahoo_results = pd.DataFrame(yahoo_results, columns=['Ticker' , 'Name', 'Price', 'Change', 'EPS', 'Cap', 'Volumen', '52Whigh', '52Wlow', 'Rating'])
df_yahoo = df_yahoo.append(df_yahoo_results, ignore_index=True)
df_yahoo.to_csv('yahoo.csv')
Wie müsste hier der Code aussehen, wenn ich die Funktion 5x parallel aufrufe?

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Montag 9. August 2021, 12:25
von jindalmnsh
Does the collection of multiple sources mean that the queries from the Internet take a long time? Then you have an IO-bound problem.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Montag 9. August 2021, 12:32
von rogerb

Code: Alles auswählen

Das beide parallel laufen, klappt noch nicht
... und warum nicht?

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Montag 9. August 2021, 13:06
von mirko3107
Weil mir dabei immer die IB-API abstürzt, da scheinbar zu viele Anfrage auf einmal kommen.
Es wird scheinbar nicht auf die Ergebnisse der ersten 10 Anfragen gewartet, sondern gleich die nächsten 10
hinterher geschickt, und dann doppeln sich die Client-IDs und es kommt der Fehler der API.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Montag 9. August 2021, 13:42
von rogerb
genau! Und da sind wir wieder an dem Punkt, wo du dich in die Dokumentation einlesen must, um es richtig zu machen. Denn solche Dinge würden bei richtiger Verwendung der Library wahrscheinlich berücksichtigt.
Mehr kann ich dazu nicht sagen.

Re: Dataframes aus verschiedenen Prozessen zusammenfügen

Verfasst: Montag 9. August 2021, 13:55
von mirko3107
Mit diesem Code klappt der Aufruf:

Code: Alles auswählen

for start in range(0, amount, parts):
	results = await asyncio.gather(*[ibi(data, num) for num, data in enumerate(ticker[start:start+parts], 100)])
Damit nicht:

Code: Alles auswählen

for start in range(0, amount, parts):
	for num, data in enumerate(ticker[start:start+parts], 100):
        	ticker_coroutines.append(ibi(data, num))
results = await asyncio.gather(*ticker_coroutines)
Irgendwo muss doch der Grund sein, dass der erste funktioniert, der zweite nicht, aber da bin ich weit weniger Experte wie du.

Edit: Wenn ich beim zweiten Aufruf "amount" z.B. durch 20 ersetze, bekomme ich folgende Meldung:

Code: Alles auswählen

Peer closed connection. clientId 100 already in use?
D.h. er ist noch mit dem ersten Client verbunden, da kommt schon die nächste hinterher.