Seite 1 von 1

Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Donnerstag 20. Juli 2023, 14:59
von Xbash_Zero
Hallo,

bisher habe ich meistens für multiprocess Anwendungen multiprocess.Queue verwendet oder manchmal so etwas wie shared variables bzw. multiprocessing.Value.

Meine Frage ist mehr theoretischer Natur, ich möchte das einfach etwas besser verstehen.

Angenommen ich plane so eine Art Online Game Server für mehrere Spieler, welche effizienten Methoden würden sich anbieten, für eine Interprocess Kommunikation. Queues habe ich meist genutzt, da Threadsafe, allerdings bin ich mir in dem Punkt, in welchem die Daten verarbeitet werden sollen, für das Game, noch unschlüssig. Ich gehe hier jetzt mal mit der folgender Annahme in dieses Szenario, dass Threads nicht performant genung sind, weil GIL usw., dass muss nicht so sein, ist nur eine rein theoretische Annahme.


Ich habe dazu bereits mal GPT befragt und mir mit dessen Hilf ein Beispiel zusammen friemeln lassen, unklar für mich ist allerdings immer noch, ob ich so ein projekt nicht doch lieber in c++ oder GO schreiben sollte, um Multithreading auch auf mehreren Kernen nutzen zu können und den Vorteil, dass der Speicherbereich von den Threads geteilt wird?

Was würdet ihr machen, wenn ihr hypothetisch vor solch einem Problem stehen würdet?

Code: Alles auswählen

import socket
import threading
import multiprocessing
import time

class GameServer:

    def __init__(self):
        self.clients = {}  # dictionary to store client address -> socket connection
        self.game_state = {}  # dictionary to store game state

    def update_game_state(self, data, client_address):
        # Update game state based on data received
        # In a real game, there would be complex logic to update the game state
        # Here we just echo the received data
        self.game_state[client_address] = data

    def handle_client(self, client_socket, client_address):
        while True:
            data = client_socket.recv(1024)  # receive data from the client
            if not data:
                break

            # Update game state in a separate process
            p = multiprocessing.Process(target=self.update_game_state, args=(data, client_address))
            p.start()
            p.join()

            # Send updated game state to all clients
            for other_client_socket in self.clients.values():
                other_client_socket.sendall(data)

        client_socket.close()
        del self.clients[client_address]

    def run_server(self, host='localhost', port=12345):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_socket:
            server_socket.bind((host, port))
            server_socket.listen()

            while True:
                client_socket, client_address = server_socket.accept()
                self.clients[client_address] = client_socket
                threading.Thread(target=self.handle_client, args=(client_socket, client_address)).start()


# Run the server
game_server = GameServer()
game_server.run_server()


Freue mich über Feedback und Erklärungen, vielen Dank im Voraus!

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Donnerstag 20. Juli 2023, 15:30
von __deets__
Da ich mal davon ausgehe, dass dein Spiel auf verschiedenen Rechnern laufen soll (also jeder Spieler sein Gerät hat), bellst du hier den komplett falschen Baum an. Das hat mit IPC nichts zu tun. Sondern passiert über Netzwerk. Dessen latenzen stellen alles weit in den Schatten, was du hier an vermeintlichen Gewinnen einfährst.

Ob nun wiederum deine eigentliche Game-Engine, die dir basierend auf den Client Daten einen Zustand berechnet, der an alle Clients verteilt wird, zu langsam ist, kann man nicht allgemein sagen. Da kommt’s dann schon sehr auf das eigentliche Spiel an. Aber wie gesagt: wenn die Netzwerkkommunikation 50-200ms brauchst, musst du nicht jeder Millisekunde aus dem Server quetschen.

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Donnerstag 20. Juli 2023, 18:10
von Xbash_Zero
@__deets__

Super, danke vielmals für die Antwort. Dann ist die interne Berechnung um Größenordnungen schneller als die Netzwerklatenz, dann spielt es vermutlich keine Rolle, ob threading oder multiprocessing verwendet wird.

