Dataframes aus verschiedenen Prozessen zusammenfügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hallo zusammen,

ich versuche mit seit einigen Tagen an Python und hätte mal eine Frage zu weiteren Vorgehen.

Ich habe ein Script geschrieben, welches diverse Finanzdaten aus mehreren Quellen zusammenträgt und in ein Dataframe schreibt.
Das Script verarbeitet ein Dataframe mit 700 Zeilen über eine Schleife.

Da mir das ganze doch arg lange dauert (ca.40min), hab ich mich mit Multiprocessing beschäftigt und das Script in mehrere Prozesse
aufgeteilt, wodurch sich die Dauer auf 15min reduziert hat.

Wie kann ich die Ergebnisse aus jedem Prozess in ein Gesamt-Dataframe zusammenfassen?
Momentan mach ich es so, dass jeder Prozess sein Dataframe in eine CSV schreibt, welche ich in einem anderen Script wieder zu einem Dataframe
zusammenbaue.

Das geht doch sicherlich auch einfacher, oder?

Danke
mirko3107
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Bedeutet das zusammentragen aus mehreren Quellen, dass die Abfragen aus dem Internet sehr lange dauern? Dann hast Du ein IO-bound Problem.
Solche Probleme löst man nicht mit multiprocessing, sondern mit einem Prozess, z.B. per asyncio.

Um genaueres zu sagen, müßte man aber den Programmcode kennen.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Dessen bin ich mir bewusst, aber das ist ein anderes Thema.

Die Frage war ja, wie ich die einzelnen Ergebnisse eines jeden Prozesses/Threads am Ende wieder zu einem Dataframe zusammenfüge.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hier mal das Script, nix professionelles, aber es funktioniert.

Code: Alles auswählen

import multiprocessing
import xml.etree.ElementTree as ET
import datetime as dt
from datetime import date
from ib_insync import *
import requests
import requests_cache
import pandas as pd
import pandas_datareader as pdr
from pandas_datareader._utils import RemoteDataError
import yfinance as yf
import time

ib = IB()
ib.connect('127.0.0.1', 7496, clientId=12)

expire_after = dt.timedelta(days=3)
session = requests_cache.CachedSession(cache_name='cache', backend='sqlite',expire_after=expire_after)

yf.pdr_override()

start_date = '01-01-2021'
end_date = '07-01-2021'
start = dt.datetime.strptime(start_date, '%d-%m-%Y')

def pack1():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS'])
    rows = len(list.index)
    while True:
        for i in range(0, rows):
            try:
                ticker = list['Ticker'].values[i]
                quote_url = 'https://query1.finance.yahoo.com/v7/finance/quote?symbols='+ticker
                quote_req = requests.get(quote_url)
                change = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketChangePercent', 0),2)
                price = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketPrice', 0),2)
                if 'epsCurrentYear' in quote_req.json()['quoteResponse']['result'][0]:
                    eps = round(quote_req.json()['quoteResponse']['result'][0].get('epsCurrentYear', 0),2)
                else:
                    eps = '0'
                cap = round(quote_req.json()['quoteResponse']['result'][0].get('marketCap', 0)/1000000000, 2)
                time.sleep(1)
                print(ticker,change,price,cap)
                list.at[i, 'Price'] = price
                list.at[i, 'Change'] = change
                list.at[i, 'Cap'] = cap
                list.at[i, 'EPS'] = eps
            except KeyError as ke:
                print(ke)
                pass
            except TypeError as te:
                print(te)
                pass
            except RemoteDataError as rde:
                print(rde)
                pass
        list.to_csv('pack1.csv', header=True, index=False) 
        time.sleep(600)

