Re: Ein Objekt über mehrere Prozesse hinweg verwenden
Verfasst: Donnerstag 19. Oktober 2017, 20:21
				
				soooo... ich hab mal probiert etwas mit queues zu schreiben. 
Testclass.test() wird gleichzeitig mit 2 Prozessen aufgerufen. a ist die Prozessnummer. 0 ist der Hauptprozess und 1 ist der Nebenprozess.
Der Hauptprozess packt informationen in die queue und der nebenprozess soll diese erhalten.
Am besten diskutieren wir erstmal über diesen Code, was man daran anders/besser machen sollte.
Erstaunlicherweise bekomme ich (bisher nur windows getestet) nämlich eine Fehlermeldung, wenn ich es über concurrent.futures.ProcessPoolExecutor(2) aufrufe. RuntimeError: Queue objects should only be shared between processes through inheritance
Keinen Fehler bekomme ich bei meiner üblichen "parallel()" Funktion mit Prozessen.
Um diesen Post übersichtlich zu behalten lasse ich den Code dafür erstmal weg. Kann aber gerne nachgereicht werden.
Mit meiner funktionierenden parallel() Aufrufweise, habe ich aber noch das Problem, dass irgendwas verhindert, dass die Funktion/Klasse returned. Soweit ich weiß erstellt die queue einen thread zum übertragen und blockt daher. Deshalb hab ich self.q.close() für die Nebeninstanz eingebaut. Sofern der Nebenprozess nach dem Hauptprozess beendet wird, wird auch korrekt returned (ohne close tut es das nicht). Aber wenn der Nebenprozess vorher beendet wird, dann blockiert irgendwas weiterhin. Hab deswegen self.q[1].close() im hauptprozess dazugeschrieben, aber das bewirkt nichts.
			Testclass.test() wird gleichzeitig mit 2 Prozessen aufgerufen. a ist die Prozessnummer. 0 ist der Hauptprozess und 1 ist der Nebenprozess.
Der Hauptprozess packt informationen in die queue und der nebenprozess soll diese erhalten.
Code: Alles auswählen
class Testclass:
    def __init__(self,q,a):
        self.q = q # ein dict mit vielen queues. bei a==0 ist es {1:queue1,2:queue2,...} , bei allen anderen ist es direkt die dazugeörige queue
        self.a = a # instanznummer (zb 0 Hauptprozess , 1 nebenprozess)
        if self.a!=0:
            self.queuespeicher = {} # alle neuen infos werden von der queue erstmal hierdrin gespeichert
            self.queuespeicher["dies"] = 0
            
    def test(self,dummy=None,dummy2=None):
        
        if self.a == 0: # HauptProzess
            time.sleep(0.5) # kurz warten bis andre instanzen ihre threads gestartet haben
            
            i = 0
            while i<1000:
                i += 1
                self.q[1].put({"typ":"dies","response":i})
        if self.a == 1: # erste instanz
            _thread.start_new_thread(self.checkqueue, ()) # ein thread der dauerhaft nebenbei läuft und guckt obs neue infos in der queue gibt
            
            time.sleep(0.55)
            print(self.queuespeicher)
            time.sleep(0.01)
            print(self.queuespeicher)
            time.sleep(0.3)
            print(self.queuespeicher)
            time.sleep(2.3)  ## dies hier darf erst nach dem hauptprozess fertig sein, sonst beendet sich skript nicht.. ?!
        if self.a!=0:
            self.stopcheckqueue() # thread beenden, welcher die queue leert
            self.q.close() # die queue selbst beenden
    def stopcheckqueue(self):
        self.q.put(None) # kill
    
    def checkqueue(self):
        while True:
            ding = self.q.get()
            if ding is None: # stop
                return
            else:
                self.verwerteNeueInfo(ding)
            
    def verwerteNeueInfo(self,dic): 
        self.queuespeicher[dic["typ"]] = dic["response"]Erstaunlicherweise bekomme ich (bisher nur windows getestet) nämlich eine Fehlermeldung, wenn ich es über concurrent.futures.ProcessPoolExecutor(2) aufrufe. RuntimeError: Queue objects should only be shared between processes through inheritance
Keinen Fehler bekomme ich bei meiner üblichen "parallel()" Funktion mit Prozessen.
Um diesen Post übersichtlich zu behalten lasse ich den Code dafür erstmal weg. Kann aber gerne nachgereicht werden.
Mit meiner funktionierenden parallel() Aufrufweise, habe ich aber noch das Problem, dass irgendwas verhindert, dass die Funktion/Klasse returned. Soweit ich weiß erstellt die queue einen thread zum übertragen und blockt daher. Deshalb hab ich self.q.close() für die Nebeninstanz eingebaut. Sofern der Nebenprozess nach dem Hauptprozess beendet wird, wird auch korrekt returned (ohne close tut es das nicht). Aber wenn der Nebenprozess vorher beendet wird, dann blockiert irgendwas weiterhin. Hab deswegen self.q[1].close() im hauptprozess dazugeschrieben, aber das bewirkt nichts.