Dann noch eine weitere Frage, wenn das nun kein Game-Server wäre, sondern eine Server-Client-Anwendung für sagen wir mal 1000 Clients, als Beispiel. Was wäre, wenn man sich dort um die gleichzeitige Bearbeitung von Daten I/O kümmern müsste.

Es geht mir primär um das Verständnis, wie man so hohe Serverlasten am besten synchronisiert und gleichzeitig handhaben kann? Wäre super, falls mir das jemand mit dem Bezug Performance genauer erklären könnte. Danke!

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Donnerstag 20. Juli 2023, 18:59
von noisefloor
Hallo,

wenn man viele Socketverbindungen verwalten muss sollte man auch einen Blick auf asyncio werfen. Das ist für sowas gemacht. Oder alternative Bibliotheken wie z.B. Trio. Oder das Urgestein Twisted.

Gruß, noisefloor

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Donnerstag 20. Juli 2023, 19:40
von Xbash_Zero
@noisefloor

Danke vielmals für den Tipp. Asyncio kannte ich schon, die von Dir erwähnten Bibliotheken aber noch nicht.

Ich hatte gehofft, an asyncio erst noch vorbeizukommen, welche Möglichkeiten gibt es für ein solches Szenario noch? Ich hatte mir überlegt, vielleicht so eine Art Loadbalancer zu bauen, bestehend aus 2 oder mehr Prozessen, an welche dann die Client-Anfragen weitergereicht werden, um von diesen dann bearbeitet zu werden, aber auch für diesen Fall müsste ich mir Gedanken über gemeinsame Ressourcen und evtl. Mutexes machen.

Gibt es da nicht auch exotischere Möglichkeiten, meinetwegen dynamisches Thread spawnen? Hatte gehofft noch tiefere Einblicke in die Sprache zu erhaschen, nichtsdestotrotz schaue ich mir mal die Bibliotheken an und asyncio kann auch nicht schaden, vielleicht finde ich ja noch gefallen daran.

Viele Grüße

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Donnerstag 20. Juli 2023, 20:59
von sparrow
Darauf gibt es keine klare Antwort, weil die Frage zu unscharf ist. 1000 Clients bei einem MMO können etwas anderes sein als 1000 Clients eines Online ERP können etwas anderes sein als 1000 Clients einer Streamingplattform.
Und selbst da gibt es unterschiedliche Ansätze.

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Donnerstag 20. Juli 2023, 21:06
von Xbash_Zero
sparrow hat geschrieben: Donnerstag 20. Juli 2023, 20:59 Darauf gibt es keine klare Antwort, weil die Frage zu unscharf ist. 1000 Clients bei einem MMO können etwas anderes sein als 1000 Clients eines Online ERP können etwas anderes sein als 1000 Clients einer Streamingplattform.
Und selbst da gibt es unterschiedliche Ansätze.
Danke! Welche Implementierungsmethoden oder Konzepte gibt es dafür, kannst du mir ein paar Stichpunkte nennen als Recherche-Grundlage?

Ich will mir einfach verschiedene Techniken ansehen, um meinen Horizont zu erweitern.

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Freitag 21. Juli 2023, 02:05
von Xbash_Zero
Ok, ich glaube, dass ich da etwas mit asynchroner und paralleler Programmierung ein wenig durcheinander gebracht habe, weil Threads nicht direkt asynchron ablaufen, sondern solange bis zB. die Berechnung bzw. der Code-Abschnitt zu Ende ist.

Aber anscheinend lassen sich parallele und asynchrone Programm-Abläufe kombinieren, bzw. die jeweils CPU und I/O bound-lastigen Teile separieren. Also das heißt quasi, ich müsste den Teil des Programms, in welchen Befehle von dem Client oder andersherum gesendet und verarbeitet werden, mittels asyncio implementieren und der Teil, welcher die rechenintensive Operationen durchführt, mittels multiprocessing.

Das könnte in einem Producer und Consumer Konzept so aussehen:

Code: Alles auswählen

import asyncio
import multiprocessing
from multiprocessing import Queue
import time


async def download_data(n):
    await asyncio.sleep(0.1)
    return f'Daten {n}'


def process_data(n, data, queue):
    time.sleep(5) 
    result = f'Verarbeitete {data}'
    queue.put(result)

