Parallelisierung mittels processpoolexecutor funktioniert nicht, über threadpoolexecutor schon

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
dietmar
User
Beiträge: 3
Registriert: Montag 15. Januar 2007, 10:36

Hallo zusammen,

Lange nicht mehr hier gewesen ;)
Ich bin mal wieder an einem Python-Problem. Ich möchte ein Skript intern parallelisieren und zwar dynamisch. D.H. je nachdem wieviel Daten anfallen, dementsprechend eine gewisse Anzahl an Prozessoren nutzen. Bisher machen wir das unter Linux in der Shell-Ebene, leider nicht sehr komfortabel und komplett undynamisch, d.h. bei einem Change muß jedesmal eine Produktionsfreigabe erstellt werden, das dauert immer Tage.
Ich hab mir jetzt das multiprocessing angeschaut und möchte es implementieren.
Wenn ich den ThreadPoolexecutor nutze läuft der Job (eine Liste von Trades abarbeiten und deren Cashflows ausgeben) ganz hervorragend durch, nur eben mit einem Processor, aber x-Threads. Da aber während des Prozesses rechenintensive Aufgaben verrichtet werden ist das nicht zielführend, weil ich alle zur Verfügung stehenden Prozessoren nutzen möchte.
Mit dem processpoolexecutor kommt der process leider nicht zurück, d.h. der erste Trade wird im debug angedruckt, d.h. die Trade_Id, dann bleibt aber alles stehen. Prozesse werden so viele aufgemacht wie ich mitgebe, das wars, dito wenn ich das ganze auch nur mit einem Prozess mache. Chatai hilft manchmal ganz gut, nur eben hier nicht, drehe mich im Kreis.
Ich würde den Code im nächsten Schritt posten wenn das hilft?
Grüße Dietmar
Sirius3
User
Beiträge: 18334
Registriert: Sonntag 21. Oktober 2012, 17:20

Bei multiprocessing kann man viel falsch machen, ohne den Code zu sehen, kann man da wenig helfen.
Erster Schritt wäre immer, den Code an sich effizienter zu schreiben. Erst wenn das nicht mehr vernünftig besser wird, kann man sich mit Parallelisierung beschäftigen, denn nicht alles ist parallelisierbar. Zudem darf man den Overhead nicht unterschätzen.
Statt multiprocessing ist concurrent.futures die klarere Art und Weise, etwas parallel zu bearbeiten.
dietmar
User
Beiträge: 3
Registriert: Montag 15. Januar 2007, 10:36

Zuallererst mal danke für deine schnelle Antwort.
Ich nutze
from concurrent.futures import ProcessPoolExecutor
Auch das hilft nicht. Ich poste morgen mal die relevanten Codeschnipsel.
Ich drehe mich etwas im Kreis.
Buenas noches
Dietmar
Benutzeravatar
sparrow
User
Beiträge: 4598
Registriert: Freitag 17. April 2009, 10:28

@Dietmar: Vorbeugend, weil das klingt als würdest du Teile deines Codes posten: Bitte poste ein minimales, lauffähige Beispiel, bei dem das Problem auftritt. Sonst wird das hier Glaskugellesen.
dietmar
User
Beiträge: 3
Registriert: Montag 15. Januar 2007, 10:36

Hier mal ein verkürzter lauffähiger Code. Lauffähig insoweit, wenn ich prozesspoolexecutor gegen threadpoolexecutor austausche dann läufts. Chatai meint das hinge mit dem Import acm zusammen, da ich diese Library nutze. Deswegen hab ich schon den Import gekapselt un bei jedem Neu-Initialisieren eines Prozesses das acm mitgegegen. Vorschlag von KI. Nützt trotzdem nichts. api_lock hatte auch chatai vorgeschlagen.
Die Einrückungen sind weg :?
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed, TimeoutError
import gc
import time
import multiprocessing
api_lock = multiprocessing.Lock()

acm = None # global, initial None

# Maximale Anzahl der verfügbaren Prozessoren
max_cpus = multiprocessing.cpu_count()

reportType = 'RC'

def init_worker():
global acm
import acm

# Klasse für MoneyFlow
class MoneyFlow():
def __init__(self, moneyFlow, reportType, runDate, cancelType):
try:
self.trade = moneyFlow.Trade() # acm.Trade
self.instrument = moneyFlow.Trade().Instrument() # acm.Instrument
self.moneyFlow = moneyFlow # acm.Moneyflow
self.ReportType = reportType
self.ReportTag = runDate
self.TradeID = self.trade.Oid()
self.OriginalTradeID = moneyFlow.Trade().ContractTrdnbr()
self.Transaktionstyp = cancelType
except Exception as x:
print(f'Fehler', x)
return None

