Async streaming Daten parallel weiter verarbeiten statt if / elif... multiprocessing?

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
August1328
User
Beiträge: 71
Registriert: Samstag 27. Februar 2021, 12:18

Hallo zusammen,

ich habe ein Skript gebastelt, welches streaming Daten empfängt, asynchron mit ein paar if / elif Abfragen verarbeitet und dann über eine Broker API Aktien kaufen kann.

Grundsätzlich funktioniert das Skript, es macht Trades mit Gewinn, aber natürlich macht es auch "Fehler" d.h. trades mit Verlust... nein, eigentlich macht es keine Fehler, sondern es macht nur was ich ihm gesagt habe bzw. nicht ausgeschlossen habe - diese Fehler schmälern den Gewinn wieder, zum Teil erheblich... (wie erwartet habe ich keine Gelddruck Maschine erfunden :lol: :lol: ).

Deshalb habe ich vor ein paar Wochen angefangen zusätzliche Variablen und Abfragen einzubauen, um wiederholte Fehler / falsche Trades zu vermeiden. Das hat es nur begrenzt besser gemacht: Einerseits hat das Skript schlechte trades vermieden, andererseits ist es, da es einen Haufen if /elif abfragen durcharbeitet, immer langsamer geworden, so daß andere trades schlechter wurden (das Skript macht kurze day trades).

Inzwischen habe ich die ganzen "Verbesserungen" bzw. zusätzlichen Abfragen rückgängig gemacht und suche nach einem anderen Weg, weitere Abfragen einzubauen, aber auch die Geschwindigkeit zu erhöhen.

Die letzten Wochen habe ich viel über threading, GIL, coroutines und multiprocessing gelesen und ich denke, das ich parallele Bearbeitung bzw. multiprocessing benötige, aber andererseits waren das so viele Informationen, daß ich seit ein paar Tagen etwas auf dem Schlauch stehe und irgendwie nicht weiß, wo und wie ich ansetzen soll oder was ich probieren soll.

In meinem Kopf sieht es so aus: Es gibt multiprocessing, für rechenintensive Funktionen. Dann gibt es multiprocessing für async. Das ist ein wenig spezieller. Es gibt ein paar Beispiele im Netz, die sich aber alle mit 1 rechenintensiven Aufgabe beschäftigen. In diesen Beispielen geht es um tasks, futures, coroutines, pipein / pipeout, queues, ... Es gibt spezielle Module wie aioprocessing oder aiomultiprocess, die async multiprocessing vereinfachen sollen, aber ich finde meine Aufgabenstellung da nicht wieder, weil die if Abfragen eigentlich nicht sehr rechenintensiv sind und auch nichts miteinander zu tun haben, d.h. die könnten eigentlich unabhängig voneinander parallel ablaufen. Seit ein paar Tagen gibt es das Modul MPIRE, das funktioniert aber noch nicht unter Windows, ganz zu schweigen von async...

Also, ich habe folgendes Skript und ich würde gerne die if / elif für verschiedene "Informationstypen" parallel bearbeiten lassen, statt nacheinander (sorry für die Kürzungen, aber ich poste nicht meine selbstentwickelten Abfragen / knowhow):

Vorab zur Info: symbols ist eine Liste mit 0 bis x Aktien, ich prüfe ob nur 1 Aktie im Stream drin ist, alles andere ist irrelevant / zu kompliziert. Dann prüfe ich, ob das diese 1 Aktie = symbol in meiner vordefinierten Tabelle mit Aktien ist, die mich interessieren. symbols und symbol sind Json arrays.

Code: Alles auswählen

import asyncio
import ib_insync as ibi
import socketio
...

ib = ibi.IB()
sio = socketio.AsyncClient()

@sio.on('connect')
def on_connect():
    print("\nConnected to streaming API")

@sio.on('data')
async def on_data(data):
    if len(symbols) == 1:
        try:
            zeile = dfvolgainer.loc[symbol]
        except KeyError:
            pass

        else:
            if informationstyp1:
            	if ...:
            	   await trade(symbol, aktion)
            	elif ...:
            	    await trade(symbol, aktion)
            elif informationstyp2:
               if ...:
                   await trade(symbol, aktion)
               elif ...:
            	   await trade(symbol, aktion)
             elif informationstyp3:
                 ...