async def producer(queue):
    for n in range(5):

        data = await download_data(n)

        process = multiprocessing.Process(target=process_data, args=(n, data, queue))
        process.start()

def consumer(queue):
    while True:
        result = queue.get() 
        if result == 'STOP':
            break
        print(result)

if __name__ == "__main__":
    queue = Queue()

    producer_process = multiprocessing.Process(target=asyncio.run, args=(producer(queue),))
    consumer_process = multiprocessing.Process(target=consumer, args=(queue,))
    producer_process.start()
    consumer_process.start()

    producer_process.join()
    queue.put('STOP')
    consumer_process.join()


Was denkt ihr, macht das Sinn für euch? Ich verstehe natürlich, dass es für alles ein gewisses use-case Szenario gibt, allerdings rein zum besseren Verständnis, ist es für mich wichtig den Fall noch mal zu erörtern.

Weil sagen wir, der Berechungsintensive Part, dauert mit einem Prozess länger, als wie wenn dieser auf mehrere Processe aufgeteilt werden würde, dann würde dieser so zusagen den Ablauf des Programms blockieren... Wenn das getrennt von einander abläuft, können dank asyncio nebenher noch Anfragen angenommen werden, oder? Das Problem, was aber dann entsteht, ist die Interprocesskommunictaion, welche zB. mittels Queue umgesetzt werden kann, wäre das dann wiederum ein Flaschenhals, frage ich mich?

Mh, irgendwie ist das wired, ich muss mir noch mal Gedanken dazu machen...

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Freitag 21. Juli 2023, 06:15
von noisefloor
Hallo,
Aber anscheinend lassen sich parallele und asynchrone Programm-Abläufe kombinieren, bzw. die jeweils CPU und I/O bound-lastigen Teile separieren.
Das ist korrekt. Trivial ist das aber nicht, also vom reinen Verständnis her, weil man halt zwei Arten von Nebenläufigkeit nebeneinander nutzt.

