Hier mal ein verkürzter lauffähiger Code. Lauffähig insoweit, wenn ich prozesspoolexecutor gegen threadpoolexecutor austausche dann läufts. Chatai meint das hinge mit dem Import acm zusammen, da ich diese Library nutze. Deswegen hab ich schon den Import gekapselt un bei jedem Neu-Initialisieren eines Prozesses das acm mitgegegen. Vorschlag von KI. Nützt trotzdem nichts. api_lock hatte auch chatai vorgeschlagen.
Die Einrückungen sind weg
import pandas as pd
from concurrent.futures import ProcessPoolExecutor, as_completed, TimeoutError
import gc
import time
import multiprocessing
api_lock = multiprocessing.Lock()
acm = None # global, initial None
# Maximale Anzahl der verfügbaren Prozessoren
max_cpus = multiprocessing.cpu_count()
reportType = 'RC'
def init_worker():
global acm
import acm
# Klasse für MoneyFlow
class MoneyFlow():
def __init__(self, moneyFlow, reportType, runDate, cancelType):
try:
self.trade = moneyFlow.Trade() # acm.Trade
self.instrument = moneyFlow.Trade().Instrument() # acm.Instrument
self.moneyFlow = moneyFlow # acm.Moneyflow
self.ReportType = reportType
self.ReportTag = runDate
self.TradeID = self.trade.Oid()
self.OriginalTradeID = moneyFlow.Trade().ContractTrdnbr()
self.Transaktionstyp = cancelType
except Exception as x:
print(f'Fehler', x)
return None
def to_dict(self):
return {
'ReportType': self.ReportType,
'ReportTag': self.ReportTag,
'TradeID': self.TradeID,
'OriginalTradeID': self.OriginalTradeID,
'Transaktionstyp': self.Transaktionstyp
}
def getMoneyFlows(trade, reportType):
global acm, api_lock
with api_lock:
runDate = acm.Time.DateToday()
tPlus30 = acm.Time.DateAddDelta(runDate, 0, 0, 30)
print('CancelType = C')
cancelType = 'C'
money_flows = []
acm_t = acm.FTrade[trade]
if not acm_t.MoneyFlows():
print('keine Moneyflows vorhanden')
return money_flows
mf = acm_t.MoneyFlows()[0]
moneyFlow = MoneyFlow(mf, reportType, runDate, cancelType)
money_flows.append(moneyFlow)
return money_flows
def process_trade(trade_id, reportType):
try:
print(f"Verarbeite Trade ID: {trade_id}")
money_flows = getMoneyFlows(trade_id, reportType)
return [mf.to_dict() for mf in money_flows]
except Exception as e:
print(f"Fehler bei der Verarbeitung von Trade ID {trade_id}: {e}")
return []
def process_trades_parallel(trades, reportType, batch_size=1000, timeout=60):
all_moneyflows = []
num_processes = multiprocessing.cpu_count() or 1
# zum Testen nur mit 4 Prozessoren
num_processes = 4
with ProcessPoolExecutor(max_workers=num_processes, initializer=init_worker) as executor:
# Erstellen von Futures für jeden Trade
futures = [executor.submit(process_trade, trade_id, reportType) for trade_id in trades]
for future in futures:
try:
# Ergebnis mit Timeout abholen
batch = future.result(timeout=timeout)
if batch:
all_moneyflows.extend(batch)
except TimeoutError:
print("Prozess hat zu lange gebraucht und wurde abgebrochen.")
except Exception as e:
print(f"Fehler beim Abrufen eines Ergebnisses: {e}")
return all_moneyflows
def create_dataframe(moneyFlow_dicts):
df = pd.DataFrame(moneyFlow_dicts)
return df
if __name__ == '__main__':
import acm
tradefilter = 'E073734_dt_test_trema'
trades = [t.Oid() for t in acm.FTradeSelection[tradefilter].Trades()]
print(f'Anzahl Trades {len(trades)}')
reportType = 'RC'
start_time = time.time()
# Parallel verarbeiten mit Timeout
moneyFlow_dicts = process_trades_parallel(trades, reportType, batch_size=100, timeout=60)
# DataFrame erstellen
df = create_dataframe(moneyFlow_dicts)
# In CSV speichern
df.to_csv(r'money_flows_neu25.csv', sep=';', index=False, encoding='utf-8')
end_time = time.time()
print(f"Verarbeitungszeit: {end_time - start_time} Sekunden")