def pack2():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'Volumen', '52Whigh', '52Wlow'])
    rows = len(list.index)
    while True:
        for i in range(0, rows):
            try:
                ticker = list['Ticker'].values[i]
                url_cap = 'https://query2.finance.yahoo.com/v10/finance/quoteSummary/'
                url_cap2 = '?formatted=true&crumb=8ldhetOu7RJ&lang=en-US&region=US&modules=summaryDetail&corsDomain=finance.yahoo.com'
                cap_url = url_cap + ticker + url_cap2
                req_cap = requests.get(cap_url)
                volume = req_cap.json()['quoteSummary']['result'][0]['summaryDetail']['volume'].get('raw', 0)
                high = req_cap.json()['quoteSummary']['result'][0]['summaryDetail']['fiftyTwoWeekHigh'].get('raw', 0)
                low = req_cap.json()['quoteSummary']['result'][0]['summaryDetail']['fiftyTwoWeekLow'].get('raw', 0)
                time.sleep(1)
                print(ticker,volume,high,low)
                list.at[i, 'Volumen'] = volume
                list.at[i, '52Whigh'] = round(high, 2)
                list.at[i, '52Wlow'] = round(low, 2)
            except KeyError as ke:
                print(ke)
                pass
            except TypeError as te:
                print(te)
                pass
            except RemoteDataError as rde:
                print(rde)
                pass
        list.to_csv('pack2.csv', header=True, index=False, columns=['Ticker', 'Volumen', '52Whigh', '52Wlow'])
        time.sleep(180)
        
def rating():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'Rating'])
    rows = len(list.index)
    while True:
        for i in range(0, rows):
            try:
                ticker = list['Ticker'].values[i]
                rat1_url = 'https://query2.finance.yahoo.com/v10/finance/quoteSummary/'
                rat2_url = '?formatted=true&crumb=swg7qs5y9UP&lang=en-US&region=US&' \
                           'modules=upgradeDowngradeHistory,recommendationTrend,' \
                           'financialData,earningsHistory,earningsTrend,industryTrend&' \
                           'corsDomain=finance.yahoo.com'
                rat_url = rat1_url + ticker + rat2_url
                rat_req = requests.get(rat_url)
                rating = rat_req.json()['quoteSummary']['result'][0]['financialData'].get('recommendationKey', 'None')
                time.sleep(1)
                print(ticker,rating)
                list.at[i, 'Rating'] = rating
            except KeyError as ke:
                print(ke)
                pass
            except TypeError as te:
                print(te)
                pass
            except RemoteDataError as rde:
                print(rde)
                pass
        list.to_csv('rating.csv', header=True, columns=['Ticker', 'Rating'])
        time.sleep(86400)
        
def rsi():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'RSI'])
    rows = len(list.index)
    while True:
        for i in range(0, rows):
            try:
                ticker = list['Ticker'].values[i]
                rsi = pdr.get_data_yahoo(ticker, dt.datetime(2021, 5, 1), dt.datetime.now(),session=session)
                delta = rsi['Close'].diff()
                up = delta.clip(lower=0)
                down = -1 * delta.clip(upper=0)
                ema_up = up.ewm(com=13, adjust=False).mean()
                ema_down = down.ewm(com=13, adjust=False).mean()
                rs = ema_up / ema_down
                rsi['RSI'] = 100 - (100 / (1 + rs))
                rsi_value = rsi.iloc[-1]['RSI']
                list.at[i, 'RSI'] = round(rsi_value, 2)
                print(ticker,rsi_value)
            except KeyError as ke:
                print(ke)
                pass
            except TypeError as te:
                print(te)
                pass
            except RemoteDataError as rde:
                print(rde)
                list.at[i, 'RSI'] = '0' 
                pass
        list.to_csv('rsi.csv', header=True, index=False, columns=['Ticker', 'RSI'])
        print('RSI CSV written')
        time.sleep(1800)
       
def sma():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'SMA200'])
    rows = len(list.index)
    while True:
        for i in range(0, rows):
            try:
                ticker = list['Ticker'].values[i]
                sma = pdr.get_data_yahoo(ticker, dt.datetime(2020, 5, 1),session=session)
                #sma = yf.download(ticker, dt.datetime(2020, 5, 1))
                sma['SMA10'] = sma['Close'].rolling(10).mean()
                sma['SMA50'] = sma['Close'].rolling(50).mean()
                sma['SMA200'] = sma['Close'].rolling(200).mean()
                sma_value = sma.iloc[-1]['SMA200']
                print(ticker, sma_value)
                list.at[i, 'SMA200'] = round(sma_value, 2)
                #time.sleep(1)
            except KeyError as ke:
                print(ke)
                pass
            except TypeError as te:
                print(te)
                pass
            except RemoteDataError as rde:
                print(rde)
                pass
        list.to_csv('sma.csv', header=True, index=False, columns=['Ticker', 'SMA200'])
        time.sleep(180)
        