def to_dict(self):
return {
'ReportType': self.ReportType,
'ReportTag': self.ReportTag,
'TradeID': self.TradeID,
'OriginalTradeID': self.OriginalTradeID,
'Transaktionstyp': self.Transaktionstyp
}

def getMoneyFlows(trade, reportType):
global acm, api_lock
with api_lock:
runDate = acm.Time.DateToday()
tPlus30 = acm.Time.DateAddDelta(runDate, 0, 0, 30)
print('CancelType = C')
cancelType = 'C'
money_flows = []
acm_t = acm.FTrade[trade]
if not acm_t.MoneyFlows():
print('keine Moneyflows vorhanden')
return money_flows
mf = acm_t.MoneyFlows()[0]
moneyFlow = MoneyFlow(mf, reportType, runDate, cancelType)
money_flows.append(moneyFlow)
return money_flows

def process_trade(trade_id, reportType):
try:
print(f"Verarbeite Trade ID: {trade_id}")
money_flows = getMoneyFlows(trade_id, reportType)
return [mf.to_dict() for mf in money_flows]
except Exception as e:
print(f"Fehler bei der Verarbeitung von Trade ID {trade_id}: {e}")
return []

def process_trades_parallel(trades, reportType, batch_size=1000, timeout=60):
all_moneyflows = []
num_processes = multiprocessing.cpu_count() or 1
# zum Testen nur mit 4 Prozessoren
num_processes = 4
with ProcessPoolExecutor(max_workers=num_processes, initializer=init_worker) as executor:
# Erstellen von Futures für jeden Trade
futures = [executor.submit(process_trade, trade_id, reportType) for trade_id in trades]
for future in futures:
try:
# Ergebnis mit Timeout abholen
batch = future.result(timeout=timeout)
if batch:
all_moneyflows.extend(batch)
except TimeoutError:
print("Prozess hat zu lange gebraucht und wurde abgebrochen.")
except Exception as e:
print(f"Fehler beim Abrufen eines Ergebnisses: {e}")

return all_moneyflows

def create_dataframe(moneyFlow_dicts):
df = pd.DataFrame(moneyFlow_dicts)
return df

if __name__ == '__main__':
import acm
tradefilter = 'E073734_dt_test_trema'
trades = [t.Oid() for t in acm.FTradeSelection[tradefilter].Trades()]
print(f'Anzahl Trades {len(trades)}')
reportType = 'RC'

start_time = time.time()

# Parallel verarbeiten mit Timeout
moneyFlow_dicts = process_trades_parallel(trades, reportType, batch_size=100, timeout=60)

# DataFrame erstellen
df = create_dataframe(moneyFlow_dicts)

# In CSV speichern
df.to_csv(r'money_flows_neu25.csv', sep=';', index=False, encoding='utf-8')

end_time = time.time()
print(f"Verarbeitungszeit: {end_time - start_time} Sekunden")
Sirius3
User
Beiträge: 18334
Registriert: Sonntag 21. Oktober 2012, 17:20

Die Einrückungen sind weg, weil Du den Code nicht in Code-Tags </> eingeschlossen hast.

Code sollte sich generell an die Namenskonvention halten: alle Variablen und Funktionen werden klein_mit_unterstrich geschrieben, Klassen CamelCase und Konstanten KOMPLETT_GROSS.
`global` sollte man überhaupt nicht verwenden, egal wo, erst recht nicht bei Multiprocessing.
Durch Dein Lock hast Du jetzt effektiv jede Parallelverarbeitung ausgeschlossen.
Was ist `acm` für eine Bibliothek? Bitte Quelle angeben. So muß man raten, was das sein soll. Die Bibliothek scheint ja das eigentliche Problem zu sein.
Generell gilt, per Multiprocessing nur reine Python-Objekte hin und her schicken. Keine Ahnung was ein ContractTrdnbr(), ein Oid() oder ein DateToday() ist, aber das kann ja alles möglich sein.

Exceptions sollten dort verarbeitet werden, wo das sinnvoll möglich ist. In __init__ ganz bestimmt nicht, denn bei einem Fehler hast Du ein halb initialiisertes kaputtes Objekt.
Und statt einer Exception eine leere Liste zurückzuliefen ist auch nicht sinnvoll.
Ein Deiner Hauptschleife hast Du doch schon ein Exception-Handling. Alle anderen brauchst Du nicht.

`create_dataframe` ist eine nicht sehr sinnvolle Funktion. Würde man statt dessen direkt `pd.DataFrame` schreiben, wüßte jeder sofort, was da passiert.
Antworten