Parallelisierung mittels processpoolexecutor funktioniert nicht, über threadpoolexecutor schon

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
dietmar
User
Beiträge: 6
Registriert: Montag 15. Januar 2007, 10:36

Hallo zusammen,

Lange nicht mehr hier gewesen ;)
Ich bin mal wieder an einem Python-Problem. Ich möchte ein Skript intern parallelisieren und zwar dynamisch. D.H. je nachdem wieviel Daten anfallen, dementsprechend eine gewisse Anzahl an Prozessoren nutzen. Bisher machen wir das unter Linux in der Shell-Ebene, leider nicht sehr komfortabel und komplett undynamisch, d.h. bei einem Change muß jedesmal eine Produktionsfreigabe erstellt werden, das dauert immer Tage.
Ich hab mir jetzt das multiprocessing angeschaut und möchte es implementieren.
Wenn ich den ThreadPoolexecutor nutze läuft der Job (eine Liste von Trades abarbeiten und deren Cashflows ausgeben) ganz hervorragend durch, nur eben mit einem Processor, aber x-Threads. Da aber während des Prozesses rechenintensive Aufgaben verrichtet werden ist das nicht zielführend, weil ich alle zur Verfügung stehenden Prozessoren nutzen möchte.
Mit dem processpoolexecutor kommt der process leider nicht zurück, d.h. der erste Trade wird im debug angedruckt, d.h. die Trade_Id, dann bleibt aber alles stehen. Prozesse werden so viele aufgemacht wie ich mitgebe, das wars, dito wenn ich das ganze auch nur mit einem Prozess mache. Chatai hilft manchmal ganz gut, nur eben hier nicht, drehe mich im Kreis.
Ich würde den Code im nächsten Schritt posten wenn das hilft?
Grüße Dietmar
Sirius3
User
Beiträge: 18335
Registriert: Sonntag 21. Oktober 2012, 17:20

Bei multiprocessing kann man viel falsch machen, ohne den Code zu sehen, kann man da wenig helfen.
Erster Schritt wäre immer, den Code an sich effizienter zu schreiben. Erst wenn das nicht mehr vernünftig besser wird, kann man sich mit Parallelisierung beschäftigen, denn nicht alles ist parallelisierbar. Zudem darf man den Overhead nicht unterschätzen.
Statt multiprocessing ist concurrent.futures die klarere Art und Weise, etwas parallel zu bearbeiten.
dietmar
User
Beiträge: 6
Registriert: Montag 15. Januar 2007, 10:36

Zuallererst mal danke für deine schnelle Antwort.
Ich nutze
from concurrent.futures import ProcessPoolExecutor
Auch das hilft nicht. Ich poste morgen mal die relevanten Codeschnipsel.
Ich drehe mich etwas im Kreis.
Buenas noches
Dietmar
Benutzeravatar
sparrow
User
Beiträge: 4599
Registriert: Freitag 17. April 2009, 10:28

@Dietmar: Vorbeugend, weil das klingt als würdest du Teile deines Codes posten: Bitte poste ein minimales, lauffähige Beispiel, bei dem das Problem auftritt. Sonst wird das hier Glaskugellesen.
dietmar
User
Beiträge: 6
Registriert: Montag 15. Januar 2007, 10:36

Hier mal ein verkürzter lauffähiger Code. Lauffähig insoweit, wenn ich prozesspoolexecutor gegen threadpoolexecutor austausche dann läufts. Chatai meint das hinge mit dem Import acm zusammen, da ich diese Library nutze. Deswegen hab ich schon den Import gekapselt un bei jedem Neu-Initialisieren eines Prozesses das acm mitgegegen. Vorschlag von KI. Nützt trotzdem nichts. api_lock hatte auch chatai vorgeschlagen.
Die Einrückungen sind weg :?
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed, TimeoutError
import gc
import time
import multiprocessing
api_lock = multiprocessing.Lock()

acm = None # global, initial None

# Maximale Anzahl der verfügbaren Prozessoren
max_cpus = multiprocessing.cpu_count()

reportType = 'RC'

def init_worker():
global acm
import acm