def stoch():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'Stoch'])
    rows = len(list.index)
    while True:
        for i in range(0, rows):
            try:
                ticker = list['Ticker'].values[i]
                stoch = pdr.get_data_yahoo(ticker, dt.datetime(2021, 5, 1), dt.datetime.now(),session=session)
                stoch['14-high'] = stoch['High'].rolling(14).max()
                stoch['14-low'] = stoch['Low'].rolling(14).min()
                stoch['%K'] = (stoch['Close'] - stoch['14-low']) * 100 / (stoch['14-high'] - stoch['14-low'])
                stoch['%D'] = stoch['%K'].rolling(3).mean()
                stoch2 = stoch.iloc[-1]['%D']
                list.at[i, 'Stoch'] = round(stoch2, 2)
                print(ticker,stoch2)
            except KeyError as ke:
                print(ke)
                pass
            except TypeError as te:
                print(te)
                pass
            except RemoteDataError as rde:
                print(rde)
                pass
        list.to_csv('stoch.csv', header=True, index=False, columns=['Ticker', 'Stoch'])
        time.sleep(180)

def earnings_iv():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'Earnings', 'IV'])
    rows = len(list.index)
    while ib.isConnected():
        for i in range(0, rows):
            try:
                ticker = list['Ticker'].values[i]
                contract = Stock(ticker, 'SMART', 'USD')
                ib.reqMarketDataType(1)
                ib.qualifyContracts(contract)
                data = ib.reqMktData(contract, "106,100", False, False)
                ib.sleep(1)
                iv = round(data.impliedVolatility * 100, 2)
                earn = ib.reqFundamentalData(contract, 'CalendarReport')
                if type(earn) == str:
                    tree = ET.ElementTree(ET.fromstring(earn))
                    root = tree.getroot()
                    item = tree.find('.//Date')
                    earnings = item.text
                else:
                    earnings = '01/01/2000'
                print(ticker, earnings,iv)
                list.at[i, 'Earnings'] = earnings
                list.at[i, 'IV'] = iv
                ib.cancelMktData(contract)
            except KeyError as ke:
                print(ke)
                pass
            except TypeError as te:
                print(te)
                pass
        ib.disconnect()
        list.to_csv('earnings_iv.csv', header=True, index=False, columns=['Ticker', 'Earnings', 'IV'])
        time.sleep(86400)

def main():
    p1 = multiprocessing.Process(target=rsi)
    p2 = multiprocessing.Process(target=sma)
    p3 = multiprocessing.Process(target=stoch)
    p4 = multiprocessing.Process(target=rating)
    p5 = multiprocessing.Process(target=pack1)
    p6 = multiprocessing.Process(target=pack2)
    p7 = multiprocessing.Process(target=earnings_iv)
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p5.start()
    p6.start()
    p7.start()

if __name__ == "__main__":
    main()
LukeNukem
User
Beiträge: 232
Registriert: Mittwoch 19. Mai 2021, 03:40

mirko3107 hat geschrieben: Mittwoch 7. Juli 2021, 14:44 Hier mal das Script, nix professionelles, aber es funktioniert.

Code: Alles auswählen

                change = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketChangePercent', 0),2)
                price = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketPrice', 0),2)
                if 'epsCurrentYear' in quote_req.json()['quoteResponse']['result'][0]:
                    eps = round(quote_req.json()['quoteResponse']['result'][0].get('epsCurrentYear', 0),2)
                else:
                    eps = '0'
                cap = round(quote_req.json()['quoteResponse']['result'][0].get('marketCap', 0)/1000000000, 2)