async def trade(tickersymbol, regel):
...

async def main_loop():
    await ib.connectAsync()
    await sio.connect('http://...')
    await sio.wait()

try:
    asyncio.run(main_loop())
...

Für jedes Beispiel, Tipp oder Link, wie man mehrere unabhängige Aufgaben / Prozesse, in diesem Fall if / elif informationstypx parallel in async abarbeiten kann, wäre ich wirklich dankbar - oder einfach nur ne Info, auf welches "Gleis" ich setzen soll.

Danke & Gruß
Andy
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@August1328,

das ist leider etwas ungenau.
Aber erstmal ein vorweg: Automatisierter Handel macht erst Sinn nachdem man sich erfolgreiche Konzepte im manuellen Handel erarbeitet hat.
Der automatisierte Handel ist nichts anderes als das konsequente, emotionslose Umsetzen erfolgreicher manueller Strategien. Darin liegt der Erfolg, nicht in möglichst schneller Ausführung von möglichst vielen Orders.
Wie du sicher weißt, könnte das nämlich genauso gut einen größeren Verlust zur Folge haben.

Zum Code:
Es ist unklar wo "symbols" herkommt. Die "on_data"Funktion tut nur etwas wenn symbol gerade die Länge 1 hat. Wenn symbols also 1000 mal eine Länge größer als 1 hat, passiert bei den Aufrufen auch nichts und die Zeit verstreicht einfach.
Da du die KeyError Exception mit "pass" behandelst würde da im Fall eines nicht vorhandenen Keys nochmal nichts passieren. Also schonmal viele Gelegenheiten nichts zu machen.
Dann kommt die if-Verweigung nach Informationstypen. Aufgrund von if - elif ist es schonmal unmöglich diese parallel zu bearbeiten. Verzweigung ist ja quasi das Gegenteil von Parallelität.

Du brauchst ja deine Algorithmen nicht zu verraten, aber du must schon etwas konkreter, werden, was da eigentlich parallel laufen soll. Was bedeuten diese "Informationstypen" eigentlich? Was meinst du mit: "Informationstypen parallel bearbeiten"?
August1328
User
Beiträge: 71
Registriert: Samstag 27. Februar 2021, 12:18

Hallo rogerb,

danke für Deine Ausführungen und Fragen.

Ich hole mal ein wenig aus:

Das Skript macht event trading, d.h. über den Datenstream empfängt es Nachrichten aller Art, verarbeitet diese und kann bei gewissen Nachrichten und weiteren erfüllten Bedingungen Orders auslösen. Dabei kommen den ganzen Tag Nachrichten rein, zu manchen Uhrzeiten können es Dutzende Nachrichten pro Sekunde sein, z.B. bei Handelsbeginn oder nach Handelsschluss. Es geht nicht darum, möglichst viele trades in kurzer Zeit zu machen, sondern das Skript soll unter vielen Nachrichten einzelne relevante rausfiltern und dann einen trade ausführen.

Ein einfaches oder populäres Beispiel für event trading sind z.B. Nachrichten über ein "public offering" (ich trade US Aktien, da die Handelszeiten besser zu meinem Tagesablauf passen). Nach einem "public offering", also der Ankündigung der Ausgabe von neuen Aktien, fällt der Aktienkurs in der Regel deutlich. Dieses Event kann man handeln und die großen Hedgefonds geben angeblich viel Geld aus um als erster diese Nachricht zu erhalten - mein Skript ins zu langsam dafür, aber man kann z.B. die technische Reaktion handeln, d.h. nach dem Absturz gibt es einen Tiefpunkt und der Kurs steigt wieder, um 5%, 10% oder auch mehr. Weitere Nachrichten, die einen Kurs in Bewegung bringen sind z.B. die Quartalszahlen.

An dieser Stelle möchte ich kurz einfügen, daß ich für diesen Datenstream selbstverständlich monatlich bezahle, sowas gibt es nicht umsonst. Dazu zahle ich monatliche für die täglichen kompletten Handelsdaten und ich zahle für Live-Daten von meinem Broker. Dazu kommen noch weitere laufende Kosten, inzwischen trägt das Skirpt aber die Kosten und teilweise ein wenig mehr.

