Dataframes aus verschiedenen Prozessen zusammenfügen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Vor allem hat C++ in der Standardimplementierung fuer map & set Implementierungen, die auf Baumsuche basieren. Darum muss man ja nur < und = implementieren fuer die Schluesseltypen. Handelt sich aber O(log(n)) ein. Anders als Python, das gleich zu Beginn auch alles hashen konnte. Mit unordered_map geht das inzwischen auch in C++, aber dafuer muss man dann eben auch eine hash-Funktion anbieten. Und da hat man dann eben O(1) (amortisiert....)
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Sirius3 hat geschrieben: Mittwoch 7. Juli 2021, 20:08 @mirko3107: lass Dich nicht von anderen Beitragenden zu diesem Thread verwirren, Wörterbuchzugriffe sind sehr performant. Statt dessen gilt hier das DRY-Prinzip "Don't repeat yourself": Code der nur einmal dasteht muß man auch nur einmal verstehen.

Was man auf keinen Fall benutzt, sind globale Variablen, erst recht nicht in Zusammenhang mit multiprocessing.

Benutze keine kryptischen Abkürzungen, sondern aussagekräftige Variablennamen.
`list` ist der Name einer eingebauten Klasse und sollte nicht überdeckt werden. Über einen Index iteriert man nicht, weil man auch direkt über die Zeilen Deines Dataframes iterieren könnte.
Strings stückelt man nicht mit + zusammen, vor allem nicht bei URL-Parameters, weil die nicht alle Zeichen enthalten dürfen.
Das Ergebnis eines requests.get ist kein req(est) sondern ein response.
`eps` ist mal eine Zahl und mal ein String, das sollte nicht sein.
Exceptions fängt man dort ab, wo sie auftreten.
Die erste Funktion könnte so aussehen:

Code: Alles auswählen

QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote"

def pack1():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS'])
    while True:
        for index, ticker in enumerate(watch.Ticker):
            try:
                quote_response = requests.get(QUOTE_URL, params={"symbols": ticker})
                result = quote_response.json()['quoteResponse']['result'][0]
            except (KeyError, IndexError, RemoteDataError) as error:
                print(f"Failed reading {ticker}: {error}")
            else:
                watch.iloc[index].Change = round(result.get('regularMarketChangePercent', 0),2)
                watch.iloc[index].Price = round(result.get('regularMarketPrice', 0),2)
                watch.iloc[index].EPS = round(result.get('epsCurrentYear', 0),2)
                watch.iloc[index].Cap = round(result.get('marketCap', 0)/1000000000, 2)
            time.sleep(1)
        watch.to_csv('pack1.csv', header=True, index=False) 
        time.sleep(600)
Und natürlich dauert das relativ lang, wegen der vielen `sleep`.
Wenn ein Ticker aus einem unbekannten Problem nicht gelesen werden kann, enthält der noch die Werte des letzten Laufs. Ist das so gewollt? Aber was ist beim ersten Lauf? Ist 0 wirklich der Wert, den Du haben möchtest, wenn einer der Schlüssel nicht existiert?
Die Funktion macht aber viel zu viel.
Eigentlich müßte die Funktion so aussehen:

Code: Alles auswählen

QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote"

def query_quote(tickers):
    quote = []
    for ticker in tickers:
        try:
            quote_response = requests.get(QUOTE_URL, params={"symbols": ticker})
            result = quote_response.json()['quoteResponse']['result'][0]
        except (KeyError, IndexError, RemoteDataError) as error:
            print(f"Failed reading {ticker}: {error}")
            quote.append((ticker, 0, 0, 0, 0))
        else:
            quote.append((
                ticker,
                round(result.get('regularMarketPrice', 0),2),
                round(result.get('regularMarketChangePercent', 0),2),
                round(result.get('marketCap', 0)/1000000000, 2),
                round(result.get('epsCurrentYear', 0),2)
            ))
        time.sleep(1)
    return pd.DataFrame(quote, columns=['Ticker', 'Price', 'Change', 'Cap', 'EPS'])
Dann kann man sie flexibler einsetzen.