IMHO wäre es sinnvoller, erstmal die Basics glattzuziehen, denn so... wie sage ich das... gehst Du eine ungewöhnliche Intimität mit Deinem eigenen Knie ein. Das Parsen von JSON-Daten muß zeichenweise erfolgen und ist aus diesem Grund eine recht aufwändige Veranstaltung, und Dictionaries sind jetzt auch nicht unbedingt das performanteste Feature von Python.

Code: Alles auswählen

                data = quote_req.json()['quoteResponse']['result'][0]
                change = round(data.get('regularMarketChangePercent', 0),2)
                price = round(data.get('regularMarketPrice', 0),2)
                eps = round(data.get('epsCurrentYear', 0),2)
                cap = round(data.get('marketCap', 0)/1000000000, 2)
oder sogar:

Code: Alles auswählen

                data = quote_req.json()['quoteResponse']['result'][0]
                results = {key: round(data.get(key, 0), 2) for key in  ('regularMarketChangePercent', 'regularMarketPrice', 'epsCurrentYear', 'marketCap')}
                results['market_cap'] = round(results['market_cap']/1000000000, 2)
Vorteil ist, daß Du Dir die vielen und womöglich teuren Aufrufe von req_json() und die Dict-Lookups sparst.

Neben solcher Kleinigkeiten gibt es da so ein paar Leitsätze, die für jede Performanceoptimierung gelten, darunter: "Measure, don't guess" von Kirk Pepperdine. Genau das hast Du aber nicht getan, und einige der anderen Threadteilnehmer wohl auch nicht. Es gibt aber im Python-Universum sehr probate Mittel für solche Messungen, etwa die Module "profile" und "cProfile". Eine andere Möglichkeit können altbekannte und etablierte UNIX-Programme wie time(1), top(1) und ps(1) bieten, aber im Kern bleibt es immer dasselbe: Du solltest zunächst herausfinden, womit Dein Programm so viel Zeit vertrödelt, und dann genau dort gezielt ansetzen. Und ja: es ist eine durchaus valide Annahme, daß das Problem die I/O-Waits sein könnten -- wobei sich multithreading dann wohl eher anböte als multiprocessing, noch eher aber concurrent.futures oder asyncio -- aber es kann natürlich auch an allem anderen liegen: seien es die vielen Aufrufe von req_json(), die Dict-Lookups, oder die vielen, vielen manuellen Schreibzugriffe auf Deine bereits vorhandenen Dataframes, ... genau, oder. Measure, don't guess. HTH! ;-)
Benutzeravatar
snafu
User
Beiträge: 6731
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Beim unnötigen wiederholten Parsen der JSON-Daten stimme ich zu. Das sollte man immer nur einmal pro Datensatz machen und das jeweilige Ergebnis ansprechen. Was ich jedoch nicht teile, ist die Aussage, dass Wörterbuch-Zugriffe langsam wären. Eher das Gegenteil ist der Fall. 🤷‍♂️
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

Neben solcher Kleinigkeiten gibt es da so ein paar Leitsätze, die für jede Performanceoptimierung gelten, darunter: "Measure, don't guess" von Kirk Pepperdine. Genau das hast Du aber nicht getan, und einige der anderen Threadteilnehmer wohl auch nicht
Man kann über so ein Problem auch nachdenken und vielleicht hat man auch ein bisschen intuitives Verständnis davon wie lange typische Operationen dauern. Zur groben Einordnung hilft z.b. dies. Hat man dieses Verständnis sollte auch ganz ohne Profiler klar sein dass Code der einen O(n) Algorithmus hat und sequentiell n HTTP Requests macht, wohl eher nicht wegen einem dict lookup (< <1ms), JSON parsing (<1ms) oder Schreibzugriffe auf einem Dataframe (<<1ms) langsam ist sondern dass diese HTTP Requests (<~1s) wohl eher das Problem darstellen. Schliesslich müsste man so um die 1000 dict lookups pro Iteration machen um in den Bereich eines HTTP Requests zu kommen.