Nächster Vorschlag: Sachen, die länger dauern (weil z.B. CPU-intensiv oder die abzuarbeitende Warteschlange so lang ist) an eine asynchrone Task Queue delegieren statt das selber zu machen. Für Python ist Celery (https://docs.celeryq.dev/en/stable/gett ... ction.html) ziemlich populär, weil ziemlich mächtig. Eine leichtgewichtige Alternative wäre z.B rq (https://github.com/rq/rq). Es gibt aber noch einen ganzen Schwung weiterer Task Queues für Python.

Gruß, noisefloor

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Freitag 21. Juli 2023, 12:23
von __blackjack__
@Xbash_Zero: Threads laufen asynchron.

Re: Verständnisfrage zur interprocess Kommunikation und Effizients

Verfasst: Sonntag 23. Juli 2023, 20:52
von Xbash_Zero
noisefloor hat geschrieben: Freitag 21. Juli 2023, 06:15 Hallo,
Aber anscheinend lassen sich parallele und asynchrone Programm-Abläufe kombinieren, bzw. die jeweils CPU und I/O bound-lastigen Teile separieren.
Das ist korrekt. Trivial ist das aber nicht, also vom reinen Verständnis her, weil man halt zwei Arten von Nebenläufigkeit nebeneinander nutzt.

Nächster Vorschlag: Sachen, die länger dauern (weil z.B. CPU-intensiv oder die abzuarbeitende Warteschlange so lang ist) an eine asynchrone Task Queue delegieren statt das selber zu machen. Für Python ist Celery (https://docs.celeryq.dev/en/stable/gett ... ction.html) ziemlich populär, weil ziemlich mächtig. Eine leichtgewichtige Alternative wäre z.B rq (https://github.com/rq/rq). Es gibt aber noch einen ganzen Schwung weiterer Task Queues für Python.

Gruß, noisefloor
Thx für die Tipps, ich habe das erst später und mit einer Recherche kapiert, warum man nicht multiprocessing.Queues() mit asyncio kombinieren kann.


Ich habe mir etrwas überlegt, wie das vielleicht aussehen könnte, dabei nutze ich zwei porzesse und lasse einmal den async websocket und die async message_handling parallel laufen. Das könnte evtl. tatsächlich zu einer Performance Steigerung führen. Die Kommunikation läuft über eine Redis Queue, das schöne an dieser Queue ist, dass diese bidirektional ist, aber man braucht einen externen Broker bzw. Server dafür, auch wenn der auf localhost laufen kann, je nach dem wie man es braucht.

Was denkt ihr, passt das so oder gibt es Kritikpunkte/Verbesserungsvorschläge?

Code: Alles auswählen

import asyncio
import multiprocessing
import websockets
import redis

import time, datetime, json, os, sys, uuid, pathlib, collections, logging

logging.root.setLevel(0)

redis_conn = redis.Redis(host='localhost', port=6379, db=0)

class WebSocketServer:
    def __init__(self, host, port, incoming_queue_name, outgoing_queue_name):
        self.host = host
        self.port = port
        self.incoming_queue_name = incoming_queue_name
        self.outgoing_queue_name = outgoing_queue_name
        self.active_connections = set()

    async def receive(self, websocket):
        async for message in websocket:
            start_time = datetime.datetime.now()
            redis_conn.rpush(self.incoming_queue_name, message)
            end_time = datetime.datetime.now()
            duration = (end_time - start_time).total_seconds()
            logging.info(f"Message received at {start_time}, processed in {duration} seconds")

    async def broadcast(self):
        while True:
            if redis_conn.llen(self.outgoing_queue_name) > 0:
                message = redis_conn.lpop(self.outgoing_queue_name).decode('utf-8')
                for websocket in self.active_connections:
                    if not websocket.closed:
                        await websocket.send(message)
            await asyncio.sleep(0.1)  # Avoid busy waiting

    async def server(self, websocket, path):
        # Add the new websocket connection to the set of active connections
        self.active_connections.add(websocket)
        try:
            receiver_task = asyncio.create_task(self.receive(websocket))
            broadcaster_task = asyncio.create_task(self.broadcast())  # This will send messages to all connected clients
            await asyncio.gather(receiver_task, broadcaster_task)
        finally:
            # Remove the websocket connection from the set of active connections
            self.active_connections.remove(websocket)


    async def start_server(self):
        async with websockets.serve(self.server, self.host, self.port, ssl=None):
            await asyncio.Future()  # This will keep the server running indefinitely.

    def run(self):
        asyncio.run(self.start_server())


class Message_Handler:
    def __init__(self, i, incoming_queue_name, outgoing_queue_name):
        self.i = i
        self.incoming_queue_name = incoming_queue_name
        self.outgoing_queue_name = outgoing_queue_name

    async def process_incoming_message(self):
        while True:
            message = redis_conn.lpop(self.incoming_queue_name)
            if message is not None:
                message = message.decode('utf-8')
                logging.info(f"Processed message: {message}")
                redis_conn.lpush(self.outgoing_queue_name, message.encode('utf-8'))
            else:
                await asyncio.sleep(self.i)

    def run(self):
        asyncio.run(self.process_incoming_message())


def main():
    incoming_queue_name = "incoming_messages"
    outgoing_queue_name = "outgoing_messages"

    ws_url = 'ws://example.com:1234'
    host = '0.0.0.0'  # Listen on all network interfaces
    port = 8765  # Replace this with your desired port number

    ws_server = WebSocketServer(host, port, incoming_queue_name, outgoing_queue_name)
    message_handler = Message_Handler(0.1, incoming_queue_name, outgoing_queue_name)

    process1 = multiprocessing.Process(target=ws_server.run)
    process2 = multiprocessing.Process(target=message_handler.run)

    process1.start()
    process2.start()

    process1.join()
    process2.join()


if __name__ == "__main__":
    main()

__blackjack__ hat geschrieben: Freitag 21. Juli 2023, 12:23 @Xbash_Zero: Threads laufen asynchron.
Du hast recht, ich habe mich falsch ausgedrückt, ich meinte innerhalb eines Threads läuft der Code synchron. (normalerweise, wenn man nicht gerade innerhalb des Threads asyncio benutzt)