# Klasse für MoneyFlow
class MoneyFlow():
def __init__(self, moneyFlow, reportType, runDate, cancelType):
try:
self.trade = moneyFlow.Trade() # acm.Trade
self.instrument = moneyFlow.Trade().Instrument() # acm.Instrument
self.moneyFlow = moneyFlow # acm.Moneyflow
self.ReportType = reportType
self.ReportTag = runDate
self.TradeID = self.trade.Oid()
self.OriginalTradeID = moneyFlow.Trade().ContractTrdnbr()
self.Transaktionstyp = cancelType
except Exception as x:
print(f'Fehler', x)
return None

def to_dict(self):
return {
'ReportType': self.ReportType,
'ReportTag': self.ReportTag,
'TradeID': self.TradeID,
'OriginalTradeID': self.OriginalTradeID,
'Transaktionstyp': self.Transaktionstyp
}

def getMoneyFlows(trade, reportType):
global acm, api_lock
with api_lock:
runDate = acm.Time.DateToday()
tPlus30 = acm.Time.DateAddDelta(runDate, 0, 0, 30)
print('CancelType = C')
cancelType = 'C'
money_flows = []
acm_t = acm.FTrade[trade]
if not acm_t.MoneyFlows():
print('keine Moneyflows vorhanden')
return money_flows
mf = acm_t.MoneyFlows()[0]
moneyFlow = MoneyFlow(mf, reportType, runDate, cancelType)
money_flows.append(moneyFlow)
return money_flows

def process_trade(trade_id, reportType):
try:
print(f"Verarbeite Trade ID: {trade_id}")
money_flows = getMoneyFlows(trade_id, reportType)
return [mf.to_dict() for mf in money_flows]
except Exception as e:
print(f"Fehler bei der Verarbeitung von Trade ID {trade_id}: {e}")
return []

def process_trades_parallel(trades, reportType, batch_size=1000, timeout=60):
all_moneyflows = []
num_processes = multiprocessing.cpu_count() or 1
# zum Testen nur mit 4 Prozessoren
num_processes = 4
with ProcessPoolExecutor(max_workers=num_processes, initializer=init_worker) as executor:
# Erstellen von Futures für jeden Trade
futures = [executor.submit(process_trade, trade_id, reportType) for trade_id in trades]
for future in futures:
try:
# Ergebnis mit Timeout abholen
batch = future.result(timeout=timeout)
if batch:
all_moneyflows.extend(batch)
except TimeoutError:
print("Prozess hat zu lange gebraucht und wurde abgebrochen.")
except Exception as e:
print(f"Fehler beim Abrufen eines Ergebnisses: {e}")

return all_moneyflows

def create_dataframe(moneyFlow_dicts):
df = pd.DataFrame(moneyFlow_dicts)
return df

if __name__ == '__main__':
import acm
tradefilter = 'E073734_dt_test_trema'
trades = [t.Oid() for t in acm.FTradeSelection[tradefilter].Trades()]
print(f'Anzahl Trades {len(trades)}')
reportType = 'RC'

start_time = time.time()

# Parallel verarbeiten mit Timeout
moneyFlow_dicts = process_trades_parallel(trades, reportType, batch_size=100, timeout=60)

# DataFrame erstellen
df = create_dataframe(moneyFlow_dicts)

# In CSV speichern
df.to_csv(r'money_flows_neu25.csv', sep=';', index=False, encoding='utf-8')

end_time = time.time()
print(f"Verarbeitungszeit: {end_time - start_time} Sekunden")
Sirius3
User
Beiträge: 18335
Registriert: Sonntag 21. Oktober 2012, 17:20

Die Einrückungen sind weg, weil Du den Code nicht in Code-Tags </> eingeschlossen hast.

Code sollte sich generell an die Namenskonvention halten: alle Variablen und Funktionen werden klein_mit_unterstrich geschrieben, Klassen CamelCase und Konstanten KOMPLETT_GROSS.
`global` sollte man überhaupt nicht verwenden, egal wo, erst recht nicht bei Multiprocessing.
Durch Dein Lock hast Du jetzt effektiv jede Parallelverarbeitung ausgeschlossen.
Was ist `acm` für eine Bibliothek? Bitte Quelle angeben. So muß man raten, was das sein soll. Die Bibliothek scheint ja das eigentliche Problem zu sein.
Generell gilt, per Multiprocessing nur reine Python-Objekte hin und her schicken. Keine Ahnung was ein ContractTrdnbr(), ein Oid() oder ein DateToday() ist, aber das kann ja alles möglich sein.