Das ganze packt man in eine äußere Schleife, wo man alle Quellen abfrägt, benutzt concurrent.futures.ThreadPoolExecutor um einfach parallelisieren zu können, und gleichzeitig die Funktionen samt Rückgabewert verwenden zu können und kann dann dort auch einfach die Ergebnisse irgendwie zusammenpacken.

@DasIch: beim parallelen Abfragen von der selben Quelle hat der Server bestimmt etwas dagegen.
Das ist natürlich eine Stufe, auf der ich noch lange nicht angekommen bin, aber vielen Dank für die Tips, ich werd mich dran versuchen.

Und von concurrent.futures.ThreadPoolExecutor hab ich noch nie was gehört, aber wie gesagt, bin totaler Python-Noob im ersten Level.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@mirko3107,

noch ein Hinweis:
Das von dir verwendete ib-insync
https://ib-insync.readthedocs.io/api.html
ist ein asynchrones Framework, daher ist aus meiner Sicht, asyncio von allen anderen Vorschlägen zu bevorzugen.
Besonders den Punkt "The One Rule" bzgl time.sleep() würde ich mir mal durchlesen.

Das was dich hier am stärksten limitiert ist wahrscheinlich die Beschränkung der API von Yahoo-Finance, denn die erlauben eine begrenzte Anzahl von Requests pro Stunde.
Ich würde dir empfehlen das entsprechen zu "tunen" um das Optimum da heraus zu holen, bzw. insgesamt weniger oft auf die API zuzugreifen.
Eine Sekunde pauschal zu warten ist sicher nicht nötig.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Also ib_insync hab ich bisher nur per Multiprocessing ans laufen bekommen, bei Threading kommt "RuntimeError: There is no current event loop in thread 'Thread-7'", alle anderen Threads laufen problemlos.

Code: Alles auswählen

def earnings_iv():
    list = pd.read_csv("watch.csv", usecols=['Ticker', 'Earnings', 'IV'])
    rows = len(list.index)
    while ib.isConnected():
        for i in range(0, 10):
            try:
                ticker = list['Ticker'].values[i]
                contract = Stock(ticker, 'SMART', 'USD')
                ib.reqMarketDataType(1)
                ib.qualifyContractsAsync(contract)
                data = ib.reqMktData(contract, "106,100", False, False)
                ib.sleep(1)
                iv = round(data.impliedVolatility * 100, 2)
                earn = ib.reqFundamentalData(contract, 'CalendarReport')
                if type(earn) == str:
                    tree = ET.ElementTree(ET.fromstring(earn))
                    root = tree.getroot()
                    item = tree.find('.//Date')
                    earnings = item.text
                else:
                    earnings = '01/01/2000'
                print(ticker, earnings,iv)
                list.at[i, 'Earnings'] = earnings
                list.at[i, 'IV'] = iv
                ib.cancelMktData(contract)
            except (KeyError, IndexError) as error:
                print(f"Failed reading {ticker}: {error}")
                pass
        ib.disconnect()
        #list.to_csv('earnings_iv.csv', header=True, index=False, columns=['Ticker', 'Earnings', 'IV'])
        time.sleep(3600)
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Man bekommt das auch threaded zum laufen (die Fehlermeldung sagt im Grunde schon wie), es bringt nur nix. Mit asyncio kannst du mehrere Tasks in EINEM thread (dem main tread eben) quasi-parallel ausführen lassen. Darum muss da weder threading noch multiprocessing her.

Und allen Unkenrufen zum trotz ist, wie schon von diversen Leuten angemerkt, das wahrscheinlichste problem einfach die Wartezeiten und Limitierungen der Server. Nicht dein Code, der irgendwas signifikantes zu rechnen hätte. Noch ein Grund weniger, multiprocessing zu verwenden.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Ich dachte gerade, dass es durch paralleles Abfragen der Server etwas schneller geht, als alles nacheinander abzufragen.
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

