ZMQ-Broker mit plain Autentifikation

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Antworten
albertus
User
Beiträge: 52
Registriert: Mittwoch 7. Juli 2010, 14:48

Guten Abend,

seit heute Morgen 7:30 versuche ich bei einem ZMQ-Broker die Authentifizierung per Passwort und Name zum laufen zu bekommen. Ohne Authentifizierung funktioniert der Test-Broker wie gewünscht. Füge ich jetzt die Authentifizierung hinzu bleibt das ganze stehen. Genauer gesagt die plain Authentifizierung wird eingerichtet und gestartet, die Clients aber, können sich nicht verbinden. Deshalb hoffe ich hier Hilfe zu finden.

Hier erstmal die Ziele:
1. Die Clients (und nur die) sollen sich mit Benutzername und Passwort anmelden können
2. Danach schicken die Clients Aufträge an den Broker
3. Der Broker routet die Aufträge zu den Workern
4. Diese bearbeiten die Aufträge und schicken das ganze über den Broker zum Client.

Hier ein grober Umriss:

Broker: 2 Contexte; 1 für die Client-Seite (ctx_fe) und ein für die Worker-Seite (ctx_be)
Broker: 2 Router-Sockets; 1 für die Client-Seite (localfe) und ein für die Worker-Seite (localbe)
Worker: 1 Context für alle Threads. Je ein REQ-Socket pro Thread
Client: 1 Context für alle Threads. Je ein REQ-Socket pro Thread

Hier der Source-Code:

Start Broker

Code: Alles auswählen

import threading, zmq
import logging
import zmq.auth
from zmq.auth.thread import ThreadAuthenticator

USER = [
        ('Marie', 'Marie_pwd'), ('Sophie', 'Sophie_pwd'),
        ('Sophia', 'Sophia_pwd'), ('Maria', 'Maria_pwd'),
        ('Emma', 'Emma_pwd'), ('Elias', 'Elias_pwd'),
        ('Alexander', 'Alexander_pwd'), ('Maximilian', 'Maximilian_pwd'),
        ('Paul', 'Paul_pwd'), ('Leon', 'Leon_pwd')
       ]

def get_logger():
    logger = logging.getLogger('broker')
    hdlr = logging.FileHandler('broker.log')
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    hdlr.setFormatter(formatter)
    logger.addHandler(hdlr) 
    logger.setLevel(logging.DEBUG)
    return logger

def main():
    print("Ich: starte den Broker")

    # Vorbereiten unseres context und sockets.
    ctx_fe = zmq.Context()  # Context für das Frontend
    ctx_be = zmq.Context()  # Context für das Backend

    # Hier wird das ZAP Protokoll gestartet
    auth = ThreadAuthenticator(ctx_fe, log=get_logger())
    auth.start()
    auth.allow('127.0.0.1')
    auth.configure_plain(domain='*', passwords=dict(USER))

    # Vorbereiten des lokalen frontends und backends.
    localfe = ctx_fe.socket(zmq.ROUTER)
    localfe.bind("tcp://*:5555")
    localbe = ctx_be.socket(zmq.ROUTER)
    localbe.bind("tcp://*:5556")

    # Der interessante Teil.
    # -------------------------------------------------------------
    # Request-Reply-Fluss
    # - Abfragen des Backends und verarbeite local Antworten.
    # - Solange Worker verfügbar, route localfe nach local

    workers = []

    # Poller Setup
    pollerbe = zmq.Poller()
    pollerbe.register(localbe, zmq.POLLIN)

    pollerfe = zmq.Poller()
    pollerfe.register(localfe, zmq.POLLIN)

    while True:
        # Wenn wir keine Worker haben, dann warten wir für eine
        # unbestimmte Zeit.
        try:
            events = dict(pollerbe.poll(1000 if workers else None))
        except zmq.ZMQError:
            break  # Unterbrechnung

        # Behandle Antworten vom lokalen Worker.
        msg = None
        if localbe in events:
            msg = localbe.recv_multipart()
            (address, empty), msg = msg[:2], msg[2:]
            workers.append(address)

            # Wenn die Meldung READY ist, dann route die Nachricht
            # nicht weiter.
            if msg[-1] == b'READY':
                msg = None

        if msg is not None:
            address = msg[0]
            # Route Antwort zum client.
            localfe.send_multipart(msg)

        # Jetzt routen wir so viele Client-Anfragen wie wir behandeln können.
        while workers:
            events = dict(pollerfe.poll(0))
            print("Broker pollerfe", events)
            if localfe in events:
                msg = localfe.recv_multipart()
                reroutable = True
            else:
                break  # Keine Arbeit, dann zurück zu den backends.
            msg = [workers.pop(0), b''] + msg
            localbe.send_multipart(msg)