Was macht nun der Teil des Skriptes, den ich gezeigt habe und wo will ich hin:

Wie ich geschrieben habe, kommen die ganzen Zeit Nachrichten / Events rein. Das können sehr allgemeine Nachrichten sein, die keine Aktien betreffen, es können Nachrichten sein, die nur 1 Aktie betrifft und es können Nachrichten sein, die mehrere Aktien betreffen. Mich interessieren nur die Nachrichten zu 1 Aktie.

Jede Nachricht die rein kommt ist ein Json Objekt. In diesem steckt ein Json array, in welchem die verschiedenen Aktien-Kürzel (Symbole) drin stecken. Dieses array habe ich in der Variable symbols gespeichert. Ich hatte das nicht eingefügt, weil das ggf. verwirrt, aber die Definition sieht so aus:

Code: Alles auswählen

symbols = data['data'][0]['symbols']
Das Skript fängt also nur an zu arbeiten, wenn die len(symbols) == 1, weil alle anderen Nachrichten über Null oder mehrere Aktien interessieren mich nicht.

Wenn nun eine Nachricht nur über 1 Aktie ist, prüft das Skript, ob diese Aktie in meiner vorher definierten Liste / Dataframe vorhanden ist, wenn nicht, hört es auf (der Name des dataframe verrät ein wenig, welche Aktien in diesem dataframe stecken).

Das 1 Symbol ist durch ...

Code: Alles auswählen

symbol = data['data'][0]['symbols'][0]
definiert.

Wenn eine Nachricht nun die ersten beiden Hürden / Abfragen genommen hat, beginnt über mehrere if / elif Abfragen der Teil, der entscheidet, ob es zu einer Order kommt.

Ich hatte die oberste if / elif Abfrage Informationstypen genannt, aber das wahr wohl verwirrend. Hier frägt das Skript ab, um welchen Nachrichten sich handelt, z.B. kann es um eine Nachricht von Reuters kommen, oder es kann um eine Bewertung von Analysten gehen oder es ist eine ad-hoc Nachricht der SEC oder... diese Information steckt auch in dem Json Objekt der Nchricht, die Variable habe ich source genannt.

Diese if / elif Abfrage sieht also so aus:

Code: Alles auswählen

	...
        else:
            if source == "Reuters":
            	if ...:
            	   await trade(symbol, aktion)
            	elif ...:
            	    await trade(symbol, aktion)
            elif source == "Analyst Upgrades":
               if ...:
                   await trade(symbol, aktion)
               elif ...:
            	   await trade(symbol, aktion)
             elif source == "SEC":
                 ...
Die Reihenfolge der Abfrage von "source" folgt einer Priorität, die ich gemacht habe, aber ich möchte mal probieren diese Abfrage parallel ablaufen zu lassen (ob das dann schneller oder besser ist, muss die Praxis zeigen).

Dazu habe ich einige Artikel und youtube Videos zu concurrency, threading, multiprocessing, asyncio und asyncio mit all dem gelesen, aber ich stehe irgendwie komplett aufm Schlauch, da alle Beispiele und codes, die ich gefunden (und verstanden habe) sich um 1 Rechenintensive Aufgabe drehen, die dann auf verschiedenen Prozessoren verteilt wird.

Ich habe mE mehrere voneinander unabhängige Abfragen, die auch nicht besonders rechenintensiv sind, aber es wäre mal interessant zu sehen, wie das Skript läuft, wenn diese Abfragen nach "source" alle gleichzeitig auf unterschiedlichen Prozessoren beginnen (und ein Teil natürlich wieder sofort stoppt). Wenn das klappt, bringt das hoffentlich nicht nur Perfomance sondern würde das neue tools wie NLP möglich machen, auf welches ich momentan verzichte.

Also, meine Frage nochmal kurz zusammengefasst: Lässt sich dieser asynchrone Datenstrom, nachdem er die ersten beiden Abfragen passiert hat, parallel weiter verarbeiten?

Für jeden Tipp, Kommentar oder auch Tritt in die richtige Richtung wäre ich dankbar.