Der Grund wieso Abfragen lange dauern ist dass du auf den Server warten musst. Du könntest statt zu warten aber auch andere Dinge tun (asyncio). Multiprocessing ist dafür gedacht dass du gleichzeitig auf mehreren Cores in deinem Computer Dinge berechnest aber dass tust du nicht.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Bin ja nur auf Multiprocessing geschwenkt, weil ich bei asyncio noch nicht den Durchblick habe, war für mich der einfachste Weg, nicht schön, aber funktioniert.
Ich hab jetzt auch die Abfragen an yahoo etwas gebündelt, um so auch ein paar Anfragen einzusparen.

Die Bremse ist ib_insync, weil die API von IB nicht die schnellste ist, daher wollte ich die Abfragen zu yahoo und IB parallel laufen lassen.
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Und mit asyncio bekommst du das auch hin, das war ja mein ganzer Punkt. So funktioniert asyncio nunmal, man kann damit mehrere Anfragen parallel laufen lassen. Das beschleuningt eine einzelne Anfrage natuerlich nicht, aber das tun multiprocessing & co auch nicht.
LukeNukem
User
Beiträge: 232
Registriert: Mittwoch 19. Mai 2021, 03:40

snafu hat geschrieben: Donnerstag 8. Juli 2021, 05:50 Der Aufbau einer Datenstruktur nimmt mit zunehmender Komplexität und Datenmenge natürlich auch mehr Zeit in Anspruch. Aber langsam im Vergleich wozu? Was wäre deine Alternative und warum sollte man diese anstelle von Wörterbüchern nutzen bzw welchen immensen Zeitgewinn brächte sie? 🤔
Naja, wie oben gezeigt: es ist halt nicht sehr sinnvoll, immer und immer wieder über dieselben Schlüssel auf immer und immer wieder dieselben Elemente desselben Dictionary zuzugreifen. Das ist völlig überflüssig, wenn dieses Dictionary entsprechend groß ist, dann ist das nun einmal teuer... und, wie gesagt: völlig überflüssig. Meine Alternative habe ich in meinen Codebeispielen gezeigt, andernfalls könnte allerdings auch ein memoryview auf diesen Teilausschnitt des "großen" Dictionary eine gute Idee sein. Und ansonsten, wie gesagt: measure, don't guess -- alles andere ist Kaffeesatzleserei.
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

@LukeNukem: bei O(1) ist es völlig egal, wie groß das Wörterbuch ist. Performance spielt hier gar keine Rolle. Einzig die Performance des Lesers leidet spürbar. Aber da Du ja Experte bist, hast Du das bestimmt schon gemessen.
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hab mal etwas komprimiert und hab durchaus den ganzen Yahoo-Kram beschleunigt.

Code: Alles auswählen

def yahoo():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS', 'Volumen', '52Whigh', '52Wlow', 'Rating'])
    rows = len(watch.index)
    while True:
        run = time.time()
        for i in range(0, rows):
            try:
                ticker = watch['Ticker'].values[i]
                quote_url = 'https://query1.finance.yahoo.com/v7/finance/quote?symbols='+ticker
                quote_req = requests.get(quote_url)
                change = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketChangePercent', 0),2)
                price = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketPrice', 0),2)
                volume = quote_req.json()['quoteResponse']['result'][0].get('regularMarketVolume', 0)
                W52high = round(quote_req.json()['quoteResponse']['result'][0].get('fiftyTwoWeekHigh', 0),2)
                W52low = round(quote_req.json()['quoteResponse']['result'][0].get('fiftyTwoWeekLow', 0),2)
                rat1 = quote_req.json()['quoteResponse']['result'][0].get('averageAnalystRating', 'None')
                rating=''
                for i in rat1:
                    if i.isalpha():
                        rating = "".join([rating, i])
                if 'epsCurrentYear' in quote_req.json()['quoteResponse']['result'][0]:
                    eps = round(quote_req.json()['quoteResponse']['result'][0].get('epsCurrentYear', 0),2)
                else:
                    eps = '0'
                cap = round(quote_req.json()['quoteResponse']['result'][0].get('marketCap', 0)/1000000000, 2)
                #print(ticker,price,change,cap,eps,volume,W52high,W52low,rating)
                watch.at[i, 'Price'] = price
                watch.at[i, 'Change'] = change
                watch.at[i, 'Cap'] = cap
                watch.at[i, 'EPS'] = eps
                watch.at[i, 'Volumen'] = volume
                watch.at[i, '52Whigh'] = W52high
                watch.at[i, '52Wlow'] = W52low
                watch.at[i, 'Rating'] = rating
            except (KeyError, IndexError, RemoteDataError) as error:
                print(f"Failed reading {ticker}: {error}")
                pass
        watch.to_csv('pack1.csv', header=True, index=False)
        end = time.time()
        print('Yahoo',round(end-run,0))
        time.sleep(600)
        