if __name__ == '__main__':
    main()
Ende Broker
Start Worker

Code: Alles auswählen

import threading, zmq


def worker_task(ctx, name, i):
    """Der Worker benutzt ein REQ-Socket für das LRU Routing."""
    worker = ctx.socket(zmq.REQ)
    tmp = "Worker-%s-%s" % (name, i)
    worker.identity = tmp.encode()
    worker.connect("tcp://127.0.0.1:5556")

    # Sag den Broker wir sind bereit für Arbeit.
    worker.send(b"READY")

    # Die Nachrichten so verarbeiten wie sie ankommen.
    while True:
        try:
            msg = worker.recv_multipart()
        except zmq.ZMQError:
            # Unterbrechnung
            return
        print("Worker-%s: %s\n" % (i, [i.decode() for i in msg]), end=' ')
        msg[-1] = b"OK"
        worker.send_multipart(msg)

def main():
    # Erzeuge Worker- und Client-Threads.
    ctx = zmq.Context()
    threads = []
    for i in range(5):
        threads.append(threading.Thread(target=worker_task, args=(ctx, "worker", i)))
        threads[i].daemon = True
        threads[i].start()
    threads[i].join() # auf den letzen warten

main()
Ende Worker
Start Client

Code: Alles auswählen

import threading, zmq

USER = [
        ('Marie', 'Marie_pwd'), ('Sophie', 'Sophie_pwd'),
        ('Sophia', 'Sophia_pwd'), ('Maria', 'Maria_pwd'),
        ('Emma', 'Emma_pwd'), ('Elias', 'Elias_pwd'),
        ('Alexander', 'Alexander_pwd'), ('Maximilian', 'Maximilian_pwd'),
        ('Paul', 'Paul_pwd'), ('Leon', 'Leon_pwd')
       ]

def client_task(ctx, name, pwd):
    """Der Request-Reply-Client benutzt ein REQ-Socket."""
    client = ctx.socket(zmq.REQ)
    tmp = "Client-%s" % (name, )
    client.identity = tmp.encode()
    client.plain_username = name.encode("utf8")
    client.plain_password = pwd.encode("utf8")
    client.connect("tcp://127.0.0.1:5555")
    while True:
        client.send(b"Hallo")
        try:
            reply = client.recv()
        except zmq.ZMQError:
            # Unterbrechnung
            return
        print("Client-%s: %s\n" % (name, reply.decode()), end=' ')
        time.sleep(1)

def main():
    ctx = zmq.Context()
    threads = []
    for i, v in enumerate(USER):
        threads.append(threading.Thread(target=client_task, args=(ctx, v[0], v[1])))
        threads[i].daemon = True
        threads[i].start()
    threads[i].join() # auf den letzen Warten

main()
Ende Client

Vielleicht findet ja hier den "Käfer" der mich den ganzen Tag beschäftigt und genervt hat.
Zuletzt geändert von Anonymous am Donnerstag 8. Juni 2017, 17:21, insgesamt 1-mal geändert.
Grund: Quelltext in Python-Codebox-Tags gesetzt.
Mit freundlichen Grüßen

Albertus
BlackJack

@albertus: Nicht zum eigentlichen Problem aber: Ich denke es ist ein Programmfehler nur auf den zuletzt gestarteten Thread zu warten. Das muss ja nicht derjenige sein, der zuletzt beendet wird, das heisst es gibt dann Threads die bis zum Ende laufen durften, und solche die hart durch das Programmende beendet werden.
albertus
User
Beiträge: 52
Registriert: Mittwoch 7. Juli 2010, 14:48

BlackJack hat geschrieben:@albertus: Nicht zum eigentlichen Problem aber: Ich denke es ist ein Programmfehler nur auf den zuletzt gestarteten Thread zu warten. Das muss ja nicht derjenige sein, der zuletzt beendet wird, das heisst es gibt dann Threads die bis zum Ende laufen durften, und solche die hart durch das Programmende beendet werden.
Ja da hast du recht :D . Es war mir auch bewusst das man das normalerweise so nicht macht, aber fürs Testen reichte es, ansonsten hätte ich es schon geändert :D
Mit freundlichen Grüßen

Albertus
Antworten