Grüße Andy
Benutzeravatar
__blackjack__
User
Beiträge: 14069
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@August1328: Das wird doch schon asynchron weiterverarbeitet.
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
August1328
User
Beiträge: 71
Registriert: Samstag 27. Februar 2021, 12:18

ja, schon, aber ich "wünsche" mir, daß in der asynchronen Bearbeitung ein Teil der Abfragen, also die "if / elif source == ..." nicht nacheinander bearbeitet werden (wie durch das if vorgegeben), sondern diese if Abfagre als einzelne Aufgabe parallel auf verschiedenen Prozessoren abgearbeitet wird.

Das Skript läuft auf einem PC mit einem Intel 10700F Core mit 8 Prozessoren. Wenn das Skript aktiv ist und Daten empfängt kostet das 3 bis 4% CPU Leistung.

Oder verlange ich als Python Anfänger zu viel? Wenn es nicht geht, ist das auch ein Ergebnis...

Gruß
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@Andy,

das hört sich schonmal sehr viel konkreter an.
Grundsätzlich, wie du sicher schon gelesen hast, gilt:
Multiprozessing für rechenintensive Aufgaben. Es ermöglicht die Verteilung der Aufgaben auf mehrere CPU Kerne. Ein Kern kann eben nur so schnell wie möglich arbeiten. Mehrere Kerne können mehr berechnen.
Bei Ein-Ausgabe also sog. io-bound Tasks verwendet man gerne Threads oder Asnycio. Dabei muss oft auf Antworten entfernter Prozesse gewartet werden. Da in einem Prozess aber mehrere Threads laufen können, ist die Gesamtaufgabe während der Wartezeit nicht blockiert. In der Zwischenzeit wird einfach mit einem anderen Threads weitergearbeitet.

Wenn ich richtig verstehe, verwendest du ib_insync für die Trades. Ich habe keine Praxiserfahrung mit ib_insync, aber ich weiß aus der Dokumentation und dem Sourcecode, dass ib_insync auf Asyncio verwendet. Daher sollte man sämtlichen Code drumherum auch async programmieren, was du ja auch schon machst.
Asyncio basiert auf einer Eventloop die selbst entscheidet, wann es sinnvoll ist, die einzelnen Aufgaben auszuführen. Dadurch wird eine quasi Parallelität ähnlich wie beim Threading erreicht.

Man kann diese Methoden miteinander kombinieren.
Wie ich sehe, verwendest du socketio um wohl mit dem Nachrichtendienst zu kommunizieren? Wenn dem so ist, könnte man das Einsammeln und Auswerten von Nachrichten in einen zweiten Prozess auslagern. Dieser kann völlig unabhängig vom Prozess der das Traden durchführt laufen. Theoretisch können diese Prozesse sogar in verschiedenen Skripten auf verschiedenen Rechnern laufen.
Sie müssen nur mit einander kommunizieren können. Für die Kommunikation gibt es Queues. Das können sehr einfache Queues, die schon in Python eingebaut sind, oder leistungsfähige Drittpakete sein.

Hier ist ein lauffähiges Konzept. Welches nicht socketio oder ib_insync verwendet, aber den groben Aufbau demonstriert.
Der Prozess für die Nachrichten ist getrennt von den Orders. Hier kommen zum Beispiel auf einen Schlag drei Nachrichten an, die nacheinander in Orders umgesetzt werden. Dabei kommen sogar weitere Nachrichten, die auch entsprechend umgesetzt werden.
Dies ist wirklich nur ein sehr, sehr grobes Konzept, enthält aber meiner Meinung nach alle nötigen Bausteine. Du müsstest selbst einmal versuchen deine bestehende Applikation so ähnlich aufzubauen.

Code: Alles auswählen

import multiprocessing as mp
from multiprocessing import Queue
import asyncio
import random


async def place_orders(queue):
    while True:
        order = queue.get()
        print(f"Placed order: {order}")


async def get_source():
    return random.choice(
        [("Good", "GOOG"), ("Bad", "GOOG"), ("Good", "AAPL"), ("Bad", "AAPL")]
    )


async def get_news(queue):
    while True:
        await asyncio.sleep(3)
        news = await asyncio.gather(*[get_source() for i in range(5)])

        for message, symbol in news:
            if message == "Good":
                print(f"{message} news for {symbol}")
                queue.put(symbol)