Ein bessere Ansatz wäre z.b. bei `pack1` alle Ticker Werte zu nehmen, die Requests alle nebenläufig zu machen (mit Threads oder asyncio), die Ergebnis dann zu sammeln daraus ein Dataframe zu bauen und dies mit dem input zu auf `Ticker` zu mergen. Im Prinzip macht man dann bei den anderen Funktionen genauso, also die Daten möglichst nebenläufig alle auf einmal ziehen um nicht auf einzelne Requests zu warten und sobald alles im Speicher ist möglichst in großen Schritten verarbeiten um pandas auszunutzen.
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

@mirko3107: lass Dich nicht von anderen Beitragenden zu diesem Thread verwirren, Wörterbuchzugriffe sind sehr performant. Statt dessen gilt hier das DRY-Prinzip "Don't repeat yourself": Code der nur einmal dasteht muß man auch nur einmal verstehen.

Was man auf keinen Fall benutzt, sind globale Variablen, erst recht nicht in Zusammenhang mit multiprocessing.

Benutze keine kryptischen Abkürzungen, sondern aussagekräftige Variablennamen.
`list` ist der Name einer eingebauten Klasse und sollte nicht überdeckt werden. Über einen Index iteriert man nicht, weil man auch direkt über die Zeilen Deines Dataframes iterieren könnte.
Strings stückelt man nicht mit + zusammen, vor allem nicht bei URL-Parameters, weil die nicht alle Zeichen enthalten dürfen.
Das Ergebnis eines requests.get ist kein req(est) sondern ein response.
`eps` ist mal eine Zahl und mal ein String, das sollte nicht sein.
Exceptions fängt man dort ab, wo sie auftreten.
Die erste Funktion könnte so aussehen:

Code: Alles auswählen

QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote"

def pack1():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS'])
    while True:
        for index, ticker in enumerate(watch.Ticker):
            try:
                quote_response = requests.get(QUOTE_URL, params={"symbols": ticker})
                result = quote_response.json()['quoteResponse']['result'][0]
            except (KeyError, IndexError, RemoteDataError) as error:
                print(f"Failed reading {ticker}: {error}")
            else:
                watch.iloc[index].Change = round(result.get('regularMarketChangePercent', 0),2)
                watch.iloc[index].Price = round(result.get('regularMarketPrice', 0),2)
                watch.iloc[index].EPS = round(result.get('epsCurrentYear', 0),2)
                watch.iloc[index].Cap = round(result.get('marketCap', 0)/1000000000, 2)
            time.sleep(1)
        watch.to_csv('pack1.csv', header=True, index=False) 
        time.sleep(600)
Und natürlich dauert das relativ lang, wegen der vielen `sleep`.
Wenn ein Ticker aus einem unbekannten Problem nicht gelesen werden kann, enthält der noch die Werte des letzten Laufs. Ist das so gewollt? Aber was ist beim ersten Lauf? Ist 0 wirklich der Wert, den Du haben möchtest, wenn einer der Schlüssel nicht existiert?
Die Funktion macht aber viel zu viel.
Eigentlich müßte die Funktion so aussehen:

Code: Alles auswählen

QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote"

def query_quote(tickers):
    quote = []
    for ticker in tickers:
        try:
            quote_response = requests.get(QUOTE_URL, params={"symbols": ticker})
            result = quote_response.json()['quoteResponse']['result'][0]
        except (KeyError, IndexError, RemoteDataError) as error:
            print(f"Failed reading {ticker}: {error}")
            quote.append((ticker, 0, 0, 0, 0))
        else:
            quote.append((
                ticker,
                round(result.get('regularMarketPrice', 0),2),
                round(result.get('regularMarketChangePercent', 0),2),
                round(result.get('marketCap', 0)/1000000000, 2),
                round(result.get('epsCurrentYear', 0),2)
            ))
        time.sleep(1)
    return pd.DataFrame(quote, columns=['Ticker', 'Price', 'Change', 'Cap', 'EPS'])
Dann kann man sie flexibler einsetzen.

Das ganze packt man in eine äußere Schleife, wo man alle Quellen abfrägt, benutzt concurrent.futures.ThreadPoolExecutor um einfach parallelisieren zu können, und gleichzeitig die Funktionen samt Rückgabewert verwenden zu können und kann dann dort auch einfach die Ergebnisse irgendwie zusammenpacken.