def rsi():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'RSI'])
    rows = len(watch.index)
    run1 = time.time()
    while True:
        for i in range(0, rows):
            try:
                ticker = watch['Ticker'].values[i]
                rsi = pdr.get_data_yahoo(ticker, dt.datetime(2021, 5, 1), dt.datetime.now(),session=session)
                #rsi = yf.download(ticker, dt.datetime(2021, 5, 1), dt.datetime.now())
                delta = rsi['Close'].diff()
                up = delta.clip(lower=0)
                down = -1 * delta.clip(upper=0)
                ema_up = up.ewm(com=13, adjust=False).mean()
                ema_down = down.ewm(com=13, adjust=False).mean()
                rs = ema_up / ema_down
                rsi['RSI'] = 100 - (100 / (1 + rs))
                rsi_value = rsi.iloc[-1]['RSI']
                #time.sleep(1)
                watch.at[i, 'RSI'] = round(rsi_value, 2)
                #print(ticker,rsi_value)
            except (KeyError, IndexError, RemoteDataError) as error:
                print(f"Failed reading {ticker}: {error}")
                watch.at[i, 'RSI'] = '0'
                pass
        watch.to_csv('rsi.csv', header=True, index=False, columns=['Ticker', 'RSI'])
        end1 = time.time()
        print('RSI',round(end1-run1,0))
        time.sleep(1800)

def sma():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'SMA200'])
    rows = len(watch.index)
    run2 = time.time()
    while True:
        for i in range(0, rows):
            try:
                ticker = watch['Ticker'].values[i]
                sma = pdr.get_data_yahoo(ticker, dt.datetime(2020, 5, 1),session=session)
                #sma = yf.download(ticker, dt.datetime(2020, 5, 1))
                sma['SMA10'] = sma['Close'].rolling(10).mean()
                sma['SMA50'] = sma['Close'].rolling(50).mean()
                sma['SMA200'] = sma['Close'].rolling(200).mean()
                sma_value = sma.iloc[-1]['SMA200']
                #print(ticker, sma_value)
                watch.at[i, 'SMA200'] = round(sma_value, 2)
                #time.sleep(1)
            except (KeyError, IndexError, RemoteDataError) as error:
                print(f"Failed reading {ticker}: {error}")
                pass
        watch.to_csv('sma.csv', header=True, index=False, columns=['Ticker', 'SMA200'])
        end2 = time.time()
        print('SMA:',round(end2-run2,0))
        time.sleep(1800)
                
def stoch():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Stoch'])
    run3 = time.time()
    rows = len(watch.index)
    while True:
        for i in range(0, rows):
            try:
                ticker = watch['Ticker'].values[i]
                stoch = pdr.get_data_yahoo(ticker, dt.datetime(2021, 5, 1), dt.datetime.now(),session=session)
                #stoch = yf.download(ticker, dt.datetime(2021, 5, 1), dt.datetime.now())
                stoch['14-high'] = stoch['High'].rolling(14).max()
                stoch['14-low'] = stoch['Low'].rolling(14).min()
                stoch['%K'] = (stoch['Close'] - stoch['14-low']) * 100 / (stoch['14-high'] - stoch['14-low'])
                stoch['%D'] = stoch['%K'].rolling(3).mean()
                stoch2 = stoch.iloc[-1]['%D']
                watch.at[i, 'Stoch'] = round(stoch2, 2)
                #time.sleep(1)
                #print(ticker,stoch2)
            except (KeyError, IndexError, RemoteDataError) as error:
                print(f"Failed reading {ticker}: {error}")
                pass
        watch.to_csv('stoch.csv', header=True, index=False, columns=['Ticker', 'Stoch'])
        end3 = time.time()
        print('Stoch:',round(end3-run3,0))
        time.sleep(1800)
