Hallo zusammen,
ich versuche mit seit einigen Tagen an Python und hätte mal eine Frage zu weiteren Vorgehen.
Ich habe ein Script geschrieben, welches diverse Finanzdaten aus mehreren Quellen zusammenträgt und in ein Dataframe schreibt.
Das Script verarbeitet ein Dataframe mit 700 Zeilen über eine Schleife.
Da mir das ganze doch arg lange dauert (ca.40min), hab ich mich mit Multiprocessing beschäftigt und das Script in mehrere Prozesse
aufgeteilt, wodurch sich die Dauer auf 15min reduziert hat.
Wie kann ich die Ergebnisse aus jedem Prozess in ein Gesamt-Dataframe zusammenfassen?
Momentan mach ich es so, dass jeder Prozess sein Dataframe in eine CSV schreibt, welche ich in einem anderen Script wieder zu einem Dataframe
zusammenbaue.
Das geht doch sicherlich auch einfacher, oder?
Danke
mirko3107
Dataframes aus verschiedenen Prozessen zusammenfügen
Bedeutet das zusammentragen aus mehreren Quellen, dass die Abfragen aus dem Internet sehr lange dauern? Dann hast Du ein IO-bound Problem.
Solche Probleme löst man nicht mit multiprocessing, sondern mit einem Prozess, z.B. per asyncio.
Um genaueres zu sagen, müßte man aber den Programmcode kennen.
Solche Probleme löst man nicht mit multiprocessing, sondern mit einem Prozess, z.B. per asyncio.
Um genaueres zu sagen, müßte man aber den Programmcode kennen.
Hier mal das Script, nix professionelles, aber es funktioniert.
Code: Alles auswählen
import multiprocessing
import xml.etree.ElementTree as ET
import datetime as dt
from datetime import date
from ib_insync import *
import requests
import requests_cache
import pandas as pd
import pandas_datareader as pdr
from pandas_datareader._utils import RemoteDataError
import yfinance as yf
import time
ib = IB()
ib.connect('127.0.0.1', 7496, clientId=12)
expire_after = dt.timedelta(days=3)
session = requests_cache.CachedSession(cache_name='cache', backend='sqlite',expire_after=expire_after)
yf.pdr_override()
start_date = '01-01-2021'
end_date = '07-01-2021'
start = dt.datetime.strptime(start_date, '%d-%m-%Y')
def pack1():
list = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS'])
rows = len(list.index)
while True:
for i in range(0, rows):
try:
ticker = list['Ticker'].values[i]
quote_url = 'https://query1.finance.yahoo.com/v7/finance/quote?symbols='+ticker
quote_req = requests.get(quote_url)
change = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketChangePercent', 0),2)
price = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketPrice', 0),2)
if 'epsCurrentYear' in quote_req.json()['quoteResponse']['result'][0]:
eps = round(quote_req.json()['quoteResponse']['result'][0].get('epsCurrentYear', 0),2)
else:
eps = '0'
cap = round(quote_req.json()['quoteResponse']['result'][0].get('marketCap', 0)/1000000000, 2)
time.sleep(1)
print(ticker,change,price,cap)
list.at[i, 'Price'] = price
list.at[i, 'Change'] = change
list.at[i, 'Cap'] = cap
list.at[i, 'EPS'] = eps
except KeyError as ke:
print(ke)
pass
except TypeError as te:
print(te)
pass
except RemoteDataError as rde:
print(rde)
pass
list.to_csv('pack1.csv', header=True, index=False)
time.sleep(600)
def pack2():
list = pd.read_csv("watch.csv", usecols=['Ticker', 'Volumen', '52Whigh', '52Wlow'])
rows = len(list.index)
while True:
for i in range(0, rows):
try:
ticker = list['Ticker'].values[i]
url_cap = 'https://query2.finance.yahoo.com/v10/finance/quoteSummary/'
url_cap2 = '?formatted=true&crumb=8ldhetOu7RJ&lang=en-US®ion=US&modules=summaryDetail&corsDomain=finance.yahoo.com'
cap_url = url_cap + ticker + url_cap2
req_cap = requests.get(cap_url)
volume = req_cap.json()['quoteSummary']['result'][0]['summaryDetail']['volume'].get('raw', 0)
high = req_cap.json()['quoteSummary']['result'][0]['summaryDetail']['fiftyTwoWeekHigh'].get('raw', 0)
low = req_cap.json()['quoteSummary']['result'][0]['summaryDetail']['fiftyTwoWeekLow'].get('raw', 0)
time.sleep(1)
print(ticker,volume,high,low)
list.at[i, 'Volumen'] = volume
list.at[i, '52Whigh'] = round(high, 2)
list.at[i, '52Wlow'] = round(low, 2)
except KeyError as ke:
print(ke)
pass
except TypeError as te:
print(te)
pass
except RemoteDataError as rde:
print(rde)
pass
list.to_csv('pack2.csv', header=True, index=False, columns=['Ticker', 'Volumen', '52Whigh', '52Wlow'])
time.sleep(180)
def rating():
list = pd.read_csv("watch.csv", usecols=['Ticker', 'Rating'])
rows = len(list.index)
while True:
for i in range(0, rows):
try:
ticker = list['Ticker'].values[i]
rat1_url = 'https://query2.finance.yahoo.com/v10/finance/quoteSummary/'
rat2_url = '?formatted=true&crumb=swg7qs5y9UP&lang=en-US®ion=US&' \
'modules=upgradeDowngradeHistory,recommendationTrend,' \
'financialData,earningsHistory,earningsTrend,industryTrend&' \
'corsDomain=finance.yahoo.com'
rat_url = rat1_url + ticker + rat2_url
rat_req = requests.get(rat_url)
rating = rat_req.json()['quoteSummary']['result'][0]['financialData'].get('recommendationKey', 'None')
time.sleep(1)
print(ticker,rating)
list.at[i, 'Rating'] = rating
except KeyError as ke:
print(ke)
pass
except TypeError as te:
print(te)
pass
except RemoteDataError as rde:
print(rde)
pass
list.to_csv('rating.csv', header=True, columns=['Ticker', 'Rating'])
time.sleep(86400)
def rsi():
list = pd.read_csv("watch.csv", usecols=['Ticker', 'RSI'])
rows = len(list.index)
while True:
for i in range(0, rows):
try:
ticker = list['Ticker'].values[i]
rsi = pdr.get_data_yahoo(ticker, dt.datetime(2021, 5, 1), dt.datetime.now(),session=session)
delta = rsi['Close'].diff()
up = delta.clip(lower=0)
down = -1 * delta.clip(upper=0)
ema_up = up.ewm(com=13, adjust=False).mean()
ema_down = down.ewm(com=13, adjust=False).mean()
rs = ema_up / ema_down
rsi['RSI'] = 100 - (100 / (1 + rs))
rsi_value = rsi.iloc[-1]['RSI']
list.at[i, 'RSI'] = round(rsi_value, 2)
print(ticker,rsi_value)
except KeyError as ke:
print(ke)
pass
except TypeError as te:
print(te)
pass
except RemoteDataError as rde:
print(rde)
list.at[i, 'RSI'] = '0'
pass
list.to_csv('rsi.csv', header=True, index=False, columns=['Ticker', 'RSI'])
print('RSI CSV written')
time.sleep(1800)
def sma():
list = pd.read_csv("watch.csv", usecols=['Ticker', 'SMA200'])
rows = len(list.index)
while True:
for i in range(0, rows):
try:
ticker = list['Ticker'].values[i]
sma = pdr.get_data_yahoo(ticker, dt.datetime(2020, 5, 1),session=session)
#sma = yf.download(ticker, dt.datetime(2020, 5, 1))
sma['SMA10'] = sma['Close'].rolling(10).mean()
sma['SMA50'] = sma['Close'].rolling(50).mean()
sma['SMA200'] = sma['Close'].rolling(200).mean()
sma_value = sma.iloc[-1]['SMA200']
print(ticker, sma_value)
list.at[i, 'SMA200'] = round(sma_value, 2)
#time.sleep(1)
except KeyError as ke:
print(ke)
pass
except TypeError as te:
print(te)
pass
except RemoteDataError as rde:
print(rde)
pass
list.to_csv('sma.csv', header=True, index=False, columns=['Ticker', 'SMA200'])
time.sleep(180)
def stoch():
list = pd.read_csv("watch.csv", usecols=['Ticker', 'Stoch'])
rows = len(list.index)
while True:
for i in range(0, rows):
try:
ticker = list['Ticker'].values[i]
stoch = pdr.get_data_yahoo(ticker, dt.datetime(2021, 5, 1), dt.datetime.now(),session=session)
stoch['14-high'] = stoch['High'].rolling(14).max()
stoch['14-low'] = stoch['Low'].rolling(14).min()
stoch['%K'] = (stoch['Close'] - stoch['14-low']) * 100 / (stoch['14-high'] - stoch['14-low'])
stoch['%D'] = stoch['%K'].rolling(3).mean()
stoch2 = stoch.iloc[-1]['%D']
list.at[i, 'Stoch'] = round(stoch2, 2)
print(ticker,stoch2)
except KeyError as ke:
print(ke)
pass
except TypeError as te:
print(te)
pass
except RemoteDataError as rde:
print(rde)
pass
list.to_csv('stoch.csv', header=True, index=False, columns=['Ticker', 'Stoch'])
time.sleep(180)
def earnings_iv():
list = pd.read_csv("watch.csv", usecols=['Ticker', 'Earnings', 'IV'])
rows = len(list.index)
while ib.isConnected():
for i in range(0, rows):
try:
ticker = list['Ticker'].values[i]
contract = Stock(ticker, 'SMART', 'USD')
ib.reqMarketDataType(1)
ib.qualifyContracts(contract)
data = ib.reqMktData(contract, "106,100", False, False)
ib.sleep(1)
iv = round(data.impliedVolatility * 100, 2)
earn = ib.reqFundamentalData(contract, 'CalendarReport')
if type(earn) == str:
tree = ET.ElementTree(ET.fromstring(earn))
root = tree.getroot()
item = tree.find('.//Date')
earnings = item.text
else:
earnings = '01/01/2000'
print(ticker, earnings,iv)
list.at[i, 'Earnings'] = earnings
list.at[i, 'IV'] = iv
ib.cancelMktData(contract)
except KeyError as ke:
print(ke)
pass
except TypeError as te:
print(te)
pass
ib.disconnect()
list.to_csv('earnings_iv.csv', header=True, index=False, columns=['Ticker', 'Earnings', 'IV'])
time.sleep(86400)
def main():
p1 = multiprocessing.Process(target=rsi)
p2 = multiprocessing.Process(target=sma)
p3 = multiprocessing.Process(target=stoch)
p4 = multiprocessing.Process(target=rating)
p5 = multiprocessing.Process(target=pack1)
p6 = multiprocessing.Process(target=pack2)
p7 = multiprocessing.Process(target=earnings_iv)
p1.start()
p2.start()
p3.start()
p4.start()
p5.start()
p6.start()
p7.start()
if __name__ == "__main__":
main()
IMHO wäre es sinnvoller, erstmal die Basics glattzuziehen, denn so... wie sage ich das... gehst Du eine ungewöhnliche Intimität mit Deinem eigenen Knie ein. Das Parsen von JSON-Daten muß zeichenweise erfolgen und ist aus diesem Grund eine recht aufwändige Veranstaltung, und Dictionaries sind jetzt auch nicht unbedingt das performanteste Feature von Python.mirko3107 hat geschrieben: ↑Mittwoch 7. Juli 2021, 14:44 Hier mal das Script, nix professionelles, aber es funktioniert.
Code: Alles auswählen
change = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketChangePercent', 0),2) price = round(quote_req.json()['quoteResponse']['result'][0].get('regularMarketPrice', 0),2) if 'epsCurrentYear' in quote_req.json()['quoteResponse']['result'][0]: eps = round(quote_req.json()['quoteResponse']['result'][0].get('epsCurrentYear', 0),2) else: eps = '0' cap = round(quote_req.json()['quoteResponse']['result'][0].get('marketCap', 0)/1000000000, 2)
Code: Alles auswählen
data = quote_req.json()['quoteResponse']['result'][0]
change = round(data.get('regularMarketChangePercent', 0),2)
price = round(data.get('regularMarketPrice', 0),2)
eps = round(data.get('epsCurrentYear', 0),2)
cap = round(data.get('marketCap', 0)/1000000000, 2)
Code: Alles auswählen
data = quote_req.json()['quoteResponse']['result'][0]
results = {key: round(data.get(key, 0), 2) for key in ('regularMarketChangePercent', 'regularMarketPrice', 'epsCurrentYear', 'marketCap')}
results['market_cap'] = round(results['market_cap']/1000000000, 2)
Neben solcher Kleinigkeiten gibt es da so ein paar Leitsätze, die für jede Performanceoptimierung gelten, darunter: "Measure, don't guess" von Kirk Pepperdine. Genau das hast Du aber nicht getan, und einige der anderen Threadteilnehmer wohl auch nicht. Es gibt aber im Python-Universum sehr probate Mittel für solche Messungen, etwa die Module "profile" und "cProfile". Eine andere Möglichkeit können altbekannte und etablierte UNIX-Programme wie time(1), top(1) und ps(1) bieten, aber im Kern bleibt es immer dasselbe: Du solltest zunächst herausfinden, womit Dein Programm so viel Zeit vertrödelt, und dann genau dort gezielt ansetzen. Und ja: es ist eine durchaus valide Annahme, daß das Problem die I/O-Waits sein könnten -- wobei sich multithreading dann wohl eher anböte als multiprocessing, noch eher aber concurrent.futures oder asyncio -- aber es kann natürlich auch an allem anderen liegen: seien es die vielen Aufrufe von req_json(), die Dict-Lookups, oder die vielen, vielen manuellen Schreibzugriffe auf Deine bereits vorhandenen Dataframes, ... genau, oder. Measure, don't guess. HTH!
Beim unnötigen wiederholten Parsen der JSON-Daten stimme ich zu. Das sollte man immer nur einmal pro Datensatz machen und das jeweilige Ergebnis ansprechen. Was ich jedoch nicht teile, ist die Aussage, dass Wörterbuch-Zugriffe langsam wären. Eher das Gegenteil ist der Fall.
Man kann über so ein Problem auch nachdenken und vielleicht hat man auch ein bisschen intuitives Verständnis davon wie lange typische Operationen dauern. Zur groben Einordnung hilft z.b. dies. Hat man dieses Verständnis sollte auch ganz ohne Profiler klar sein dass Code der einen O(n) Algorithmus hat und sequentiell n HTTP Requests macht, wohl eher nicht wegen einem dict lookup (< <1ms), JSON parsing (<1ms) oder Schreibzugriffe auf einem Dataframe (<<1ms) langsam ist sondern dass diese HTTP Requests (<~1s) wohl eher das Problem darstellen. Schliesslich müsste man so um die 1000 dict lookups pro Iteration machen um in den Bereich eines HTTP Requests zu kommen.Neben solcher Kleinigkeiten gibt es da so ein paar Leitsätze, die für jede Performanceoptimierung gelten, darunter: "Measure, don't guess" von Kirk Pepperdine. Genau das hast Du aber nicht getan, und einige der anderen Threadteilnehmer wohl auch nicht
Ein bessere Ansatz wäre z.b. bei `pack1` alle Ticker Werte zu nehmen, die Requests alle nebenläufig zu machen (mit Threads oder asyncio), die Ergebnis dann zu sammeln daraus ein Dataframe zu bauen und dies mit dem input zu auf `Ticker` zu mergen. Im Prinzip macht man dann bei den anderen Funktionen genauso, also die Daten möglichst nebenläufig alle auf einmal ziehen um nicht auf einzelne Requests zu warten und sobald alles im Speicher ist möglichst in großen Schritten verarbeiten um pandas auszunutzen.
@mirko3107: lass Dich nicht von anderen Beitragenden zu diesem Thread verwirren, Wörterbuchzugriffe sind sehr performant. Statt dessen gilt hier das DRY-Prinzip "Don't repeat yourself": Code der nur einmal dasteht muß man auch nur einmal verstehen.
Was man auf keinen Fall benutzt, sind globale Variablen, erst recht nicht in Zusammenhang mit multiprocessing.
Benutze keine kryptischen Abkürzungen, sondern aussagekräftige Variablennamen.
`list` ist der Name einer eingebauten Klasse und sollte nicht überdeckt werden. Über einen Index iteriert man nicht, weil man auch direkt über die Zeilen Deines Dataframes iterieren könnte.
Strings stückelt man nicht mit + zusammen, vor allem nicht bei URL-Parameters, weil die nicht alle Zeichen enthalten dürfen.
Das Ergebnis eines requests.get ist kein req(est) sondern ein response.
`eps` ist mal eine Zahl und mal ein String, das sollte nicht sein.
Exceptions fängt man dort ab, wo sie auftreten.
Die erste Funktion könnte so aussehen:
Und natürlich dauert das relativ lang, wegen der vielen `sleep`.
Wenn ein Ticker aus einem unbekannten Problem nicht gelesen werden kann, enthält der noch die Werte des letzten Laufs. Ist das so gewollt? Aber was ist beim ersten Lauf? Ist 0 wirklich der Wert, den Du haben möchtest, wenn einer der Schlüssel nicht existiert?
Die Funktion macht aber viel zu viel.
Eigentlich müßte die Funktion so aussehen:
Dann kann man sie flexibler einsetzen.
Das ganze packt man in eine äußere Schleife, wo man alle Quellen abfrägt, benutzt concurrent.futures.ThreadPoolExecutor um einfach parallelisieren zu können, und gleichzeitig die Funktionen samt Rückgabewert verwenden zu können und kann dann dort auch einfach die Ergebnisse irgendwie zusammenpacken.
@DasIch: beim parallelen Abfragen von der selben Quelle hat der Server bestimmt etwas dagegen.
Was man auf keinen Fall benutzt, sind globale Variablen, erst recht nicht in Zusammenhang mit multiprocessing.
Benutze keine kryptischen Abkürzungen, sondern aussagekräftige Variablennamen.
`list` ist der Name einer eingebauten Klasse und sollte nicht überdeckt werden. Über einen Index iteriert man nicht, weil man auch direkt über die Zeilen Deines Dataframes iterieren könnte.
Strings stückelt man nicht mit + zusammen, vor allem nicht bei URL-Parameters, weil die nicht alle Zeichen enthalten dürfen.
Das Ergebnis eines requests.get ist kein req(est) sondern ein response.
`eps` ist mal eine Zahl und mal ein String, das sollte nicht sein.
Exceptions fängt man dort ab, wo sie auftreten.
Die erste Funktion könnte so aussehen:
Code: Alles auswählen
QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote"
def pack1():
watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS'])
while True:
for index, ticker in enumerate(watch.Ticker):
try:
quote_response = requests.get(QUOTE_URL, params={"symbols": ticker})
result = quote_response.json()['quoteResponse']['result'][0]
except (KeyError, IndexError, RemoteDataError) as error:
print(f"Failed reading {ticker}: {error}")
else:
watch.iloc[index].Change = round(result.get('regularMarketChangePercent', 0),2)
watch.iloc[index].Price = round(result.get('regularMarketPrice', 0),2)
watch.iloc[index].EPS = round(result.get('epsCurrentYear', 0),2)
watch.iloc[index].Cap = round(result.get('marketCap', 0)/1000000000, 2)
time.sleep(1)
watch.to_csv('pack1.csv', header=True, index=False)
time.sleep(600)
Wenn ein Ticker aus einem unbekannten Problem nicht gelesen werden kann, enthält der noch die Werte des letzten Laufs. Ist das so gewollt? Aber was ist beim ersten Lauf? Ist 0 wirklich der Wert, den Du haben möchtest, wenn einer der Schlüssel nicht existiert?
Die Funktion macht aber viel zu viel.
Eigentlich müßte die Funktion so aussehen:
Code: Alles auswählen
QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote"
def query_quote(tickers):
quote = []
for ticker in tickers:
try:
quote_response = requests.get(QUOTE_URL, params={"symbols": ticker})
result = quote_response.json()['quoteResponse']['result'][0]
except (KeyError, IndexError, RemoteDataError) as error:
print(f"Failed reading {ticker}: {error}")
quote.append((ticker, 0, 0, 0, 0))
else:
quote.append((
ticker,
round(result.get('regularMarketPrice', 0),2),
round(result.get('regularMarketChangePercent', 0),2),
round(result.get('marketCap', 0)/1000000000, 2),
round(result.get('epsCurrentYear', 0),2)
))
time.sleep(1)
return pd.DataFrame(quote, columns=['Ticker', 'Price', 'Change', 'Cap', 'EPS'])
Das ganze packt man in eine äußere Schleife, wo man alle Quellen abfrägt, benutzt concurrent.futures.ThreadPoolExecutor um einfach parallelisieren zu können, und gleichzeitig die Funktionen samt Rückgabewert verwenden zu können und kann dann dort auch einfach die Ergebnisse irgendwie zusammenpacken.
@DasIch: beim parallelen Abfragen von der selben Quelle hat der Server bestimmt etwas dagegen.
Hier ein ganz gutes Tutorial zum nebenläufigen Anfordern von Daten, wenn viele URLs benötigt werden: https://www.twilio.com/blog/asynchronou ... th-aiohttp
In das asyncio-Framework muss man sich aber erstmal eine Weile einlesen, wenn das Thema neu für einen ist. Oder notfalls eins der dort gezeigten Beispiele nehmen und anpassen.
In das asyncio-Framework muss man sich aber erstmal eine Weile einlesen, wenn das Thema neu für einen ist. Oder notfalls eins der dort gezeigten Beispiele nehmen und anpassen.
Wenn man es übertreibt sicherlich aber die meisten Browser bauen bis zu 6 Verbindungen gleichzeitig auf. Ich denke daran kann man sich in so einer Situation guten Gewissens orientieren, es sei den es werden explizite Limits dokumentiert oder man trifft auf ein Rate Limit (also 429 Antworten).@DasIch: beim parallelen Abfragen von der selben Quelle hat der Server bestimmt etwas dagegen.
Wie gesagt, ich bin kein Experte in Python, hab mir das meiste zusammengegoogelt und mir zig Videos angeschaut, um erstmal grob dahinter zu steigen.
Ich werd mir eure Anregungen annehmen und mich mal an die Optimierung machen.
Die sleeps hab ich eingefügt, weil doch ab und an mal von yahoo ein Fehler kam, aber sehr selten.
Ich werd mir eure Anregungen annehmen und mich mal an die Optimierung machen.
Die sleeps hab ich eingefügt, weil doch ab und an mal von yahoo ein Fehler kam, aber sehr selten.
Das eh.
Sei mir bitte nicht böse, aber ich hab' mir das nicht ausgesucht, daß es so ist... und ich fürchte, Du hast meinen Punkt nicht ganz verstanden. Dict-Zugriffe in Python sind relativ flott (okay, solange man sie nicht mit C oder C++ vergleicht), aber das ANLEGEN eines Dict in Python ist eine ziemlich teure Veranstaltung, und so ein Dict selbst ist obendrein eine... recht speicherintensive Angelegenheit.
Der Aufbau einer Datenstruktur nimmt mit zunehmender Komplexität und Datenmenge natürlich auch mehr Zeit in Anspruch. Aber langsam im Vergleich wozu? Was wäre deine Alternative und warum sollte man diese anstelle von Wörterbüchern nutzen bzw welchen immensen Zeitgewinn brächte sie?
@LukeNukem: C hat keine Wörterbücher in der Standardbibliothek und die Standardimplementierung in C++ ist deutlich langsamer, weil man sich bei Python sehr viele Gedanken um Geschwindigkeit gemacht hat, weil Wörterbücher so ein zentrales Element sind. Außerdem wären das Microoptimierungen. Zudem setzt Du selbst Wörterbücher ein. So schlimm kann es also nicht sein.
Um mal "zentrales Element" von Sirius3 etwas auszuführen: In CPython ist im Prinzip jedes Objekt letztendlich ein Dictionary, ggfs. mit bisschen Kram drumherum. Jeder Attributzugriff macht mindestens einen dict lookup. Der Zugriff auf eine Variable, sofern sie nicht lokal zu einer Funktion sind, führt zu einem dict lookup. Das erstellen eines Objektes führt zwangsläufig auch dazu dass ein dict erstellt wird, es sei den man hat __slots__ definiert oder sie ist in C implementiert. Die Konsequenz ist natürlich auch dass das erstellen von allen Objekten viel speicherintensiver ist als es in z.B. C oder Rust wäre. Deswegen nutzt man ja übrigens auch Dinge wie numpy oder pandas.
Du solltest auch bedenken dass das anlegen von Objekten auf der Heap in Sprachen wie Python massiv optimiert ist und i.d.R. nicht einem naiven malloc() o.ä. wie in C oder C++ entspricht. Das anlegen von Objekten und damit auch dicts ist wahrscheinlich deutlich schneller als du erwartest.
Du solltest auch bedenken dass das anlegen von Objekten auf der Heap in Sprachen wie Python massiv optimiert ist und i.d.R. nicht einem naiven malloc() o.ä. wie in C oder C++ entspricht. Das anlegen von Objekten und damit auch dicts ist wahrscheinlich deutlich schneller als du erwartest.
Vor allem hat C++ in der Standardimplementierung fuer map & set Implementierungen, die auf Baumsuche basieren. Darum muss man ja nur < und = implementieren fuer die Schluesseltypen. Handelt sich aber O(log(n)) ein. Anders als Python, das gleich zu Beginn auch alles hashen konnte. Mit unordered_map geht das inzwischen auch in C++, aber dafuer muss man dann eben auch eine hash-Funktion anbieten. Und da hat man dann eben O(1) (amortisiert....)
Das ist natürlich eine Stufe, auf der ich noch lange nicht angekommen bin, aber vielen Dank für die Tips, ich werd mich dran versuchen.Sirius3 hat geschrieben: ↑Mittwoch 7. Juli 2021, 20:08 @mirko3107: lass Dich nicht von anderen Beitragenden zu diesem Thread verwirren, Wörterbuchzugriffe sind sehr performant. Statt dessen gilt hier das DRY-Prinzip "Don't repeat yourself": Code der nur einmal dasteht muß man auch nur einmal verstehen.
Was man auf keinen Fall benutzt, sind globale Variablen, erst recht nicht in Zusammenhang mit multiprocessing.
Benutze keine kryptischen Abkürzungen, sondern aussagekräftige Variablennamen.
`list` ist der Name einer eingebauten Klasse und sollte nicht überdeckt werden. Über einen Index iteriert man nicht, weil man auch direkt über die Zeilen Deines Dataframes iterieren könnte.
Strings stückelt man nicht mit + zusammen, vor allem nicht bei URL-Parameters, weil die nicht alle Zeichen enthalten dürfen.
Das Ergebnis eines requests.get ist kein req(est) sondern ein response.
`eps` ist mal eine Zahl und mal ein String, das sollte nicht sein.
Exceptions fängt man dort ab, wo sie auftreten.
Die erste Funktion könnte so aussehen:Und natürlich dauert das relativ lang, wegen der vielen `sleep`.Code: Alles auswählen
QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote" def pack1(): watch = pd.read_csv("watch.csv", usecols=['Ticker', 'Name', 'Price', 'Change', 'Cap', 'EPS']) while True: for index, ticker in enumerate(watch.Ticker): try: quote_response = requests.get(QUOTE_URL, params={"symbols": ticker}) result = quote_response.json()['quoteResponse']['result'][0] except (KeyError, IndexError, RemoteDataError) as error: print(f"Failed reading {ticker}: {error}") else: watch.iloc[index].Change = round(result.get('regularMarketChangePercent', 0),2) watch.iloc[index].Price = round(result.get('regularMarketPrice', 0),2) watch.iloc[index].EPS = round(result.get('epsCurrentYear', 0),2) watch.iloc[index].Cap = round(result.get('marketCap', 0)/1000000000, 2) time.sleep(1) watch.to_csv('pack1.csv', header=True, index=False) time.sleep(600)
Wenn ein Ticker aus einem unbekannten Problem nicht gelesen werden kann, enthält der noch die Werte des letzten Laufs. Ist das so gewollt? Aber was ist beim ersten Lauf? Ist 0 wirklich der Wert, den Du haben möchtest, wenn einer der Schlüssel nicht existiert?
Die Funktion macht aber viel zu viel.
Eigentlich müßte die Funktion so aussehen:Dann kann man sie flexibler einsetzen.Code: Alles auswählen
QUOTE_URL = "https://query1.finance.yahoo.com/v7/finance/quote" def query_quote(tickers): quote = [] for ticker in tickers: try: quote_response = requests.get(QUOTE_URL, params={"symbols": ticker}) result = quote_response.json()['quoteResponse']['result'][0] except (KeyError, IndexError, RemoteDataError) as error: print(f"Failed reading {ticker}: {error}") quote.append((ticker, 0, 0, 0, 0)) else: quote.append(( ticker, round(result.get('regularMarketPrice', 0),2), round(result.get('regularMarketChangePercent', 0),2), round(result.get('marketCap', 0)/1000000000, 2), round(result.get('epsCurrentYear', 0),2) )) time.sleep(1) return pd.DataFrame(quote, columns=['Ticker', 'Price', 'Change', 'Cap', 'EPS'])
Das ganze packt man in eine äußere Schleife, wo man alle Quellen abfrägt, benutzt concurrent.futures.ThreadPoolExecutor um einfach parallelisieren zu können, und gleichzeitig die Funktionen samt Rückgabewert verwenden zu können und kann dann dort auch einfach die Ergebnisse irgendwie zusammenpacken.
@DasIch: beim parallelen Abfragen von der selben Quelle hat der Server bestimmt etwas dagegen.
Und von concurrent.futures.ThreadPoolExecutor hab ich noch nie was gehört, aber wie gesagt, bin totaler Python-Noob im ersten Level.
@mirko3107,
noch ein Hinweis:
Das von dir verwendete ib-insync
https://ib-insync.readthedocs.io/api.html
ist ein asynchrones Framework, daher ist aus meiner Sicht, asyncio von allen anderen Vorschlägen zu bevorzugen.
Besonders den Punkt "The One Rule" bzgl time.sleep() würde ich mir mal durchlesen.
Das was dich hier am stärksten limitiert ist wahrscheinlich die Beschränkung der API von Yahoo-Finance, denn die erlauben eine begrenzte Anzahl von Requests pro Stunde.
Ich würde dir empfehlen das entsprechen zu "tunen" um das Optimum da heraus zu holen, bzw. insgesamt weniger oft auf die API zuzugreifen.
Eine Sekunde pauschal zu warten ist sicher nicht nötig.
noch ein Hinweis:
Das von dir verwendete ib-insync
https://ib-insync.readthedocs.io/api.html
ist ein asynchrones Framework, daher ist aus meiner Sicht, asyncio von allen anderen Vorschlägen zu bevorzugen.
Besonders den Punkt "The One Rule" bzgl time.sleep() würde ich mir mal durchlesen.
Das was dich hier am stärksten limitiert ist wahrscheinlich die Beschränkung der API von Yahoo-Finance, denn die erlauben eine begrenzte Anzahl von Requests pro Stunde.
Ich würde dir empfehlen das entsprechen zu "tunen" um das Optimum da heraus zu holen, bzw. insgesamt weniger oft auf die API zuzugreifen.
Eine Sekunde pauschal zu warten ist sicher nicht nötig.
Also ib_insync hab ich bisher nur per Multiprocessing ans laufen bekommen, bei Threading kommt "RuntimeError: There is no current event loop in thread 'Thread-7'", alle anderen Threads laufen problemlos.
Code: Alles auswählen
def earnings_iv():
list = pd.read_csv("watch.csv", usecols=['Ticker', 'Earnings', 'IV'])
rows = len(list.index)
while ib.isConnected():
for i in range(0, 10):
try:
ticker = list['Ticker'].values[i]
contract = Stock(ticker, 'SMART', 'USD')
ib.reqMarketDataType(1)
ib.qualifyContractsAsync(contract)
data = ib.reqMktData(contract, "106,100", False, False)
ib.sleep(1)
iv = round(data.impliedVolatility * 100, 2)
earn = ib.reqFundamentalData(contract, 'CalendarReport')
if type(earn) == str:
tree = ET.ElementTree(ET.fromstring(earn))
root = tree.getroot()
item = tree.find('.//Date')
earnings = item.text
else:
earnings = '01/01/2000'
print(ticker, earnings,iv)
list.at[i, 'Earnings'] = earnings
list.at[i, 'IV'] = iv
ib.cancelMktData(contract)
except (KeyError, IndexError) as error:
print(f"Failed reading {ticker}: {error}")
pass
ib.disconnect()
#list.to_csv('earnings_iv.csv', header=True, index=False, columns=['Ticker', 'Earnings', 'IV'])
time.sleep(3600)
Man bekommt das auch threaded zum laufen (die Fehlermeldung sagt im Grunde schon wie), es bringt nur nix. Mit asyncio kannst du mehrere Tasks in EINEM thread (dem main tread eben) quasi-parallel ausführen lassen. Darum muss da weder threading noch multiprocessing her.
Und allen Unkenrufen zum trotz ist, wie schon von diversen Leuten angemerkt, das wahrscheinlichste problem einfach die Wartezeiten und Limitierungen der Server. Nicht dein Code, der irgendwas signifikantes zu rechnen hätte. Noch ein Grund weniger, multiprocessing zu verwenden.
Und allen Unkenrufen zum trotz ist, wie schon von diversen Leuten angemerkt, das wahrscheinlichste problem einfach die Wartezeiten und Limitierungen der Server. Nicht dein Code, der irgendwas signifikantes zu rechnen hätte. Noch ein Grund weniger, multiprocessing zu verwenden.