@DasIch: beim parallelen Abfragen von der selben Quelle hat der Server bestimmt etwas dagegen.
Benutzeravatar
snafu
User
Beiträge: 6731
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Hier ein ganz gutes Tutorial zum nebenläufigen Anfordern von Daten, wenn viele URLs benötigt werden: https://www.twilio.com/blog/asynchronou ... th-aiohttp

In das asyncio-Framework muss man sich aber erstmal eine Weile einlesen, wenn das Thema neu für einen ist. Oder notfalls eins der dort gezeigten Beispiele nehmen und anpassen.
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

@DasIch: beim parallelen Abfragen von der selben Quelle hat der Server bestimmt etwas dagegen.
Wenn man es übertreibt sicherlich aber die meisten Browser bauen bis zu 6 Verbindungen gleichzeitig auf. Ich denke daran kann man sich in so einer Situation guten Gewissens orientieren, es sei den es werden explizite Limits dokumentiert oder man trifft auf ein Rate Limit (also 429 Antworten).
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Wie gesagt, ich bin kein Experte in Python, hab mir das meiste zusammengegoogelt und mir zig Videos angeschaut, um erstmal grob dahinter zu steigen.

Ich werd mir eure Anregungen annehmen und mich mal an die Optimierung machen.

Die sleeps hab ich eingefügt, weil doch ab und an mal von yahoo ein Fehler kam, aber sehr selten.
LukeNukem
User
Beiträge: 232
Registriert: Mittwoch 19. Mai 2021, 03:40

snafu hat geschrieben: Mittwoch 7. Juli 2021, 19:54 Beim unnötigen wiederholten Parsen der JSON-Daten stimme ich zu. Das sollte man immer nur einmal pro Datensatz machen und das jeweilige Ergebnis ansprechen.
Das eh. ;-)
snafu hat geschrieben: Mittwoch 7. Juli 2021, 19:54 Was ich jedoch nicht teile, ist die Aussage, dass Wörterbuch-Zugriffe langsam wären. Eher das Gegenteil ist der Fall. 🤷‍♂️
Sei mir bitte nicht böse, aber ich hab' mir das nicht ausgesucht, daß es so ist... und ich fürchte, Du hast meinen Punkt nicht ganz verstanden. Dict-Zugriffe in Python sind relativ flott (okay, solange man sie nicht mit C oder C++ vergleicht), aber das ANLEGEN eines Dict in Python ist eine ziemlich teure Veranstaltung, und so ein Dict selbst ist obendrein eine... recht speicherintensive Angelegenheit.
Benutzeravatar
snafu
User
Beiträge: 6731
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Der Aufbau einer Datenstruktur nimmt mit zunehmender Komplexität und Datenmenge natürlich auch mehr Zeit in Anspruch. Aber langsam im Vergleich wozu? Was wäre deine Alternative und warum sollte man diese anstelle von Wörterbüchern nutzen bzw welchen immensen Zeitgewinn brächte sie? 🤔
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

@LukeNukem: C hat keine Wörterbücher in der Standardbibliothek und die Standardimplementierung in C++ ist deutlich langsamer, weil man sich bei Python sehr viele Gedanken um Geschwindigkeit gemacht hat, weil Wörterbücher so ein zentrales Element sind. Außerdem wären das Microoptimierungen. Zudem setzt Du selbst Wörterbücher ein. So schlimm kann es also nicht sein.
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

Um mal "zentrales Element" von Sirius3 etwas auszuführen: In CPython ist im Prinzip jedes Objekt letztendlich ein Dictionary, ggfs. mit bisschen Kram drumherum. Jeder Attributzugriff macht mindestens einen dict lookup. Der Zugriff auf eine Variable, sofern sie nicht lokal zu einer Funktion sind, führt zu einem dict lookup. Das erstellen eines Objektes führt zwangsläufig auch dazu dass ein dict erstellt wird, es sei den man hat __slots__ definiert oder sie ist in C implementiert. Die Konsequenz ist natürlich auch dass das erstellen von allen Objekten viel speicherintensiver ist als es in z.B. C oder Rust wäre. Deswegen nutzt man ja übrigens auch Dinge wie numpy oder pandas.