Exceptions sollten dort verarbeitet werden, wo das sinnvoll möglich ist. In __init__ ganz bestimmt nicht, denn bei einem Fehler hast Du ein halb initialiisertes kaputtes Objekt.
Und statt einer Exception eine leere Liste zurückzuliefen ist auch nicht sinnvoll.
Ein Deiner Hauptschleife hast Du doch schon ein Exception-Handling. Alle anderen brauchst Du nicht.

`create_dataframe` ist eine nicht sehr sinnvolle Funktion. Würde man statt dessen direkt `pd.DataFrame` schreiben, wüßte jeder sofort, was da passiert.
dietmar
User
Beiträge: 6
Registriert: Montag 15. Januar 2007, 10:36

Ja, dass da Verbesserungspotential drin steckt ist mir schon klar.
Kurz zum Hintergrund, acm ist eine Library mit der ich auf die Datenbank und deren Funktionen einer Tradeanwendung komme und nutzen muss.
Anfangs hatte ich es total Basic ohne Locks etc.
Die Fehlerbehandlung im init habe ich eingebaut um zu sehen wo er hängt. Das kommt auch wieder raus.
Komisch ist das wie schon geschrieben der Code mittels ThreadPoolExecutor durchläuft, mit dem ProcessPoolEcecutor den ersten Trade andruckt und nicht zurück kommt.
Das hängt zu 100% mit der acm Library zusammen, weil ähnlicher Code ohne acm durchläuft.
Deswegen habe ich das mit dem global acm angefangen, schlug mir chatai vor. Bin noch am überlegen wie ich da weiter vorgehe, noch keinen Plan.
Bzgl. Codingguidelines, habe mich der Schreibweise im Projekt angepasst.
Sirius3
User
Beiträge: 18335
Registriert: Sonntag 21. Oktober 2012, 17:20

Ohne zu wissen, was acm macht, kommen wir hier nicht weiter.
Um das Problem weiter einzugrenzen, solltest Du die acm-Aufrufe einzeln testen.
Läuft das Programm denn nur mit dem Import durch?

Code: Alles auswählen

from concurrent.futures import ProcessPoolExecutor, TimeoutError
import acm

def process_trade(trade_id, report_type):
    return []

def process_trades_parallel(trades, report_type, timeout=60):
    all_moneyflows = []
    with ProcessPoolExecutor() as executor:
        # Erstellen von Futures für jeden Trade
        futures = [executor.submit(process_trade, trade_id, report_type) for trade_id in trades]
        for future in futures:
            try:
                # Ergebnis mit Timeout abholen
                batch = future.result(timeout=timeout)
                all_moneyflows.extend(batch)
            except TimeoutError:
                print("Prozess hat zu lange gebraucht und wurde abgebrochen.")
            except Exception as e:
                print(f"Fehler beim Abrufen eines Ergebnisses: {e}")
    return all_moneyflows

if __name__ == '__main__':
    tradefilter = 'E073734_dt_test_trema'
    trades = [t.Oid() for t in acm.FTradeSelection[tradefilter].Trades()]
    print(f'Anzahl Trades {len(trades)}')
    report_type = 'RC'
    moneyflows = process_trades_parallel(trades, report_type)
dietmar
User
Beiträge: 6
Registriert: Montag 15. Januar 2007, 10:36

Erstmal herzlichen Dank, daß du/Ihr euch der Sache annehmt. Konnte heute nix machen, werde gleich mal schauen und das durchlaufen lassen wie du vorgeschlagen hast.
dietmar
User
Beiträge: 6
Registriert: Montag 15. Januar 2007, 10:36

Hab jetzt das mal so gemacht, sobald ich eine Liste von Zahlen mitgebe, läuft der Prozess durch. Aber wenn ich eine Liste von Trade-Ids mitgebe (auch alles integer) laufen die Prozesse nicht weiter. Auch wenn ich strings daraus mache.
Irgendwie seltsam.
Noch kurz zum acm, mit der Library habe ich Zugriff auf alle Objekte, Methoden, Berechnungen in der Datenbank, nennt sich Arena Class Modell, d.h. mit einer TradeID kann ich direkt auf einen Trade in der DB zugreifen, z.B. Trade = acm.FTrade[TradeID], dann habe ich das Tradeobjekt in der Hand, samt allen Referenzen. Wenn das hilft?