def orders(queue):
    asyncio.run(place_orders(queue))


def news(queue):
    asyncio.run(get_news(queue))


if __name__ == "__main__":
    mp.set_start_method("spawn")
    queue = Queue()

    targets = [news, orders]

    processes = [mp.Process(target=target, args=(queue,)) for target in targets]
    for process in processes:
        process.start()

    for process in processes:
        process.join()


"""
Good news for AAPL
Good news for GOOG
Good news for AAPL
Placed order: AAPL
Placed order: GOOG
Placed order: AAPL
Good news for AAPL
Good news for AAPL
Placed order: AAPL
Good news for GOOG
Placed order: AAPL
Good news for AAPL
Placed order: GOOG
Placed order: AAPL
Good news for AAPL
Good news for GOOG
Placed order: AAPL
Good news for GOOG
Placed order: GOOG
Good news for AAPL
Placed order: GOOG
Placed order: AAPL
"""
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

rogerb hat doch schon gesagt: per Definition kann eine if/elif kaskade nicht parallel laufen. Wie soll denn das auch gehen? Ein elif kann nur ausgewertet werden, wenn das if davor (oder elif) negativ bescheiden wurde. Das ist also zwingend seriell.

Mir klingt das so, als ob du ‚irgendwas mit parallel oder so‘ machen willst - ohne eine tatsächliches Problem zu haben. Das macht bestenfalls deinen Code nur schwerer verständlich. Und schlimmstenfalls führt es Fehler ein.
August1328
User
Beiträge: 71
Registriert: Samstag 27. Februar 2021, 12:18

@rogerb: Wow, vielen dank für das Beispiel, jetzt bin ich platt! 8) 8)

@__deets__: Sorry, anscheinend habe ich mich missverständlich ausgedrückt habe. Natürlich soll die nicht die ganze if / elif Kaskade parallel ausgeführt werden, sondern jedes if / elif der Kaskade soll eine einzelne Aufgabe werden, die dann parallel laufen können...

Ich probiere das die nächsten Tage mal aus!

Grüße & schönen Abend miteinander
LukeNukem
User
Beiträge: 232
Registriert: Mittwoch 19. Mai 2021, 03:40

August1328 hat geschrieben: Dienstag 31. August 2021, 21:17 @__deets__: Sorry, anscheinend habe ich mich missverständlich ausgedrückt habe. Natürlich soll die nicht die ganze if / elif Kaskade parallel ausgeführt werden, sondern jedes if / elif der Kaskade soll eine einzelne Aufgabe werden, die dann parallel laufen können...
Ich bin zwar nicht deets, möchte aber kurz zwei ergänzende Ansätze einstreuen.

Für lange if- und elif-Kaskaden gibt es Expertensysteme in Form von Business Rule Engines, die heutzutage überwiegend nach dem RETE-Algorithmus und Abkömmlingen arbeiten. Ein Beispiel für Python wäre etwa Experta. Dabei formulierst Du Dein Expertenwissen in Form von Fakten und die Regeln legen dann fest, welche Aktion beim Eintritt bestimmter Fakten auszuführen ist. Solche Systeme sind deutlich schneller als if-elif-else-Kaskaden.

Ein weiterer Ansatz, der sich damit womöglich verknüpfen läßt, ist die Fuzzy-Logik, ein Beispiel für Python ist zum Beispiel scikit-fuzzy. Dabei werden die Daten über Zugehörigkeitsfunktionen bestimmten unscharf definierten Mengen zugeordnet, und daraus jeweils ein Wahrheitswert. So könnte zum Beispiel Gegenstand, der nach drei Monaten noch 1,0 "neu" und 0,0 "alt" ist, nach sechs Monaten nur noch 0,5 "neu" und 0.5 "alt" sein.
August1328
User
Beiträge: 71
Registriert: Samstag 27. Februar 2021, 12:18

Der Vollständigkeit halber möchte ich hier noch meine finale Umsetzung posten.

Da das mit den queues und der Interaktion der einzelnen Skripte meine Fähigkeiten übersteigt und ich es nicht fehlerfrei zum Laufen bekommen habe, habe ich mich dafür entschieden, aus jedem if/elif ein einzelnes Skript zu machen.

