ZMQ-Broker mit plain Autentifikation

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
albertus
User
Beiträge: 52
Registriert: Mittwoch 7. Juli 2010, 14:48

ZMQ-Broker mit plain Autentifikation

Beitragvon albertus » Donnerstag 8. Juni 2017, 17:03

Guten Abend,

seit heute Morgen 7:30 versuche ich bei einem ZMQ-Broker die Authentifizierung per Passwort und Name zum laufen zu bekommen. Ohne Authentifizierung funktioniert der Test-Broker wie gewünscht. Füge ich jetzt die Authentifizierung hinzu bleibt das ganze stehen. Genauer gesagt die plain Authentifizierung wird eingerichtet und gestartet, die Clients aber, können sich nicht verbinden. Deshalb hoffe ich hier Hilfe zu finden.

Hier erstmal die Ziele:
1. Die Clients (und nur die) sollen sich mit Benutzername und Passwort anmelden können
2. Danach schicken die Clients Aufträge an den Broker
3. Der Broker routet die Aufträge zu den Workern
4. Diese bearbeiten die Aufträge und schicken das ganze über den Broker zum Client.

Hier ein grober Umriss:

Broker: 2 Contexte; 1 für die Client-Seite (ctx_fe) und ein für die Worker-Seite (ctx_be)
Broker: 2 Router-Sockets; 1 für die Client-Seite (localfe) und ein für die Worker-Seite (localbe)
Worker: 1 Context für alle Threads. Je ein REQ-Socket pro Thread
Client: 1 Context für alle Threads. Je ein REQ-Socket pro Thread

Hier der Source-Code:

Start Broker
  1. import threading, zmq
  2. import logging
  3. import zmq.auth
  4. from zmq.auth.thread import ThreadAuthenticator
  5.  
  6. USER = [
  7.         ('Marie', 'Marie_pwd'), ('Sophie', 'Sophie_pwd'),
  8.         ('Sophia', 'Sophia_pwd'), ('Maria', 'Maria_pwd'),
  9.         ('Emma', 'Emma_pwd'), ('Elias', 'Elias_pwd'),
  10.         ('Alexander', 'Alexander_pwd'), ('Maximilian', 'Maximilian_pwd'),
  11.         ('Paul', 'Paul_pwd'), ('Leon', 'Leon_pwd')
  12.        ]
  13.  
  14. def get_logger():
  15.     logger = logging.getLogger('broker')
  16.     hdlr = logging.FileHandler('broker.log')
  17.     formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
  18.     hdlr.setFormatter(formatter)
  19.     logger.addHandler(hdlr)
  20.     logger.setLevel(logging.DEBUG)
  21.     return logger
  22.  
  23. def main():
  24.     print("Ich: starte den Broker")
  25.  
  26.     # Vorbereiten unseres context und sockets.
  27.     ctx_fe = zmq.Context()  # Context für das Frontend
  28.     ctx_be = zmq.Context()  # Context für das Backend
  29.  
  30.     # Hier wird das ZAP Protokoll gestartet
  31.     auth = ThreadAuthenticator(ctx_fe, log=get_logger())
  32.     auth.start()
  33.     auth.allow('127.0.0.1')
  34.     auth.configure_plain(domain='*', passwords=dict(USER))
  35.  
  36.     # Vorbereiten des lokalen frontends und backends.
  37.     localfe = ctx_fe.socket(zmq.ROUTER)
  38.     localfe.bind("tcp://*:5555")
  39.     localbe = ctx_be.socket(zmq.ROUTER)
  40.     localbe.bind("tcp://*:5556")
  41.  
  42.     # Der interessante Teil.
  43.     # -------------------------------------------------------------
  44.     # Request-Reply-Fluss
  45.     # - Abfragen des Backends und verarbeite local Antworten.
  46.     # - Solange Worker verfügbar, route localfe nach local
  47.  
  48.     workers = []
  49.  
  50.     # Poller Setup
  51.     pollerbe = zmq.Poller()
  52.     pollerbe.register(localbe, zmq.POLLIN)
  53.  
  54.     pollerfe = zmq.Poller()
  55.     pollerfe.register(localfe, zmq.POLLIN)
  56.  
  57.     while True:
  58.         # Wenn wir keine Worker haben, dann warten wir für eine
  59.         # unbestimmte Zeit.
  60.         try:
  61.             events = dict(pollerbe.poll(1000 if workers else None))
  62.         except zmq.ZMQError:
  63.             break  # Unterbrechnung
  64.  
  65.         # Behandle Antworten vom lokalen Worker.
  66.         msg = None
  67.         if localbe in events:
  68.             msg = localbe.recv_multipart()
  69.             (address, empty), msg = msg[:2], msg[2:]
  70.             workers.append(address)
  71.  
  72.             # Wenn die Meldung READY ist, dann route die Nachricht
  73.             # nicht weiter.
  74.             if msg[-1] == b'READY':
  75.                 msg = None
  76.  
  77.         if msg is not None:
  78.             address = msg[0]
  79.             # Route Antwort zum client.
  80.             localfe.send_multipart(msg)
  81.  
  82.         # Jetzt routen wir so viele Client-Anfragen wie wir behandeln können.
  83.         while workers:
  84.             events = dict(pollerfe.poll(0))
  85.             print("Broker pollerfe", events)
  86.             if localfe in events:
  87.                 msg = localfe.recv_multipart()
  88.                 reroutable = True
  89.             else:
  90.                 break  # Keine Arbeit, dann zurück zu den backends.
  91.             msg = [workers.pop(0), b''] + msg
  92.             localbe.send_multipart(msg)
  93.  
  94. if __name__ == '__main__':
  95.     main()

