Testclass.test() wird gleichzeitig mit 2 Prozessen aufgerufen. a ist die Prozessnummer. 0 ist der Hauptprozess und 1 ist der Nebenprozess.
Der Hauptprozess packt informationen in die queue und der nebenprozess soll diese erhalten.
Code: Alles auswählen
class Testclass:
def __init__(self,q,a):
self.q = q # ein dict mit vielen queues. bei a==0 ist es {1:queue1,2:queue2,...} , bei allen anderen ist es direkt die dazugeörige queue
self.a = a # instanznummer (zb 0 Hauptprozess , 1 nebenprozess)
if self.a!=0:
self.queuespeicher = {} # alle neuen infos werden von der queue erstmal hierdrin gespeichert
self.queuespeicher["dies"] = 0
def test(self,dummy=None,dummy2=None):
if self.a == 0: # HauptProzess
time.sleep(0.5) # kurz warten bis andre instanzen ihre threads gestartet haben
i = 0
while i<1000:
i += 1
self.q[1].put({"typ":"dies","response":i})
if self.a == 1: # erste instanz
_thread.start_new_thread(self.checkqueue, ()) # ein thread der dauerhaft nebenbei läuft und guckt obs neue infos in der queue gibt
time.sleep(0.55)
print(self.queuespeicher)
time.sleep(0.01)
print(self.queuespeicher)
time.sleep(0.3)
print(self.queuespeicher)
time.sleep(2.3) ## dies hier darf erst nach dem hauptprozess fertig sein, sonst beendet sich skript nicht.. ?!
if self.a!=0:
self.stopcheckqueue() # thread beenden, welcher die queue leert
self.q.close() # die queue selbst beenden
def stopcheckqueue(self):
self.q.put(None) # kill
def checkqueue(self):
while True:
ding = self.q.get()
if ding is None: # stop
return
else:
self.verwerteNeueInfo(ding)
def verwerteNeueInfo(self,dic):
self.queuespeicher[dic["typ"]] = dic["response"]
Erstaunlicherweise bekomme ich (bisher nur windows getestet) nämlich eine Fehlermeldung, wenn ich es über concurrent.futures.ProcessPoolExecutor(2) aufrufe. RuntimeError: Queue objects should only be shared between processes through inheritance
Keinen Fehler bekomme ich bei meiner üblichen "parallel()" Funktion mit Prozessen.
Um diesen Post übersichtlich zu behalten lasse ich den Code dafür erstmal weg. Kann aber gerne nachgereicht werden.
Mit meiner funktionierenden parallel() Aufrufweise, habe ich aber noch das Problem, dass irgendwas verhindert, dass die Funktion/Klasse returned. Soweit ich weiß erstellt die queue einen thread zum übertragen und blockt daher. Deshalb hab ich self.q.close() für die Nebeninstanz eingebaut. Sofern der Nebenprozess nach dem Hauptprozess beendet wird, wird auch korrekt returned (ohne close tut es das nicht). Aber wenn der Nebenprozess vorher beendet wird, dann blockiert irgendwas weiterhin. Hab deswegen self.q[1].close() im hauptprozess dazugeschrieben, aber das bewirkt nichts.