Jedes Skript lädt nun seinen eigenen dataframe und ein Großteil des Codes ist gleich, was ich einerseits als Verschwendung empfinde und den Pflegeaufwand erhöht, aber andererseits kann ich nun feinere Anpassungen in jedem Skript machen.

Hier der Code:

Code: Alles auswählen

from os import system                                                           
from multiprocessing import Pool                                 
                                                                                
# Jedes Script: Neue ClientID, Investsumme & Text, DF Name, Pause definieren
scripts = (
    'C:/Users/ah/py/mp/1_004.py',
    'C:/Users/ah/py/mp/2_002.py',
    'C:/Users/ah/py/mp/3_004.py',
    'C:/Users/ah/py/mp/4_004.py',
    'C:/Users/ah/py/mp/5_002.py',
    'C:/Users/ah/py/mp/6_003.py',
    'C:/Users/ah/py/mp/7_001.py',
    'C:/Users/ah/py/mp/8_001.py',
    )  
                                                                                                       
def run_scripts(script):                                  
    system('python {}'.format(script))

if __name__ == '__main__':
    pool = Pool(processes=8)
    pool.map(run_scripts, scripts)
    pool.close()
    pool.join()
Das ist wirklich sehr simpel geworden und ich bin mir nicht sicher, ob das als echtes multiprocessing durchgeht...

Ich habe versucht die Prozessorbelastung vorher mit einem Skript zu nachher mit acht Skripten in Zahlen zu vergleichen, aber das geht irgendwie nicht. Das einzelne Skript hat beim Empfang und der Verarbeitung von streaming Daten eine CPU Auslastung von bis zu 5% verursacht und war über den Task-Manager sichtbar. Die 8 Skripte bzw. Python ist im Task Manager zwar vorhanden, aber es dümpelt unter der Vielzahl der anderen Prozessen rum, die unter 1% CPU verbrauchen und ab und zu mal ein wenig mehr nutzen - ich sehe das als positiven Fortschritt :lol:

Insgesamt bin ich sehr zufrieden mit dieser Lösung. Ich lasse dieses Skript oben in MS Visual Code starten und im Terminal Fenster gibt es die Daten aus jedem der 8 Skripten aus.

Das einzige neue Problem, was ich mir mit dieser Lösung eingebrockt habe, ist, daß es die Ausführung der 8 Skripte komplett zerlegt sobald einer der 8 asynchronen Streams die Verbindung verliert. Es gibt 1 "Event loop closed" Fehler und dann war´s das. Ich habe noch nicht verstanden, wodurch der Fehler entsteht, da die Fehlermeldung nur diesen Text bringt und es auch nicht weiter tragisch, da es nur alle paar Tage ein Mal vorkommt, sich das Skript danach wieder problemlos neu starten lässt und es auch keinen Blödsinn machen kann, wenn es nicht läuft.

Ich werde mir das in den nächsten Wochen mal genauer ansehen.
Sirius3
User
Beiträge: 18276
Registriert: Sonntag 21. Oktober 2012, 17:20

Du startest jetzt 8 unabhängige Skripte auf sehr umständliche Art und Weise. os.system sollte man nicht benutzen und multiprocessing ist total überflüssig.
`scripts` ist eigentlich eine Konstante und vor allem kein Tuple, sondern eine Liste.

Code: Alles auswählen

import sys
import subprocess

# Jedes Script: Neue ClientID, Investsumme & Text, DF Name, Pause definieren
SCRIPTS = [
    'C:/Users/ah/py/mp/1_004.py',
    'C:/Users/ah/py/mp/2_002.py',
    'C:/Users/ah/py/mp/3_004.py',
    'C:/Users/ah/py/mp/4_004.py',
    'C:/Users/ah/py/mp/5_002.py',
    'C:/Users/ah/py/mp/6_003.py',
    'C:/Users/ah/py/mp/7_001.py',
    'C:/Users/ah/py/mp/8_001.py',
]

def main():
    processes = [
        subprocess.Popen([sys.executable, script])
        for script in SCRIPTS
    ]
    for process in processes:
        process.wait()