Hier mal die Zeiten in s:
RSI 91.0
Stoch: 94.0
SMA: 95.0
Yahoo: 135.0

Jetzt muss ich nur noch den IB-Part überreden, beim Threading mitzumachen, dann wäre alles ok.

Code: Alles auswählen

def earnings_iv():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Earnings', 'IV'])
    rows = len(watch.index)
    run4 = time.time()
    while ib.isConnected():
        for i in range(0, 10):
            try:
                ticker = watch['Ticker'].values[i]
                contract = Stock(ticker, 'SMART', 'USD')
                ib.reqMarketDataType(1)
                ib.qualifyContracts(contract)
                data = ib.reqMktData(contract, "106,100", False, False)
                ib.sleep(1)
                iv = round(data.impliedVolatility * 100, 2)
                earn = ib.reqFundamentalData(contract, 'CalendarReport')
                if type(earn) == str:
                    tree = ET.ElementTree(ET.fromstring(earn))
                    root = tree.getroot()
                    item = tree.find('.//Date')
                    earnings = item.text
                else:
                    earnings = '01/01/2000'
                print(ticker, earnings,iv)
                watch.at[i, 'Earnings'] = earnings
                watch.at[i, 'IV'] = iv
                ib.cancelMktData(contract)
            except (KeyError, IndexError) as error:
                print(f"Failed reading {ticker}: {error}")
                pass
        ib.disconnect()
        watch.to_csv('earnings_iv.csv', header=True, index=False, columns=['Ticker', 'Earnings', 'IV'])
        end4 = time.time()
        print('Stoch:',round(end4-run4,0))
        time.sleep(3600)
Error:

Code: Alles auswählen

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/usr/lib/python3.9/threading.py", line 954, in _bootstrap_inner
    self.run()
  File "/usr/lib/python3.9/threading.py", line 892, in run
    self._target(*self._args, **self._kwargs)
  File "/home/mirko/stocks/streamlit/test/thread_watch.py", line 161, in earnings_iv
    ib.reqMarketDataType(1)
  File "/home/mirko/.local/lib/python3.9/site-packages/ib_insync/ib.py", line 1118, in reqMarketDataType
    self.client.reqMarketDataType(marketDataType)
  File "/home/mirko/.local/lib/python3.9/site-packages/ib_insync/client.py", line 815, in reqMarketDataType
    self.send(59, 1, marketDataType)
  File "/home/mirko/.local/lib/python3.9/site-packages/ib_insync/client.py", line 265, in send
    self.sendMsg(msg.getvalue())
  File "/home/mirko/.local/lib/python3.9/site-packages/ib_insync/client.py", line 268, in sendMsg
    loop = asyncio.get_event_loop()
  File "/usr/lib/python3.9/asyncio/events.py", line 642, in get_event_loop
    raise RuntimeError('There is no current event loop in thread %r.'
RuntimeError: There is no current event loop in thread 'Thread-5'.
LukeNukem
User
Beiträge: 232
Registriert: Mittwoch 19. Mai 2021, 03:40

DasIch hat geschrieben: Donnerstag 8. Juli 2021, 09:07 Um mal "zentrales Element" von Sirius3 etwas auszuführen: In CPython ist im Prinzip jedes Objekt letztendlich ein Dictionary, ggfs. mit bisschen Kram drumherum. Jeder Attributzugriff macht mindestens einen dict lookup. Der Zugriff auf eine Variable, sofern sie nicht lokal zu einer Funktion sind, führt zu einem dict lookup. Das erstellen eines Objektes führt zwangsläufig auch dazu dass ein dict erstellt wird, es sei den man hat __slots__ definiert oder sie ist in C implementiert. Die Konsequenz ist natürlich auch dass das erstellen von allen Objekten viel speicherintensiver ist als es in z.B. C oder Rust wäre. Deswegen nutzt man ja übrigens auch Dinge wie numpy oder pandas.

Du solltest auch bedenken dass das anlegen von Objekten auf der Heap in Sprachen wie Python massiv optimiert ist und i.d.R. nicht einem naiven malloc() o.ä. wie in C oder C++ entspricht. Das anlegen von Objekten und damit auch dicts ist wahrscheinlich deutlich schneller als du erwartest.
Das ist lieb, daß Du uns an Dinge erinnerst, die ich zwar schon lange weiß, die aber möglicherweise informativ und hilfreich für andere Mitlesende sind. Außerdem bieten numpy und das darauf basierende Pandas ja noch ein bisschen mehr als nur eine bessere Speichereffizienz. ;-)

