Datenaustausch zwischen zwei endlos-laufenden Scripten

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
CSchilling
User
Beiträge: 25
Registriert: Sonntag 4. April 2021, 09:13

Grüßt euch,

mich beschäftigt erneut ein Projekt, bei welchem ich nicht so recht weiter weiß.
Da ich noch recht grün hinter den Ohren bin, was Python und die fülle an Möglichkeiten angeht, such ich hier erneut nach Ideen und Lösungen.

Zum Projekt:
Ich habe zwei Python-Scripte, welche in einer Endlosschleife Daten von einem Websocket-Server erhalten.
Das eine Script erhält Finanzdaten von einer Börse und das andere Script erhält Accountbezogene Informationen Aktionen welche ich auf der Börse durchführe. Diese Information zu meinem Account, werden nur gesendet wenn eine Aktion ausgeführt wurde, anderst bei den Finanzdaten, welche alle ~~300ms empfangen werden.

Ich möchte nun vom Script_A(Finanzdaten) an unterschiedlichsten Zeitpunkten in der Laufzeit, Daten an das Script_B(Accountbezogene Infos) senden.
Das Script_B weiß nicht, wann die Zeitpunkte im Script_A sind, denn diese Zeitpunkte werden aus den Finanzdaten errechnet und sind nicht vorhersehbar.
Script_A sollte durch das senden der Daten zu Script_B nicht neu gestartet werden oder zu lange pausieren. 500ms bis maximal 1000ms sind in Ordnung.

Ich hab mich die letzten Tage hier im Forum und im Netz etwas recherchiert und bin dabei immer wieder auf Pipes oder Queues des Modules Multiprocessing gestoßen.
https://stackoverflow.com/questions/438 ... on-scripts oder https://stackoverflow.com/questions/846 ... 9#58015119
https://docs.python.org/3/library/multi ... -processes
Ich versteh jedoch nicht wie ich nun das ganze auf meine Wünsche anpassen kann, da ich keine Erklärung gefunden habe, welchen Anfängern weiterhilft.

Mein einziger Versuch war bisher, den folgenden Code(Code ist aus dem Netz kopiert) angepasst in meinen Code einzubauen. Vergebens.... :roll:

Code: Alles auswählen

#	Script_A
from multiprocessing import Process,Queue,Pipe
from Script_B import f

if __name__ == '__main__':
    parent_conn,child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "Hello"

Code: Alles auswählen

#	Script_B
from multiprocessing import Process,Pipe

def f(child_conn):
    msg = "Hello"
    child_conn.send(msg)
    child_conn.close()
Dies führt jedoch nicht zum gewünschten Ergebnis... da hier die Informationen von Script_B durch eine Abfrage an Script_A gesendet werden. Es sollte jedoch von Script_A nach Script_B gesendet werden. Alles muss von Script_A ausgehen.

Mein Basis-Code sieht wie folgt aus:

Code: Alles auswählen

"""

SIGNAL-SCRIPT (SCRIPT_A)

"""

import websocket
import json


def on_message(wsapp, message):
    stream_data = json.loads(message)
    print(stream_data)

    if stream_data['k']['x']:
        #   Send information to USERDATA-STREAM-SCRIPT (SCRIPT_B)
        pass


wsapp = websocket.WebSocketApp(url="wss://fstream.binance.com/ws/adausdt_perpetual@continuousKline_1m")

wsapp.last_ping_tm = 32
wsapp.last_pong_tm = 1

while True:
    try:
        wsapp.run_forever(ping_interval=40, ping_timeout=30)
    except Exception as e:
        print(e)
In Zeile 17 soll nun die Aktion stattfinden, an der das Script_A Daten an Script_B sendet. Die Daten werden vom Typ Float und Boolean sein.

Der Code für Script_B ist genau gleich, lediglich die URL ist eine andere.
Die Verarbeitung in Script_B ist soll hier nicht Thema sein.

Vielen herzlichen Dank, falls ihr es bis hier her gelesen habt und mir dabei helfen wollt.

Grüße

Christian
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@CSchilling,

Ich verstehe leider nicht was du mit diesem Konzept erreichen willst. Warum machst du es kompliziert indem du das auf zwei Scipte aufteilst?
CSchilling
User
Beiträge: 25
Registriert: Sonntag 4. April 2021, 09:13

@rogerb,

das ist ohne den gesamten Nutzen des Projektes zu kennen etwas schwer zu verstehen. Da gebe ich recht!

Ich hatte bislang, beide Streams (den Userstream und den Finanzdatenstream) in einem Script über den Websocket-Server empfangen.
Jedoch kam es dabei regelmä0ig zu Problemen mit den gesendeten Daten. Wenn der Stream Daten zum User gesendet hatte, wurden die Finanzdaten verspätet empfangen und verwertet.
Zudem wurde das Script für mein Empfinden sehr lang und Fehleranfällig (was sicher auch an fehlender Code-Erfahrung meinerseits liegt).

Mein Ziel ist es, ein Script zu haben, welches ausschließlich die Finanzdaten verwendet um daraus Indikatoren und Kaufsignale zu berechnen. Diese Kaufsignale sollen dann an andere Scripte gesendet werden, welche dann das Kaufsignal in eine Order auf der betroffenen Börse umwandeln
und die dann entstandenen Userbezogenen Daten speichern.
Ich hatte mir auch überlegt das ganze nochmals über eine Client-Server-Verbindung zu regeln.