Du solltest auch bedenken dass das anlegen von Objekten auf der Heap in Sprachen wie Python massiv optimiert ist und i.d.R. nicht einem naiven malloc() o.ä. wie in C oder C++ entspricht. Das anlegen von Objekten und damit auch dicts ist wahrscheinlich deutlich schneller als du erwartest.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Vor allem hat C++ in der Standardimplementierung fuer map & set Implementierungen, die auf Baumsuche basieren. Darum muss man ja nur < und = implementieren fuer die Schluesseltypen. Handelt sich aber O(log(n)) ein. Anders als Python, das gleich zu Beginn auch alles hashen konnte. Mit unordered_map geht das inzwischen auch in C++, aber dafuer muss man dann eben auch eine hash-Funktion anbieten. Und da hat man dann eben O(1) (amortisiert....)
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Sirius3 hat geschrieben: Mittwoch 7. Juli 2021, 20:08 @mirko3107: lass Dich nicht von anderen Beitragenden zu diesem Thread verwirren, Wörterbuchzugriffe sind sehr performant. Statt dessen gilt hier das DRY-Prinzip "Don't repeat yourself": Code der nur einmal dasteht muß man auch nur einmal verstehen.

Was man auf keinen Fall benutzt, sind globale Variablen, erst recht nicht in Zusammenhang mit multiprocessing.

Benutze keine kryptischen Abkürzungen, sondern aussagekräftige Variablennamen.
`list` ist der Name einer eingebauten Klasse und sollte nicht überdeckt werden. Über einen Index iteriert man nicht, weil man auch direkt über die Zeilen Deines Dataframes iterieren könnte.
Strings stückelt man nicht mit + zusammen, vor allem nicht bei URL-Parameters, weil die nicht alle Zeichen enthalten dürfen.
Das Ergebnis eines requests.get ist kein req(est) sondern ein response.
`eps` ist mal eine Zahl und mal ein String, das sollte nicht sein.
Exceptions fängt man dort ab, wo sie auftreten.
Die erste Funktion könnte so aussehen:

Code: Alles auswählen

QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote"

def pack1():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS'])
    while True:
        for index, ticker in enumerate(watch.Ticker):
            try:
                quote_response = requests.get(QUOTE_URL, params={"symbols": ticker})
                result = quote_response.json()['quoteResponse']['result'][0]
            except (KeyError, IndexError, RemoteDataError) as error:
                print(f"Failed reading {ticker}: {error}")
            else:
                watch.iloc[index].Change = round(result.get('regularMarketChangePercent', 0),2)
                watch.iloc[index].Price = round(result.get('regularMarketPrice', 0),2)
                watch.iloc[index].EPS = round(result.get('epsCurrentYear', 0),2)
                watch.iloc[index].Cap = round(result.get('marketCap', 0)/1000000000, 2)
            time.sleep(1)
        watch.to_csv('pack1.csv', header=True, index=False) 
        time.sleep(600)
Und natürlich dauert das relativ lang, wegen der vielen `sleep`.
Wenn ein Ticker aus einem unbekannten Problem nicht gelesen werden kann, enthält der noch die Werte des letzten Laufs. Ist das so gewollt? Aber was ist beim ersten Lauf? Ist 0 wirklich der Wert, den Du haben möchtest, wenn einer der Schlüssel nicht existiert?
Die Funktion macht aber viel zu viel.
Eigentlich müßte die Funktion so aussehen:

Code: Alles auswählen

QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote"

def query_quote(tickers):
    quote = []
    for ticker in tickers:
        try:
            quote_response = requests.get(QUOTE_URL, params={"symbols": ticker})
            result = quote_response.json()['quoteResponse']['result'][0]
        except (KeyError, IndexError, RemoteDataError) as error:
            print(f"Failed reading {ticker}: {error}")
            quote.append((ticker, 0, 0, 0, 0))
        else:
            quote.append((
                ticker,
                round(result.get('regularMarketPrice', 0),2),
                round(result.get('regularMarketChangePercent', 0),2),
                round(result.get('marketCap', 0)/1000000000, 2),
                round(result.get('epsCurrentYear', 0),2)
            ))
        time.sleep(1)
    return pd.DataFrame(quote, columns=['Ticker', 'Price', 'Change', 'Cap', 'EPS'])