Aber kommen wir zum Eingemachten: das Dumme ist halt, daß ich in den letzten acht Jahren beruflich wenig anderes gemacht habe als Massendatenverarbeitung unter Echtzeitbedingungen, genauer: die Verarbeitung von Transaktionsdaten zur Betrugsdetektion und -prävention. Wenn man so etwas über längere Zeiträume hinweg macht, dann gewinnt man eine... ziemlich innige Beziehung dazu, was eine Skriptsprache auf dem System und was das System dann auf der Hardware macht. Insofern habe ich vielleicht ein bisschen Erfahrung in Randbereichen, die die meisten anderen Entwickler vielleicht nicht jeden Tag sehen, und versuche meine Erfahrungen aus diesen Bereichen hier einzubringen.

De facto ist es nun einmal auch so, daß wir gar nicht wissen, wie die Netzwerkanbindung des TO ist, wie die Serverseite aussieht, und, vor allem: wie die Daten aussehen. Aber das das hier:

Code: Alles auswählen

d['a']['b']['c'].get('eins')
d['a']['b']['c'].get(''zwei')
d['a']['b']['c'].get('drei')
d['a']['b']['c'].get('vier')
mehrmals dieselben Zugriffe auf dasselbe Dictionary macht, während

Code: Alles auswählen

dummy = d['a']['b']['c']
dummy.get('eins')
dummy.get(''zwei')
dummy.get('drei')
dummy.get('vier')
sich diesen Unfug spart, ist für einen Profi wie Dich sicherlich offensichtlich und dürfte mit wenigen Sekunden des Nachdenkens sogar einem Hobbyisten klar werden. Nun könnte man zwar vermuten, daß die zweite Variante eine neue Speicherallokation provoziert, allerdings... einerseits optimiert Python hier schon ziemlich gut (warum gibt es eigentlich das Modul "copy"?) und auf der anderen Seite hat da womöglich auch das Betriebssystem bzw. dessen Speicherverwaltung noch etwas beizutragen, etwa mit Copy-On-Write und Same-Page-Merging.

Darüber hinaus, bitte verzeih' den Widerspruch: Python nutzt unter der Haube selbstverständlich die ganz normale malloc(3)-Funktion. Daß Python dessen Benutzung stark optimiert (wie gesagt, think "copy"-Modul), ist diesseits bekannt. Aber trotzdem danke für den Hinweis, der für andere Leser vielleicht wertvoll ist.

Übrigens wurde hier mehrmals gesagt, daß der Zugriff auf ein Dictionary eine Zeitkomplexität von O(1) hätte. Während das für kleine bis mittelgroße Dictionaries zweifellos der Fall ist, ist es das für wirklich große Dictionaries, bei denen die Kollisionswahrscheinlichkeit der Hashfunktion für die Keys steigt, leider nicht mehr -- da kann die Zeitkomplexität für Zugriffe im schlimmsten Fall sogar O(n) betragen. Das sieht auch das Python-Wiki so, wie hier [1] (unten) gerne nachgelesen werden kann.

Nun, wie dem auch sei: an der Ausführung eines Python-Programms sind neben dem Programm selbst noch einige andere beteiligt, etwa der Python-Interpreter (Überraschung!) und das Betriebssystem. Beide machen jeweils ihre eigenen Optimierungen, die reproduzierbar, aber häufig nicht unbedingt vorhersagbar sind. Deswegen ist es eher müßig, sich Gedanken über Zeitkomplexität zu machen, weil die reale Implementierung sich im Zweifelsfall dank der Optimierung anders verhält als unsere klugen Gedanken. Und da Python mit den Modulen "profile" und "cProfile" bereits recht leistungsfähige Profiler mitbringt, ist es einfacher und zielführender, diese Profiler einfach zu benutzen, ganz im Sinne von Kirk Pepperdine: "Measure, don't guess". (Dasselbe gilt übrigens auch für Programme in kompilierten Sprachen wie C und C++, da der Compiler dabei je nach Einstellung sehr umfangreiche und für Nicht-Experten schwer vorhersagbare Optimierungen vornehmen kann.)

[1] https://wiki.python.org/moin/TimeComplexity
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Den IB-Part lasse ich nun "normal" laufen, also ohne Threading, dauert ca. 1800s, liegt aber an der lahmen IB-API.

Meine Hauptfrage ist immer noch, wie ich die jeweiligen Dataframes aus den Threads nun in ein Gesamt-Dataframe bekomme.
LukeNukem
User
Beiträge: 232
Registriert: Mittwoch 19. Mai 2021, 03:40

mirko3107 hat geschrieben: Donnerstag 8. Juli 2021, 17:12 Den IB-Part lasse ich nun "normal" laufen, also ohne Threading, dauert ca. 1800s, liegt aber an der lahmen IB-API.

Meine Hauptfrage ist immer noch, wie ich die jeweiligen Dataframes aus den Threads nun in ein Gesamt-Dataframe bekomme.
https://pandas.pydata.org/pandas-docs/s ... oncat.html
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

LukeNukem hat geschrieben: Donnerstag 8. Juli 2021, 16:12 Nun könnte man zwar vermuten, daß die zweite Variante eine neue Speicherallokation provoziert, allerdings... einerseits optimiert Python hier schon ziemlich gut (warum gibt es eigentlich das Modul "copy"?) und auf der anderen Seite hat da womöglich auch das Betriebssystem bzw. dessen Speicherverwaltung noch etwas beizutragen, etwa mit Copy-On-Write und Same-Page-Merging.
Ich weiß nicht, welche Vorstellungen Du da hast. Dir sagt der Begriff "Referenz" etwas?
Benutzeravatar
snafu
User
Beiträge: 6741
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

LukeNukem hat geschrieben: Donnerstag 8. Juli 2021, 16:12 das Dumme ist halt, daß ich in den letzten acht Jahren beruflich wenig anderes gemacht habe als Massendatenverarbeitung unter Echtzeitbedingungen, genauer: die Verarbeitung von Transaktionsdaten zur Betrugsdetektion und -prävention. Wenn man so etwas über längere Zeiträume hinweg macht, dann gewinnt man eine... ziemlich innige Beziehung dazu, was eine Skriptsprache auf dem System und was das System dann auf der Hardware macht. Insofern habe ich vielleicht ein bisschen Erfahrung in Randbereichen, die die meisten anderen Entwickler vielleicht nicht jeden Tag sehen, und versuche meine Erfahrungen aus diesen Bereichen hier einzubringen.
Massendaten-Verarbeitung mit Python als Randbereich anzupreisen, lässt schon tief blicken. Das macht dich nicht zum Guru und dies zeigen auch deine Aussagen hier im Thread. Du scheinst dich ja echt sehr oft missverständlich auszudrücken. Oder will sich vielleicht jemand herausreden, wenn er korrigiert wird...? :o
mirko3107
User
Beiträge: 75
Registriert: Freitag 23. April 2021, 15:42

Hab eben festgestellt, dass sich mein Eingangsdataframe nicht von meinem Ausgangsdataframe unterscheidet, es werden quasi keine Werte überschrieben.
Das gleiche Script mit Multiprocessing funktioniert hingegen.

Code: Alles auswählen

def yahoo():
    watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS', 'Volumen', '52Whigh', '52Wlow', 'Rating'])
    rows = len(watch.index)
    while True:
        run = time.time()
        for i in range(0, rows):
            try:
                ticker = watch['Ticker'].values[i]
                quote_url = 'https://query1.finance.yahoo.com/v7/finance/quote?symbols='+ticker
                quote_req = requests.get(quote_url)
                change = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketChangePercent', 0),2)
                price = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketPrice', 0),2)
                volume = quote_req.json()['quoteResponse']['result'][0].get('regularMarketVolume', 0)
                W52high = round(quote_req.json()['quoteResponse']['result'][0].get('fiftyTwoWeekHigh', 0),2)
                W52low = round(quote_req.json()['quoteResponse']['result'][0].get('fiftyTwoWeekLow', 0),2)
                rat1 = quote_req.json()['quoteResponse']['result'][0].get('averageAnalystRating', 'None')
                rating=''
                for i in rat1:
                    if i.isalpha():
                        rating = "".join([rating, i])
                if 'epsCurrentYear' in quote_req.json()['quoteResponse']['result'][0]:
                    eps = round(quote_req.json()['quoteResponse']['result'][0].get('epsCurrentYear', 0),2)
                else:
                    eps = '0'
                cap = round(quote_req.json()['quoteResponse']['result'][0].get('marketCap', 0)/1000000000, 2)
                #print(ticker,price,change,cap,eps,volume,W52high,W52low,rating)
                watch.at[i, 'Price'] = price
                watch.at[i, 'Change'] = change
                watch.at[i, 'Cap'] = cap
                watch.at[i, 'EPS'] = eps
                watch.at[i, 'Volumen'] = volume
                watch.at[i, '52Whigh'] = W52high
                watch.at[i, '52Wlow'] = W52low
                watch.at[i, 'Rating'] = rating
            except (KeyError, IndexError, RemoteDataError) as error:
                print(f"Failed reading {ticker}: {error}")
                pass
        watch.to_csv('pack1.csv', header=True, index=False)
        time.sleep(600)
Benutzeravatar
__blackjack__
User
Beiträge: 13116
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@LukeNukem: Der amortisierte „worst case“ hat nichts mit der Grösse des Wörterbuchs zu tun. Ich weiss nicht wo Du das aus der Wikiseite heraus liest. Der betrifft *jede* Wörterbuchgrösse.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

Code: Alles auswählen

import aiohttp
import asyncio
import time
import statistics

loop = asyncio.get_event_loop()


async def get_result():
    async with aiohttp.ClientSession() as session:
        async with session.get(
            "https://query1.finance.yahoo.com/v7/finance/quote?symbols=GOOG"
        ) as response:
            response.raise_for_status()
            return await response.json()


async def no_data_storage():
    await asyncio.gather(*[get_result() for _ in range(1000)])


async def with_data_storage():
    response = await asyncio.gather(*[get_result() for _ in range(1000)])


def measure(measure_func):
    times = []
    for _ in range(10):
        start = time.perf_counter()
        loop.run_until_complete(measure_func())
        delta = time.perf_counter() - start
        times.append(delta)
        print(f"{delta:10.3f} s")

    print(f"Durchschnitt: {statistics.mean(times):0.3f}")


print("Ohne Speichern")
measure(no_data_storage)
print()
print("Mit Speichern")
measure(with_data_storage)
Ausgabe:

Code: Alles auswählen

Ohne Speichern
    14.601 s
    27.252 s
    20.859 s
    43.963 s
    33.140 s
    28.015 s
    16.654 s
    18.356 s
    15.417 s
    20.989 s
Durchschnitt: 23.925

Mit Speichern
    21.014 s
    20.790 s
    27.653 s
    19.263 s
    19.793 s
    29.764 s
    27.843 s
    18.609 s
    28.514 s
    27.414 s
Durchschnitt: 24.066
10 mal 1000 Anfragen an Yahoo-Finance. Beim zweiten Durchlauf werden die JSON-Daten jeweils zur möglichen Weiterverarbeitung in einen Dictionary gespeichert.
Antworten