Es soll nun also ein "zentrales" Script geben, welches andere "scripte" mit Signalen versorgt. Das ist der Grundgedanke.
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@CSchilling,

Bei dem Kopierten Code-Beispiel wird ja nur die Funktion f aus dem anderen Skript B importiert um sie dann in Skript A als Prozess zu starten. Das macht für mich keinen Sinn.
Dann kannst du das auch in einem Skript haben.

Hier ist ein funktionierendes Beispiel. Ich habe aber keine Ahnung, ob das sinnvoll ist.
Das erste ist ein Websocket Server der zwei Verbindungen zur Verfügung stellt. Die eine Verbindung simuliert die Finanzdaten (data_server), die du von deinem Provider bekommen würdest. Die werden alle 300ms gesendet.
Die andere Verbindung simuliert die Verbindung um die Orders zu empfangen (order_server). Das ist in der Realität auch dein Provider. Ich habe die nur zur Demonstrationszwecken eingebaut.

Code für den simulierten Websocket Server

Code: Alles auswählen

import asyncio
import websockets
import random



async def data_server(websocket, path):
    await websocket.recv()
    while True:
        payload = random.randint(100, 200)
        await websocket.send(f"{payload}")
        await asyncio.sleep(0.3)


async def action_server(websocket, path):
    print("Order Server gestartet")
    while True:
        received = await websocket.recv()
        await websocket.send("OK")
        print(f"{received} Order erhalten")


asyncio.get_event_loop().run_until_complete(
    websockets.serve(action_server, "localhost", 7000)
)
asyncio.get_event_loop().run_until_complete(
    websockets.serve(data_server, "localhost", 8000)
)

asyncio.get_event_loop().run_forever()

Jetzt kommt der Client, also der Code der bei dir läuft:
Es werden zwei Prozesse mit jeweils einer Websocket gestartet. Der eine Prozess nimmt die Finanzdaten entgegen und wertet sie aus. Wenn es ein simuliertes Kauf oder Verkauf Signal gibt wird über eine Queue das Signal an den anderen Prozess übergeben. Der wiederum hat die Websocketverbindung zum Orderserver hergestellt und schickt dann die entsprechende Order raus.

Code: Alles auswählen

import asyncio
import websockets
import multiprocessing


async def action_socket(queue):
    websocket = await websockets.connect(f"ws://localhost:{7000}")
    await websocket.send("Hallo Server")

    while True:
        await websocket.recv()
        signal = queue.get()
        print(f"Ich will {signal}")
        await websocket.send(signal)


async def data_socket(queue):
    websocket = await websockets.connect(f"ws://localhost:{8000}")
    await websocket.send("Gib mir Daten ...")
    while True:
        data = await websocket.recv()
        if int(data) > 180:
            signal = "Verkaufen"
            queue.put(signal)
            print(f"Die Daten sagen {signal}!")
        elif int(data) < 120:
            signal = "Kaufen"
            queue.put(signal)
            print(f"Die Daten sagen {signal}!")
        
        print(f"Kurs: {data}")


def action(queue):
    asyncio.get_event_loop().run_until_complete(action_socket(queue))


def data(queue):
    asyncio.get_event_loop().run_until_complete(data_socket(queue))


if __name__ == "__main__":
    multiprocessing.set_start_method("spawn")
    q = multiprocessing.Queue()

    targets = [action, data]

    processes = [
        multiprocessing.Process(target=target, args=(q,)) for target in targets
    ]
    for process in processes:
        process.start()

    for process in processes:
        process.join()
Ausgabe vom Simulierten Provider Server:

Code: Alles auswählen

Order Server gestartet
Hallo Server Order erhalten
Kaufen Order erhalten
Kaufen Order erhalten
Verkaufen Order erhalten
Kaufen Order erhalten
Verkaufen Order erhalten
Verkaufen Order erhalten
Kaufen Order erhalten
Kaufen Order erhalten
Verkaufen Order erhalten
Verkaufen Order erhalten
Kaufen Order erhalten
Kaufen Order erhalten
Verkaufen Order erhalten
Kaufen Order erhalten
Kaufen Order erhalten
Verkaufen Order erhalten
Kaufen Order erhalten
Ausgabe vom Client:

Code: Alles auswählen

Kurs: 154
Kurs: 168
Kurs: 123
Die Daten sagen Kaufen!
Kurs: 100
Ich will Kaufen
Die Daten sagen Kaufen!
Ich will Kaufen
Kurs: 111
Die Daten sagen Verkaufen!
Ich will Verkaufen
Kurs: 181
Kurs: 159
Die Daten sagen Kaufen!
Ich will Kaufen
Kurs: 119
Kurs: 136
Die Daten sagen Verkaufen!
Kurs: 186
Ich will Verkaufen
Kurs: 139
Kurs: 152
Kurs: 150
Die Daten sagen Verkaufen!
Ich will Verkaufen
Kurs: 197
Kurs: 136
Kurs: 165
CSchilling
User
Beiträge: 25
Registriert: Sonntag 4. April 2021, 09:13

@rogerb

wow!! Das klingt nach der Art wie ich es mir vorgestellt hab! Besten Dank :o
Ich werde mir deine Idee morgen in aller Ruhe durchschauen und dir nochmal ein Feedback dazugeben.

Auf jeden Fall schonmal großen Dank dafür.

Schönes Wochenende, Chris
Antworten