Ende Broker
Start Worker
  1. import threading, zmq
  2.  
  3.  
  4. def worker_task(ctx, name, i):
  5.     """Der Worker benutzt ein REQ-Socket für das LRU Routing."""
  6.     worker = ctx.socket(zmq.REQ)
  7.     tmp = "Worker-%s-%s" % (name, i)
  8.     worker.identity = tmp.encode()
  9.     worker.connect("tcp://127.0.0.1:5556")
  10.  
  11.     # Sag den Broker wir sind bereit für Arbeit.
  12.     worker.send(b"READY")
  13.  
  14.     # Die Nachrichten so verarbeiten wie sie ankommen.
  15.     while True:
  16.         try:
  17.             msg = worker.recv_multipart()
  18.         except zmq.ZMQError:
  19.             # Unterbrechnung
  20.             return
  21.         print("Worker-%s: %s\n" % (i, [i.decode() for i in msg]), end=' ')
  22.         msg[-1] = b"OK"
  23.         worker.send_multipart(msg)
  24.  
  25. def main():
  26.     # Erzeuge Worker- und Client-Threads.
  27.     ctx = zmq.Context()
  28.     threads = []
  29.     for i in range(5):
  30.         threads.append(threading.Thread(target=worker_task, args=(ctx, "worker", i)))
  31.         threads[i].daemon = True
  32.         threads[i].start()
  33.     threads[i].join() # auf den letzen warten
  34.  
  35. main()

Ende Worker
Start Client
  1. import threading, zmq
  2.  
  3. USER = [
  4.         ('Marie', 'Marie_pwd'), ('Sophie', 'Sophie_pwd'),
  5.         ('Sophia', 'Sophia_pwd'), ('Maria', 'Maria_pwd'),
  6.         ('Emma', 'Emma_pwd'), ('Elias', 'Elias_pwd'),
  7.         ('Alexander', 'Alexander_pwd'), ('Maximilian', 'Maximilian_pwd'),
  8.         ('Paul', 'Paul_pwd'), ('Leon', 'Leon_pwd')
  9.        ]
  10.  
  11. def client_task(ctx, name, pwd):
  12.     """Der Request-Reply-Client benutzt ein REQ-Socket."""
  13.     client = ctx.socket(zmq.REQ)
  14.     tmp = "Client-%s" % (name, )
  15.     client.identity = tmp.encode()
  16.     client.plain_username = name.encode("utf8")
  17.     client.plain_password = pwd.encode("utf8")
  18.     client.connect("tcp://127.0.0.1:5555")
  19.     while True:
  20.         client.send(b"Hallo")
  21.         try:
  22.             reply = client.recv()
  23.         except zmq.ZMQError:
  24.             # Unterbrechnung
  25.             return
  26.         print("Client-%s: %s\n" % (name, reply.decode()), end=' ')
  27.         time.sleep(1)
  28.  
  29. def main():
  30.     ctx = zmq.Context()
  31.     threads = []
  32.     for i, v in enumerate(USER):
  33.         threads.append(threading.Thread(target=client_task, args=(ctx, v[0], v[1])))
  34.         threads[i].daemon = True
  35.         threads[i].start()
  36.     threads[i].join() # auf den letzen Warten
  37.  
  38. main()

Ende Client

Vielleicht findet ja hier den "Käfer" der mich den ganzen Tag beschäftigt und genervt hat.
Zuletzt geändert von Anonymous am Donnerstag 8. Juni 2017, 17:21, insgesamt 1-mal geändert.
Grund: Quelltext in Python-Codebox-Tags gesetzt.
Mit freundlichen Grüßen

Albertus
BlackJack

Re: ZMQ-Broker mit plain Autentifikation

Beitragvon BlackJack » Donnerstag 8. Juni 2017, 17:57

@albertus: Nicht zum eigentlichen Problem aber: Ich denke es ist ein Programmfehler nur auf den zuletzt gestarteten Thread zu warten. Das muss ja nicht derjenige sein, der zuletzt beendet wird, das heisst es gibt dann Threads die bis zum Ende laufen durften, und solche die hart durch das Programmende beendet werden.
albertus
User
Beiträge: 52
Registriert: Mittwoch 7. Juli 2010, 14:48

Re: ZMQ-Broker mit plain Autentifikation

Beitragvon albertus » Donnerstag 8. Juni 2017, 18:37

BlackJack hat geschrieben:@albertus: Nicht zum eigentlichen Problem aber: Ich denke es ist ein Programmfehler nur auf den zuletzt gestarteten Thread zu warten. Das muss ja nicht derjenige sein, der zuletzt beendet wird, das heisst es gibt dann Threads die bis zum Ende laufen durften, und solche die hart durch das Programmende beendet werden.

Ja da hast du recht :D . Es war mir auch bewusst das man das normalerweise so nicht macht, aber fürs Testen reichte es, ansonsten hätte ich es schon geändert :D
Mit freundlichen Grüßen

Albertus

Wer ist online?

Mitglieder in diesem Forum: 0 Mitglieder