Dataframes aus verschiedenen Prozessen zusammenfügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Ist diese liste "ticker_coroutines" denn vor diesem Code Abschnitt leer, oder ist da schon was drin.
Wenn ja, dann werden die entsprechenden Couroutinen natürlich zusätzlich ausgeführt und das kann dann zu Problemen führen.
Du versuchst hier anscheinend das absolute Maximum aus der Library herauszuholen, willst dich aber nicht mit der Dokumentation beschäftigen.
Ich kann grundsätzliche Tipps bzgl. Asyncio geben, habe aber keine Praxiserfahrung mit der Library.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich will nicht das Maximum herausholen, das liegt glaub ich bei 32 Verbindungen oder sogar mehr, mir reichen 10.

In Summe sind 20 Coroutinen drin, wenn die Probleme auftauchen.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Dann sollte es theoretisch funktionieren. Warum es dann in der Praxis nicht funktioniert kann ich leider nicht sagen.
Benutzeravatar
__blackjack__
User
Beiträge: 13122
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@mirko3107: In den beiden Varianten ist ja eigentlich sehr offensichtlich ein entscheidender Unterschied wie oft `asyncio.gather()` aufgerufen wird und mit wie vielen Coroutinen jeweils. Da muss man die IB-API-Bibliothek noch nicht mal für kennen, und um Grunde noch nicht einem `asyncio`, sondern einfach nur das Wissen über ``for``-Schleifen und Funktionsaufrufe.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hab jetzt einen neuen Weg getestet, der wohl erstmal funktioniert, zwar kein schöner Code, aber läuft.

Code: Alles auswählen

def ibi(tickers):
    i = 0
    ib = IB()
    id = random.randint(100,300)
    ib.connect('127.0.0.1', 7496, clientId=id, timeout=10.0)
    df_tickers = pd.DataFrame()
    results = []
    if ib.isConnected:
        for ticker in tickers:

def smap(x):
    return x()
....

def main():

listoflists = []
    splits = np.array_split(yf_df.Ticker.to_list(), parts)
    for array in splits:
        arrays = array.tolist()
        listoflists.append(arrays)
    f_ibi1 = functools.partial(ibi, listoflists[0])
    f_ibi2 = functools.partial(ibi, listoflists[1])
    f_ibi3 = functools.partial(ibi, listoflists[2])
    f_ibi4 = functools.partial(ibi, listoflists[3])
    f_ibi5 = functools.partial(ibi, listoflists[4])
    f_ibi6 = functools.partial(ibi, listoflists[5])

with Pool() as pool:
        res = pool.map(smap, [f_ibi1, f_ibi2, f_ibi3, f_ibi4, f_ibi5, f_ibi6)

Es sollte ja theoretisch gehen, dass man asyncio und multiprocessing parallel am laufen hat, oder?
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Das ist kein lauffähiger Code, aber wenn du zufrieden bist, ...
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hab nicht den kompletten Code gepostet, sondern nur den Teil den ich geändert habe.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hallo,

ich muss diesen Thread nochmal hochholen, weil momentan Problem auftreten. Folgende Meldung bekomm ich:

Code: Alles auswählen

multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
  File "yahoo.py", line 52, in yahoo
    result = quote_req['quoteResponse']['result'][0]
IndexError: list index out of range
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "yahoo.py", line 112, in <module>
    main()
  File "yahoo.py", line 97, in main
    yahoo_result = list(pool.map(yahoo, tickers))
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/usr/lib/python3.8/multiprocessing/pool.py", line 771, in get
    raise self._value
Hier der Teil des Scriptes, um den es geht.

Code: Alles auswählen

def yahoo(ticker):
    url = QUOTE_URL.format(ticker)
    rating_url = QUOTE2_URL.format(ticker)
    quote_req = requests.get(url, headers=headers).json()
    quote_rating = requests.get(rating_url, headers=headers).json()
    result = quote_req['quoteResponse']['result'][0]
    result_rating = quote_rating['quoteSummary']['result'][0]['financialData']
    if len(result) > 0:
        print('Financial_Data: ',ticker, 'success')
        time.sleep(0.5)
        return [ticker,
                result.get('shortName', 'Unknown'),
                result.get('regularMarketPrice', 0),
                result.get('regularMarketChangePercent', 0),
                result.get('epsCurrentYear', 0),
                result.get('marketCap', 0) / 1000000000,
                result.get('averageDailyVolume3Month', 0),
                result.get('fiftyTwoWeekHigh', 0),
                result.get('fiftyTwoWeekLow', 0),
                result.get('twoHundredDayAverage', 0),
                result_rating.get('recommendationKey', 'None')]
    else:
        print('Financial_Data: ',ticker, 'error')
        return [ticker, 'Unknown', '0', '0', '0', '0', '0','0','0','0','None']
Aufgerufen wird es hier:

Code: Alles auswählen

def main():
    pool = multiprocessing.Pool(8)
    yahoo_result = list(pool.map(yahoo, tickers))
    df_yahoo = pd.DataFrame(yahoo_result, columns=['Ticker' , 'Name', 'Price', 'Change', 'EPS', 'Cap', 'Volumen', '52Whigh', '52Wlow', 'SMA200', 'Rating'])
    
if __name__ == "__main__":
    main()
Es werden ca. 1500 Werte abgefragt. Ich habe dann zum Testen mal die Werte reduziert, gleicher Fehler. Als ich dann bei unter 100 Werten wahr, lief es durch. Ich konnte
dann wieder bis auf 500 Werte hochgehen, ohne Probleme.

Kann mir jemand sagen, was das sein könnte? Manchmal läuft es tagelang durch, dann wieder nicht.

Danke
Benutzeravatar
__blackjack__
User
Beiträge: 13122
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@mirko3107: Schau Dir die Daten halt an die Du tatsächlich bekommst. Wird wahrscheinlich irgendein Fehlerwert sein der da dann als JSON kodiert ist. API überlastet, Du fragst zu viel oder zu schnell ab, oder…
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ok, grad mal etwas rumprobiert, liegt wirklich an einem nicht funktionierendem Ticker. Scheint mein "if len(result) > 0:" nicht zu funktionieren.
Werd mal nach einer anderen Möglichkeit suchen, das Ticker-Symbol zu validieren.
Antworten