Dann kann man sie flexibler einsetzen.

Das ganze packt man in eine äußere Schleife, wo man alle Quellen abfrägt, benutzt concurrent.futures.ThreadPoolExecutor um einfach parallelisieren zu können, und gleichzeitig die Funktionen samt Rückgabewert verwenden zu können und kann dann dort auch einfach die Ergebnisse irgendwie zusammenpacken.

@DasIch: beim parallelen Abfragen von der selben Quelle hat der Server bestimmt etwas dagegen.
Das ist natürlich eine Stufe, auf der ich noch lange nicht angekommen bin, aber vielen Dank für die Tips, ich werd mich dran versuchen.

Und von concurrent.futures.ThreadPoolExecutor hab ich noch nie was gehört, aber wie gesagt, bin totaler Python-Noob im ersten Level.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

noch ein Hinweis:
Das von dir verwendete ib-insync
https://ib-insync.readthedocs.io/api.html
ist ein asynchrones Framework, daher ist aus meiner Sicht, asyncio von allen anderen Vorschlägen zu bevorzugen.
Besonders den Punkt "The One Rule" bzgl time.sleep() würde ich mir mal durchlesen.

Das was dich hier am stärksten limitiert ist wahrscheinlich die Beschränkung der API von Yahoo-Finance, denn die erlauben eine begrenzte Anzahl von Requests pro Stunde.
Ich würde dir empfehlen das entsprechen zu "tunen" um das Optimum da heraus zu holen, bzw. insgesamt weniger oft auf die API zuzugreifen.
Eine Sekunde pauschal zu warten ist sicher nicht nötig.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Also ib_insync hab ich bisher nur per Multiprocessing ans laufen bekommen, bei Threading kommt "RuntimeError: There is no current event loop in thread 'Thread-7'", alle anderen Threads laufen problemlos.

Code: Alles auswählen

def earnings_iv():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'Earnings', 'IV'])
    rows = len(list.index)
    while ib.isConnected():
        for i in range(0, 10):
            try:
                ticker = list['Ticker'].values[i]
                contract = Stock(ticker, 'SMART', 'USD')
                ib.reqMarketDataType(1)
                ib.qualifyContractsAsync(contract)
                data = ib.reqMktData(contract, "106,100", False, False)
                ib.sleep(1)
                iv = round(data.impliedVolatility * 100, 2)
                earn = ib.reqFundamentalData(contract, 'CalendarReport')
                if type(earn) == str:
                    tree = ET.ElementTree(ET.fromstring(earn))
                    root = tree.getroot()
                    item = tree.find('.//Date')
                    earnings = item.text
                else:
                    earnings = '01/01/2000'
                print(ticker, earnings,iv)
                list.at[i, 'Earnings'] = earnings
                list.at[i, 'IV'] = iv
                ib.cancelMktData(contract)
            except (KeyError, IndexError) as error:
                print(f"Failed reading {ticker}: {error}")
                pass
        ib.disconnect()
        #list.to_csv('earnings_iv.csv', header=True, index=False, columns=['Ticker', 'Earnings', 'IV'])
        time.sleep(3600)
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Man bekommt das auch threaded zum laufen (die Fehlermeldung sagt im Grunde schon wie), es bringt nur nix. Mit asyncio kannst du mehrere Tasks in EINEM thread (dem main tread eben) quasi-parallel ausführen lassen. Darum muss da weder threading noch multiprocessing her.

Und allen Unkenrufen zum trotz ist, wie schon von diversen Leuten angemerkt, das wahrscheinlichste problem einfach die Wartezeiten und Limitierungen der Server. Nicht dein Code, der irgendwas signifikantes zu rechnen hätte. Noch ein Grund weniger, multiprocessing zu verwenden.
Antworten