if __name__ == '__main__':
    main()
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ich würde auch sagen, dass die Parallelisierung kein Erfolg war. Im Gegenteil. Bei 5 % CPU-Last hast du kein Problem, aber jetzt hast du eines (oder viele), weil die Komplexität und Fehlermöglichkeiten substantiell gestiegen sind. Das steht für mich in keinem Verhältnis.
August1328
User
Beiträge: 71
Registriert: Samstag 27. Februar 2021, 12:18

__deets__

danke für Dein Feedback! Ich find´s toll, wieviel Geduld hier einige Teilnehmer mit Neulingen wie mir haben :D

Deine Anregungen sind umgesetzt. Du hast natürlich Recht wegen der Liste und Konstante. Ich war so darauf fixiert, es endlich zum Laufen zu kriegen, so daß ich eine angepasste Lösung aus dem Netz nicht mehr sorgfältig durchgegangen bin.

Die Parallelisierung empfinde ich persönlich weiter als Fortschritt. Vorher hatte ich 1 Skript, welches, wie weiter oben dargestellt, bei jeder streaming Nachricht, die if / elif Kaskade duchgeackert hat.

Inzwischen habe ich 10 Skripte, die nur die Aufgabe eines einzelnen if erfüllen, tw. auch ein if der nächsten unteren Stufe. Mir ist bewusst, daß jede Datei den kompletten Stream verarbeitet und bei 99,99% der Daten nix macht sondern auf die 1 "Chance", auch wenn dies manchmal Tage dauern kann.

Die Profis hier werden das als Verschwendung sehen oder sich denken, wat nen De**, aber den PC stört das nicht wie beschrieben und es ist eine Lösung, die ich mit meinen begrenzten Kenntnissen hinbekommen habe. Wahrscheinlich gibt´s ne Lösung in 1 Skript mit async, parallel, usw. aber die Umsetzung eines trading "bot" stand für mich im Vordergrund.

Grüße
Andy
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ehre wem Ehre gebührt: Sirius3 hat dir die code Verbesserungen nahegelegt.

Ich sehe immer noch keinen Gewinn, denn den PC stört das durchackern der Kaskade immer noch weniger als entsprechend viele Prozesse zu jonglieren. Aber natürlich ist es dein Projekt, und am Ende kannst du deine CPU Zyklen so einsetzen, wie du es für richtig hältst.
August1328
User
Beiträge: 71
Registriert: Samstag 27. Februar 2021, 12:18

Selbstverständlich, Ehre, wem Ehre gebürt: Danke Sirius3, das Code Beispiel läuft seit Freitag im Test. Ich werd´s mal ein paar Tage so laufen lassen und dann die Ergebnisse vergleichen.

Ich habe vorher überlegt, wie multiprocessing durch Python gesteuert wird: Da in meinem Fall sich jedes Skript mit einem streaming server verbindet, bin ich als Laie davon ausgegangen, daß das multiprocessing nach dem Start ein Skript einmal einem Prozessor zuordnet und diese Zuordnung belässt, also nichts jongliert wird...

Dein letzter Kommentar __deets__ hat mich nachdenklich gemacht - ich bedanke mich für Deine Hartnäckigkeit!

Leider habe ich nichts detailliertes finden können, wie im multiprocessing die Zuteilung der Skripte an die Prozessoren im Detail funktioniert, deshalb meine konkrete Frage(n):

Gibt es diese feste Zuordung von Skript zu Prozessor einfach nicht oder kann man die festlegen? Überprüft multiprocessing bei jedem Datenpaket, welches empfangen wird, welcher Prozessor gerade mit einem anderen Skript beschäftigt ist und bestimmt dann nen freien Prozessor zur Bearbeitung?
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Diese Zuordnung macht dein OS. Und deine Skripte sind nur einer von 1000 Sachen, die das macht. Dem da in den Scheduler reinzugrätschen ist nicht so einfach und auch nicht wirklich nützlich. Das weiß besser, was es tut.
August1328
User
Beiträge: 71
Registriert: Samstag 27. Februar 2021, 12:18

Sehr simple und einleuchtende Erklärung...

Ich denke, ich habe verstanden, daß ich für meine Anwendung mit dem Thema multiprocessing auf dem Holzweg war :lol:
Antworten