ZMQ-Broker mit plain Autentifikation
Verfasst: Donnerstag 8. Juni 2017, 17:03
Guten Abend,
seit heute Morgen 7:30 versuche ich bei einem ZMQ-Broker die Authentifizierung per Passwort und Name zum laufen zu bekommen. Ohne Authentifizierung funktioniert der Test-Broker wie gewünscht. Füge ich jetzt die Authentifizierung hinzu bleibt das ganze stehen. Genauer gesagt die plain Authentifizierung wird eingerichtet und gestartet, die Clients aber, können sich nicht verbinden. Deshalb hoffe ich hier Hilfe zu finden.
Hier erstmal die Ziele:
1. Die Clients (und nur die) sollen sich mit Benutzername und Passwort anmelden können
2. Danach schicken die Clients Aufträge an den Broker
3. Der Broker routet die Aufträge zu den Workern
4. Diese bearbeiten die Aufträge und schicken das ganze über den Broker zum Client.
Hier ein grober Umriss:
Broker: 2 Contexte; 1 für die Client-Seite (ctx_fe) und ein für die Worker-Seite (ctx_be)
Broker: 2 Router-Sockets; 1 für die Client-Seite (localfe) und ein für die Worker-Seite (localbe)
Worker: 1 Context für alle Threads. Je ein REQ-Socket pro Thread
Client: 1 Context für alle Threads. Je ein REQ-Socket pro Thread
Hier der Source-Code:
Start Broker
Ende Broker
Start Worker
Ende Worker
Start Client
Ende Client
Vielleicht findet ja hier den "Käfer" der mich den ganzen Tag beschäftigt und genervt hat.
seit heute Morgen 7:30 versuche ich bei einem ZMQ-Broker die Authentifizierung per Passwort und Name zum laufen zu bekommen. Ohne Authentifizierung funktioniert der Test-Broker wie gewünscht. Füge ich jetzt die Authentifizierung hinzu bleibt das ganze stehen. Genauer gesagt die plain Authentifizierung wird eingerichtet und gestartet, die Clients aber, können sich nicht verbinden. Deshalb hoffe ich hier Hilfe zu finden.
Hier erstmal die Ziele:
1. Die Clients (und nur die) sollen sich mit Benutzername und Passwort anmelden können
2. Danach schicken die Clients Aufträge an den Broker
3. Der Broker routet die Aufträge zu den Workern
4. Diese bearbeiten die Aufträge und schicken das ganze über den Broker zum Client.
Hier ein grober Umriss:
Broker: 2 Contexte; 1 für die Client-Seite (ctx_fe) und ein für die Worker-Seite (ctx_be)
Broker: 2 Router-Sockets; 1 für die Client-Seite (localfe) und ein für die Worker-Seite (localbe)
Worker: 1 Context für alle Threads. Je ein REQ-Socket pro Thread
Client: 1 Context für alle Threads. Je ein REQ-Socket pro Thread
Hier der Source-Code:
Start Broker
Code: Alles auswählen
import threading, zmq
import logging
import zmq.auth
from zmq.auth.thread import ThreadAuthenticator
USER = [
('Marie', 'Marie_pwd'), ('Sophie', 'Sophie_pwd'),
('Sophia', 'Sophia_pwd'), ('Maria', 'Maria_pwd'),
('Emma', 'Emma_pwd'), ('Elias', 'Elias_pwd'),
('Alexander', 'Alexander_pwd'), ('Maximilian', 'Maximilian_pwd'),
('Paul', 'Paul_pwd'), ('Leon', 'Leon_pwd')
]
def get_logger():
logger = logging.getLogger('broker')
hdlr = logging.FileHandler('broker.log')
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
hdlr.setFormatter(formatter)
logger.addHandler(hdlr)
logger.setLevel(logging.DEBUG)
return logger
def main():
print("Ich: starte den Broker")
# Vorbereiten unseres context und sockets.
ctx_fe = zmq.Context() # Context für das Frontend
ctx_be = zmq.Context() # Context für das Backend
# Hier wird das ZAP Protokoll gestartet
auth = ThreadAuthenticator(ctx_fe, log=get_logger())
auth.start()
auth.allow('127.0.0.1')
auth.configure_plain(domain='*', passwords=dict(USER))
# Vorbereiten des lokalen frontends und backends.
localfe = ctx_fe.socket(zmq.ROUTER)
localfe.bind("tcp://*:5555")
localbe = ctx_be.socket(zmq.ROUTER)
localbe.bind("tcp://*:5556")
# Der interessante Teil.
# -------------------------------------------------------------
# Request-Reply-Fluss
# - Abfragen des Backends und verarbeite local Antworten.
# - Solange Worker verfügbar, route localfe nach local
workers = []
# Poller Setup
pollerbe = zmq.Poller()
pollerbe.register(localbe, zmq.POLLIN)
pollerfe = zmq.Poller()
pollerfe.register(localfe, zmq.POLLIN)
while True:
# Wenn wir keine Worker haben, dann warten wir für eine
# unbestimmte Zeit.
try:
events = dict(pollerbe.poll(1000 if workers else None))
except zmq.ZMQError:
break # Unterbrechnung
# Behandle Antworten vom lokalen Worker.
msg = None
if localbe in events:
msg = localbe.recv_multipart()
(address, empty), msg = msg[:2], msg[2:]
workers.append(address)
# Wenn die Meldung READY ist, dann route die Nachricht
# nicht weiter.
if msg[-1] == b'READY':
msg = None
if msg is not None:
address = msg[0]
# Route Antwort zum client.
localfe.send_multipart(msg)
# Jetzt routen wir so viele Client-Anfragen wie wir behandeln können.
while workers:
events = dict(pollerfe.poll(0))
print("Broker pollerfe", events)
if localfe in events:
msg = localfe.recv_multipart()
reroutable = True
else:
break # Keine Arbeit, dann zurück zu den backends.
msg = [workers.pop(0), b''] + msg
localbe.send_multipart(msg)
if __name__ == '__main__':
main()
Start Worker
Code: Alles auswählen
import threading, zmq
def worker_task(ctx, name, i):
"""Der Worker benutzt ein REQ-Socket für das LRU Routing."""
worker = ctx.socket(zmq.REQ)
tmp = "Worker-%s-%s" % (name, i)
worker.identity = tmp.encode()
worker.connect("tcp://127.0.0.1:5556")
# Sag den Broker wir sind bereit für Arbeit.
worker.send(b"READY")
# Die Nachrichten so verarbeiten wie sie ankommen.
while True:
try:
msg = worker.recv_multipart()
except zmq.ZMQError:
# Unterbrechnung
return
print("Worker-%s: %s\n" % (i, [i.decode() for i in msg]), end=' ')
msg[-1] = b"OK"
worker.send_multipart(msg)
def main():
# Erzeuge Worker- und Client-Threads.
ctx = zmq.Context()
threads = []
for i in range(5):
threads.append(threading.Thread(target=worker_task, args=(ctx, "worker", i)))
threads[i].daemon = True
threads[i].start()
threads[i].join() # auf den letzen warten
main()
Start Client
Code: Alles auswählen
import threading, zmq
USER = [
('Marie', 'Marie_pwd'), ('Sophie', 'Sophie_pwd'),
('Sophia', 'Sophia_pwd'), ('Maria', 'Maria_pwd'),
('Emma', 'Emma_pwd'), ('Elias', 'Elias_pwd'),
('Alexander', 'Alexander_pwd'), ('Maximilian', 'Maximilian_pwd'),
('Paul', 'Paul_pwd'), ('Leon', 'Leon_pwd')
]
def client_task(ctx, name, pwd):
"""Der Request-Reply-Client benutzt ein REQ-Socket."""
client = ctx.socket(zmq.REQ)
tmp = "Client-%s" % (name, )
client.identity = tmp.encode()
client.plain_username = name.encode("utf8")
client.plain_password = pwd.encode("utf8")
client.connect("tcp://127.0.0.1:5555")
while True:
client.send(b"Hallo")
try:
reply = client.recv()
except zmq.ZMQError:
# Unterbrechnung
return
print("Client-%s: %s\n" % (name, reply.decode()), end=' ')
time.sleep(1)
def main():
ctx = zmq.Context()
threads = []
for i, v in enumerate(USER):
threads.append(threading.Thread(target=client_task, args=(ctx, v[0], v[1])))
threads[i].daemon = True
threads[i].start()
threads[i].join() # auf den letzen Warten
main()
Vielleicht findet ja hier den "Käfer" der mich den ganzen Tag beschäftigt und genervt hat.