@jens Ich weiss nicht, was Du hast. Nocht nicht kapiert, dass man keinen zusätzlichen Parameter für einen generierten Event braucht, wenn man eine Queue nimmt.
Jetzt kommt es aber dicke. Ich habe das Messaging System nun implementiert, welches erlaubt, dass beliebige viele Tasks miteinander Nachrichten austauschen.
Und sie brauchen sich auch nicht gegenseitig zu kennen, weil der Nachrichtenaustausch über einen Proxy geht, der allen bekannt ist.
Hier ein Bespiel mit drei Tasks, die über einen vierten miteinander kommunizieren.
Erster Task, der Fragesteller frage.py. Der Benutzer tippt etwas ein in die Konsole, etwa eine Frage. Das schickt diese Task an den extern_proxy weiter.
frage.py:
Code: Alles auswählen
import threading
import extern_proxy
class MyThread(threading.Thread):
def run(self):
while True:
frage = input("> ")
extern_proxy.proxy.send("FRAGE",frage)
mythread = MyThread()
mythread.daemon = True
mythread.start()
Der Letze in der Reihe bekommt die frage und die Antwort und gibt sie in der Konsole aus.
frage_antwort.py:
Code: Alles auswählen
import threading
import extern_proxy
import proxy
class MyThread(threading.Thread):
def run(self):
self.proxy = proxy.Proxy(extern_proxy.proxy)
self.proxy.do_receive(self,"FRAGE_ANTWORT",lambda msg: print("Frage:",msg[0],"- Antwort:",msg[1]))
self.proxy.do_receive_extern(("FRAGE_ANTWORT",))
self.proxy.loop()
mythread = MyThread()
mythread.daemon = True
mythread.start()
Um Nachrichten auszutauschen, wird ein Proxy eingerichtet. Damit Nachrichtentausch auch mit dem externen Proxy möglich ist, wird dieser dem eigenen Proxy bekanntgegeben:
Dann wird eine Callback Funktion für die Message "FRAGE_ANTWORT" eingerichtet, welche dann das Ergebnis ausgibt:
Code: Alles auswählen
self.proxy.do_receive(self,"FRAGE_ANTWORT",lambda msg: print("Frage:",msg[0],"- Antwort:",msg[1]))
Woher soll diese Message kommen? Von hier nicht, sondern vom externen Proxya. Dem gibt man eine Liste bekannt. Aber hier nur ein Element:
Irgendwer muss ja die Frage beantworten. Das macht dann eine tkinter GUI Anwendung.
antwort.py:
Code: Alles auswählen
import frage
import frage_antwort
import extern_proxy
import proxy as myproxy
import tkinter as tk
proxy = myproxy.Proxy(extern_proxy.proxy)
# external message callbacks
proxy.do_receive_extern(("FRAGE",))
proxy.do_send_extern(("FRAGE_ANTWORT",))
root = tk.Tk()
frage = tk.Label()
frage.pack()
antwort = tk.Entry()
antwort.pack()
def get_frage(msg):
frage['text'] = msg
antwort.delete(0,'end')
# internal callbacks
proxy.do_receive(root,"FRAGE",get_frage)
antwort.bind("<Return>",lambda event: proxy.send('FRAGE_ANTWORT',(frage['text'],antwort.get())))
# tk trigger for proxy
root.bind("<<SEND>>",proxy.work)
proxy.set_trigger(lambda: root.event_generate("<<SEND>>", when="tail"))
root.mainloop()
Hier auch wieder der Proxy, der den externen kennt. Dem gibt man bekannt, dass man "FRAGE" von extern empfangen will und FRAGE_ANTWORT nach außen weiterleiten will:
Code: Alles auswählen
proxy.do_receive_extern(("FRAGE",))
proxy.do_send_extern(("FRAGE_ANTWORT",))
Die Callbackfunktionen sind ganz normal wie bei internem Senden und Empfangen (es wird nur weitergeletet durch die Definitionen vorher):
Code: Alles auswählen
proxy.do_receive(root,"FRAGE",get_frage)
antwort.bind("<Return>",lambda event: proxy.send('FRAGE_ANTWORT',(frage['text'],antwort.get())))
In die normale threading Event Loop darf man den Proxy nicht schicken, daher setzt man die tk GUI Trigger für ihn:
Code: Alles auswählen
# tk trigger fror proxy
root.bind("<<SEND>>",proxy.work)
proxy.set_trigger(lambda: root.event_generate("<<SEND>>", when="tail"))
Und diese Kommunikation erfolgt über eine vierte Task, welche die Verbindung zwischen den anderen Tasks herstellt. Da seid Ihr sicher gespannt, wie dieses Wunderding aussieht, oder?
extern_proxy.py:
Code: Alles auswählen
import threading
import proxy as myproxy
proxy = None
class MyThread(threading.Thread):
def run(self):
global proxy
self.proxy = myproxy.Proxy()
proxy=self.proxy
self.proxy.loop()
mythread = MyThread()
mythread.daemon = True
mythread.start()
Es ist einfach nur ein Thread mit einem Proxy in der Loop. Die anderen Threads geben diesem Proxy einfach nur bekant, welche Nachrichten sie haben wollen. Und wenn Nachrichten zu diesm Proxy geschickt werden, sendet er sie an die Empfängerproxys weiter.
Die Logik findet sich im Proxy.
proxy.py:
Code: Alles auswählen
import queue
import threading
class Proxy:
def __init__(self,extern_proxy=None):
if extern_proxy == None: self.extern_proxy = self
else: self.extern_proxy = extern_proxy
self.reset()
def noop(self): pass
def reset(self):
self.Dictionary = {}
self.owners = {}
self.Queue = queue.Queue()
self.Queue_HighPrio = queue.Queue()
self._register("execute_function",lambda msg: msg())
self.running = False
self.trigger = self.do_work
self.extern_trigger = self.noop
def set_trigger(self,trigger):
self.trigger = trigger
self.extern_trigger = trigger
def do_work(self):
if self.running: return
self.running = True
while self.work(): pass
self.running = False
def loop(self):
self.event = threading.Event()
self.set_trigger(self.event.set)
self.trigger()
while True:
self.event.wait()
while self.work(): pass
def work(self,*args):
if not self.Queue_HighPrio.empty(): data = self.Queue_HighPrio.get()
elif not self.Queue.empty(): data = self.Queue.get()
else: return False
msgid = data[0]
msgdata = data[1]
if msgid in self.Dictionary:
receivers = self.Dictionary[msgid].items()
for receive,packed in receivers:
if packed: receive((msgid,msgdata))
else: receive(msgdata)
return True
# sending ==========================================================
def send(self,msgid,msgdata=None):
self.Queue.put((msgid,msgdata))
self.trigger()
# extern send and receive callbacks ==========================================
def do_send_extern(self,message_ids):
for mid in message_ids: self.do_receive(self,mid,self.send_extern,True)
def send_extern(self,message): self.extern_proxy.send(message[0],message[1])
def do_receive_extern(self,message_ids):
for mid in message_ids: self.extern_proxy.do_receive(self,mid,self.receive_extern,True)
def receive_extern(self,message):
self.Queue.put(message)
self.extern_trigger()
# register receiver ================================================
def do_receive(self,owner,msgid,receive,packed=False):
self.Queue_HighPrio.put(("execute_function",lambda: self._do_receive(owner,msgid,receive,packed)))
self.trigger()
def _do_receive(self,owner,msgid,receive,packed):
if not owner in self.owners: self.owners[owner] = {}
self.owners[owner][receive]=msgid
self._register(msgid,receive,packed)
def _register(self,msgid,receive,packed=False):
if msgid not in self.Dictionary: self.Dictionary[msgid] = {}
self.Dictionary[msgid][receive] = packed
# unregister receiver ================================================
def undo_receive(self,owner,msgid,receive):
self.Queue_HighPrio.put(("execute_function",lambda: self._undo_receive(owner,msgid,receive)))
self.trigger()
def _undo_receive(self,owner,msgid,receive):
if owner in self.owners:
if receive in self.owners[owner]: del self.owners[owner][receive]
self._unregister1(msgid,receive)
def _unregister1(self,msgid,receive):
if msgid in self.Dictionary:
receivers = self.Dictionary[msgid]
if receive in receivers:
del receivers[receive]
if len(receivers) == 0: del self.Dictionary[msgid]
# unregister Owner ================================================
def undo_receiveAll(self,owner):
self.Queue_HighPrio.put(("execute_function",lambda: self._undo_receiveAll(owner)))
self.trigger()
def _undo_receiveAll(self,owner):
if owner in self.owners:
messages = self.owners[owner]
del self.owners[owner]
for receive,msgid in messages.items(): self._unregister1(msgid,receive)
Konnte keine Ruhe finden. Musste es einfach machen.