Code: Alles auswählen

from concurrent.futures import ProcessPoolExecutor, TimeoutError
import acm

def process_trade(trade_id, report_type):
    return [trade_id, report_type]

def process_trades_parallel(trades, report_type, timeout=60):
    all_moneyflows = []
    with ProcessPoolExecutor(max_workers=4) as executor:
        # Erstellen von Futures für jeden Trade
        futures = [executor.submit(process_trade, trade_id, report_type) for trade_id in trades]
        for future in futures:
            try:
                # Ergebnis mit Timeout abholen
                batch = future.result(timeout=timeout)
                all_moneyflows.append(batch)
            except TimeoutError:
                print("Prozess hat zu lange gebraucht und wurde abgebrochen.")
            except Exception as e:
                print(f"Fehler beim Abrufen eines Ergebnisses: {e}")
    return all_moneyflows

if __name__ == '__main__':
    tradefilter = 'E073734_dt_test_trema'
    trades = [f'{t.Oid()}' for t in acm.FTradeSelection[tradefilter].Trades()]
    print(trades)
    #trades = [1234, 2345, 3456, 567, 876, 554, 2345, 9876]
    print(f'Anzahl Trades {len(trades)}')
    report_type = 'RC'
    moneyflows = process_trades_parallel(trades, report_type)
    print(moneyflows)
    
Wenn ich das mit einem oder 2 Prozessoren begrenze, läuft das durch, es kommt zwar eine Fehlermeldung, aber das scheint nicht zu stören:
[20754084, 20754086, 20754319, 20755722, 21972006, 21972012, 21972016]
Anzahl Trades 7
251219 195507 W asp asp_handle_packet: #0 ARENA dk-parzival dkparzival91 (PID 19446), packet number mismatch, got: 795 expected 791
[[20754084, 'RC'], [20754086, 'RC'], [20754319, 'RC'], [20755722, 'RC'], [21972006, 'RC'], [21972012, 'RC'], [21972016, 'RC']]

Sobald ich das auf 3 Prozessoren erweitere kommt der Job nicht mehr zurück:
[20754084, 20754086, 20754319, 20755722, 21972006, 21972012, 21972016]
Anzahl Trades 7
251219 195533 W asp asp_handle_packet: #0 ARENA dk-parzival dkparzival91 (PID 19446), packet number mismatch, got: 795 expected 791
251219 195533 W asp asp_handle_packet: #0 ARENA dk-parzival dkparzival91 (PID 19446), packet number mismatch, got: 799 expected 791
hier hängt der...
Benutzeravatar
sparrow
User
Beiträge: 4599
Registriert: Freitag 17. April 2009, 10:28

@dietmar: Du hast gekonnt die Frage von Sirius3 uminterpretiert. Sirius3 hat zuerst gefragt, was die Quelle der Bibliothek ist, nicht was sie tut. Die Antwort darauf wäre wohl "acm ist eine Bibliothek der Trading Software Front Arena".
Da von uns niemand weiß, wie die Anbindung implementiert ist, wäre die Frage beim Support des Herstellers oder beim Implementierungsdienstleister korrekt platziert.

Es muss von dort eine Kommunikation mit der Hauptanwendung stattfinden. Und es kann zum Beispiel sein, dass die nicht erlaubt, dass sich mehr als einmal verbunden wird und die Bibliothek eine globale Verbindung zur Verfügung stellt, die sich eine Anwendung teilt. Dann funktioniert das super in einem Thread und bei Multiprocessing nicht mehr, weil dann ganz viele Clients gleichzeitig verbinden wollen. Und je nachdem wie das gehandelt wird, kann die Implenentierung so in einen Deadlock laufen.
Das selbe könnte nicht nur für die reine Verbindung sondern auch auf das Verwenden der darin enthaltenen Objekte zutreffen. Deshalb funktioniert es auch super, wenn du sie nicht verwendest sondern irgendwas übergibt.

Ohne die Bibliothek zu kennen ist das alles Glaskugellesen. Deshalb den Support des Herstellers aktivieren.
Antworten