multiprocessing + thread problem

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Hallo zusammen :)

Ich habe hier viewtopic.php?f=1&t=41146&start=15 mit eurer Hilfe eine "Limiter" Klasse erstellt, dessen Aufgabe es ist dafür zu sorgen, dass nicht mehr API Calls in einem Zeitraum gemacht werden, als erlaubt.

Und hier habe ich Code entwickelt, welcher vorhandene Aufgaben auf Prozesse und Threads veteilen soll viewtopic.php?f=1&t=40845&p=311902#p311899 . Also ich möchte zb 5 Instanzen von einem Programm laufen lassen. Habe 2 CPUs. Dann sollen also 2 Prozesse laufen, einer mit 3 Threads und einer mit 2 Threads, um diese 5 Programme möglichst effizient gleichzeitig laufen zu lassen.

Nun habe ich versucht das beides zu kombinieren. Also in mein Skript, welches mit 2 Prozessen und ein paar Threads in mehreren Instanzen gleichzeitig läuft, das Limiter() Projekt zu integrieren.
Einzeln laufen beide wunderbar und fehlerfrei.

Doch leider bekomme ich dabei einen Fehler den ich bisher nicht lösen konnte.

Der Code:

Code: Alles auswählen

import time
import threading
import concurrent.futures
import queue
import traceback
import multiprocessing

# zum limitieren von api calls.

class Limiter:
 
    def __init__(self, website, zeitlimit, calllimit, nosleepwhenmorethan=None, freihalten=0): 
        try:
            self.website = website
            self.zeitlimit = zeitlimit
            self.calllimit = calllimit # diese zahl an Calls darf innerhalb von zeitlimit sekunden gemacht werden.
            self.nosleepwhenmorethan = nosleepwhenmorethan # wenn mehr als x sekunden gesleept werden müsste, dann nicht sleepen sondern False returnen, wodurch der Call dann nicht gemacht wird
            self.freihalten = freihalten # soviele calls sollen im zeitlimit unverbraucht bleiben. nur sinnvoll bei hohen calllimit und hohem zeitlimit
            
            self.gemachte_anfragen = []
            self.anfragen = queue.Queue()
            self.thread = threading.Thread(target=self.run)
            self.thread.start()
        except Exception as err:
            print("Fehler in ratelimit skript init {}: {}\n{}".format(self.website,err,traceback.format_exc()))
            
    def limitkontrolle(self,art,aufrufer=""): # art kann "limit", "test"       
        try:
            e = threading.Event() # auch test muss über event und thread gemacht werden, damit liste nicht währenddessen geändert wird
            if art=="limit":
                wartezeit = []
            else: # bisher nur test. test returned [gemachte_anfragen in zeitlimit, calllimit, zeitlimit]. Kann also zur Anzeige, aber auch zum testen ob wieder aktiveren verwendet werden. sollte natürlich nicht >= callimit sein
                wartezeit = [art]
            self.anfragen.put((e, wartezeit))

            e.wait()
            if art=="limit":
                if self.nosleepwhenmorethan and wartezeit[0] > self.nosleepwhenmorethan:
                    return False 
                else:
                    if wartezeit[0] > 0:
                        time.sleep(wartezeit[0])
                    return True
            elif art=="test":    
                return wartezeit
        except Exception as err:
            print("Fehler in ratelimit skript limitkontrolle {}: {}\n{}".format(self.website,err,traceback.format_exc()))
    
    def wann_darfich(self,jetzt):
        try:    
            anfragencopy = self.gemachte_anfragen[:] # eine Kopie machen, damit einträge entfernt werden können (gehts auch weniger fehleranfällig?)
            for timestamp in anfragencopy:
                if timestamp < jetzt - self.zeitlimit:
                    self.gemachte_anfragen.remove(timestamp) # alle zu alten einträge entfernen.
                else: # sollten sortiert sein, dh sobald das nicht mehr zutrifft, kann es für einträge danach auch nicht mehr zutreffen. daher können wir breaken
                    break 
                    
            if len(self.gemachte_anfragen) < self.calllimit - self.freihalten:
                return jetzt
            relevantecalls = self.gemachte_anfragen[-self.calllimit:] # auch zukuenftige timestamps muessen beachtet werden
            if len(relevantecalls) < self.calllimit - self.freihalten: # !! es können viel mehr anfragen drin sein, weil auch zukünftige anfragen, die gerade warten, mit drin stehen!
                return jetzt
            else: # es wurden soviele wie erlaubt gemacht oder gar mehr, dann warten
                return relevantecalls[0] + self.zeitlimit # relevantecalls[0] ist der timestamp des ältesten relevanten calls
        except Exception as err:
            print("Fehler in ratelimit skript wann_darfich {}: {}\n{}".format(self.website,err,traceback.format_exc()))
    
    def stop(self):
        self.anfragen.put(None) # kill
    
    def run(self):
        try:
            while True:
                print("ratelimit skript {}: run vor get".format(self.website))
                ding = self.anfragen.get()
                print("ratelimit skript {}: run nach get".format(self.website))
                if ding is None: # stop
                    return
                else:
                    e,wartezeit = ding
                jetzt = time.time()
                if not wartezeit: # []
                    w = self.wann_darfich(jetzt) # absoluter ts
                    wartezeit.append(w-jetzt)
                    if not (self.nosleepwhenmorethan and wartezeit[0] > self.nosleepwhenmorethan): # ansonsten wird call nicht gemacht
                        self.gemachte_anfragen.append(w)
                elif wartezeit[0]=="test":
                    zahl = 0
                    for timestamp in self.gemachte_anfragen:
                        if not(timestamp < jetzt - self.zeitlimit):
                            zahl += 1
                    wartezeit[0] = zahl # die zahl der calls die innerhalb des zeitlimits bereits verplant sind
                    wartezeit.append(self.calllimit)
                    wartezeit.append(self.zeitlimit)
                    wartezeit.append(self.freihalten)
                e.set() # e.wait() in "darfich" beenden
        except Exception as err:
            print("Fehler in ratelimit skript run {}: {}\n{}".format(self.website,err,traceback.format_exc()))

            
            
            
            
            
            
class Test:
    
    def __init__(self,instanz):
        self.instanz = instanz
        self.websites = ["abc","efg"]
        self.Limiter = {}
        for website in self.websites:
            self.Limiter[website] = Limiter(website,2,3)#,None,0,self.loggen)
        
    def loggen(self,txt,q="",w="",e="",r=""):
        print("{}: Log {}: {}".format(time.asctime(),self.instanz,txt)) # write into txt file
    
    def testen(self,a=""):
        for website in self.websites:
            if self.Limiter[website].limitkontrolle("limit","testcall "+website):
                print("{}: {} mache call zu {}".format(time.asctime(),self.instanz,website))
        
        for website in self.websites:
            self.Limiter[website].stop()
            
            
            
            
def parallel(aufgabenliste): # aufgabenliste enthaelt die funktionen, argsliste die zugehoerigen argumente ... wird auch nur noch für den start gebraucht, alles andere macht nun concurrent
    
    threads = list(map(Multithread, aufgabenliste)) # uebergibt nacheinander die Elemente der aufgaben an die Klasse Multithread
    for thread in threads: # nun werden alle threads gestartet und machen parallel jeweils eine der aufgaben
        thread.start()
   
    for thread in threads: # es wird geweartet bis alle threads fertig sind, bevor es hier weitergeht
        thread.join()
    return(list(thread.result for thread in threads))  #ergebnisse als liste printen

def parallelBeides(aufgabenliste): # nur zum starten der botinstanzen. Erstelle soviele Prozesse, wie cpu vorhanden, und teile die botinstanzen auf die prozesse auf
    
    threadaufgaben = {}
    for c in range(cpus):
        threadaufgaben[c] = [] # zahlen 0 bis 7 , bei 8 cpus
    while aufgabenliste:
        for c in range(cpus):
            if aufgabenliste:
                threadaufgaben[c].append(aufgabenliste.pop()) # gleichmäßige Verteilung der aufgaben auf die cpus
    
    processes = []
    for c in range(cpus):
        if threadaufgaben[c]:
            processes.append(MultiProcessBeides(threadaufgaben[c]))
    
    for process in processes: # nun werden alle processes gestartet und machen parallel jeweils eine der aufgaben
        process.start()
   
    for process in processes: # es wird geweartet bis alle processes fertig sind, bevor es hier weitergeht
        process.join()
    
    return # brauchen kein ergebnis fur die botinstanzen

class Multithread(threading.Thread):
    def __init__(self, aufgabe=0):
        if aufgabe:
            threading.Thread.__init__(self)
            self.aufgabe = aufgabe[0]  #ist ja ein Tupel aus funktion und argumenten
            self.argumente = aufgabe[1]   
            self.result = None  
        return
        
    def run(self):
        self.result = self.aufgabe(*self.argumente)  # hier wird das * verwendet, um eine beliebige Anzahl an argumenten zuzufuegen
        return

class MultiProcessBeides(multiprocessing.Process):
    def __init__(self, aufgaben=0):
        if aufgaben!=0:
            multiprocessing.Process.__init__(self)
            self.aufgaben = aufgaben
        return
        
    def run(self):
        machen = []
        for aufgabe in self.aufgaben:
            machen.append(aufgabe)
        parallel(machen) # nun jeden der prozesse die aufgaben mithilfe von threads erledigen lassen
        return 
        

if __name__ == "__main__":  
    cpus = 2 # multiprocessing.cpu_count()
    Instanzen = [] 
    InstanzenDict = {}
    for i in range(4):
        InstanzenDict[i] = Test(i) # einmal aspeichern, damit immer dieselbe instanz genommen wird, und nicht mehrere verschiedene.
        Instanzen.append((InstanzenDict[i].testen,("")))
    if cpus and cpus > 1:
        parallelBeides(Instanzen) # fuer jede cpu einen prozess und dort dann threads
    else:
        parallel(Instanzen) # nur mit threads
    print("Vollständig beendet!")
    time.sleep(1)
Sobald Prozesse verwendet werden, also cpus größergleich 2 ist, kommt dieser Fehler:
Traceback (most recent call last):
File "ratelimittest.py", line 198, in <module>
parallelBeides(Instanzen) # fuer jede cpu einen prozess und dort dann threads
File "ratelimittest.py", line 155, in parallelBeides
process.start()
File "C:\Python34\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Python34\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Python34\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Python34\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Python34\lib\multiprocessing\spawn.py", line 106, in spawn_main
exitcode = _main(fd)
File "C:\Python34\lib\multiprocessing\spawn.py", line 116, in _main
self = pickle.load(from_parent)
EOFError: Ran out of input
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Niemand?

Diese Fehlermeldung bekomm ich unter Windows.

Unter Debian 8.1 bekomm ich keine Fehlermeldung es geht einfach nicht weiter, endet bei "ratelimit skript abc: run vor get".
Allerdings ist das normal, wenn ein Thread wegen eines Fehlers beendet wird, dass ich dann keine Fehlermeldung in die Ausgabe bekomme.. habe in parallel, parallelBeides und testen noch ein try/except Block eingebaut, bekomme aber weiterhin keinen Fehler unter Debian...

Wenn es zu umfangreich ist, mir hier bei zu helfen, bin ich auch bereit mich mit einer kleinen Spende zu bedanken.
Gerne nehme ich auch professionelle Hilfe in Anspruch, die mir die Idee hinter meinem Code sauber codet, sodass alles funktioniert.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Falls jemand mal eine ähnliche Fragestellung hat, hier eine Lösung:

Also:
Die Fehlermeldung liegt an Windows+multiprocessing. Das skript nun windows kompatibel zu behalten war mir nun zuviel arbeit, weshalb ich das jetzt vernachlässige.

Ansonsten wurde das nun mit einer neuen "generate_instances" Funktion gelöst. Keine Ahnung warum das funktioniert, aber es funktioniert:

Code: Alles auswählen

def parallel(tasks):  # tasks contains the funktions with arguements that should be executed in threads
    try:
        threads = list(map(Multithread, tasks))
        for thread in threads:
            thread.start()

        for thread in threads:
            thread.join()
    except Exception as err:
        print("Error in parallel: {}\n{}".format(err, traceback.format_exc()))
    return (list(thread.result for thread in threads))


def parallelBoth(tasks, cpu=2):  # this function divides the functions in tasks to processes and threads.
    try:
        threadtasks = {}
        for c in range(cpu):  # for each cpu, we will create a process. each process will get half of the tasks, which are saved in threadtasks for each process
            threadtasks[c] = []
        while tasks:
            for c in range(cpu):
                if tasks:
                    threadtasks[c].append(tasks.pop())  # divide the tasks equally

        processes = []
        for c in range(cpu):
            if threadtasks[c]:
                task_ids = []
                for t in threadtasks[c]:
                    task_ids.append(t[2])

                processes.append(MultiProcessBoth(task_ids))

        for process in processes:
            process.start()
            
        for process in processes:
            process.join()
    except Exception as err:
        print("Error in parallelBoth: {}\n{}".format(err, traceback.format_exc()))

    return  # brauchen kein ergebnis fur die botinstanzen


class Multithread(threading.Thread):
    def __init__(self, task=None):
        threading.Thread.__init__(self)

        if task:
            self.task_operation = task
            self.result = None
        return

    def run(self):
        self.result = self.task_operation[0](*self.task_operation[1])  # hier wird das * verwendet, um eine beliebige Anzahl an argumenten zuzufuegen


class MultiProcessBoth(multiprocessing.Process):
    def __init__(self, tasks=None):
        self.tasks = tasks
        multiprocessing.Process.__init__(self)

    def run(self): # wird aus inem grund nicht für beide processes ausgeführt ?? liegt auch nicht am mainbot, der ist aus...
        process_tasks, instances = [], generate_instances(ids=self.tasks)

        for m in instances:
            process_tasks.append(m)

        parallel(process_tasks)  # now every process should start their tasks with help of threads
        return


def generate_instances(dummy=False, ids=None):

    if type(ids) is list:
        process_ids = True
    else:
        process_ids = False
    
    InstancesDict = {}  # a dictionary were we save the opened classes to identify each isntance
    Instances = []  # a list of the function we would like to run in instances
    lock = multiprocessing.Lock() # dieses wird vermutlich über Prozesse aktualsiert, eben weil es aus dem multiprocessing modul stammt
    
    for i in range(NO_INSTANCES):
        if dummy:
            Instances.append((None, (""), i))
        else:
            if process_ids:
                if i in ids:
                    InstancesDict[i] = Test(i, lock=lock)
                    Instances.append((InstancesDict[i].testing, (""), i))
            else: # wenn nur threads verwendet werden, keine prozesse
                InstancesDict[i] = Test(i)
                Instances.append((InstancesDict[i].testing, (""), i))



    return Instances



if __name__ == "__main__":
    cpus = 2  # multiprocessing.cpu_count()

    if cpus and cpus > 1:
        parallelBoth(generate_instances(dummy=True), cpu=cpus)  # if we have enough cpu, also use processes
    else:
        parallel(generate_instances())  # only with threads
    print("script done!")
Sirius3
User
Beiträge: 17737
Registriert: Sonntag 21. Oktober 2012, 17:20

@Serpens66: Deine ursprüngliche Frage habe ich irgendwie übersehen. Du mischst hier threading mit multiprocessing. Das ist keine gute Idee. Unter Posix sind Threads nichts anderes als Prozesse mit (teilweise) gemeinsam genutzten Resourcen (Speicher, Filehandles, etc). Von daher ist vieles, was multiprocessing nachbildet ziemlich ähnlich zu den Systemfunktionen die threading benutzt. Unter Windows ist das anders. Da muß multiprocessing viel mehr emulieren. Nutze also ausschließlich multiprocessing und schreibe Deine Skripte entsprechend um. (threading.Lock -> multiprocessing.Lock). Ich würde auch Deinen Limiter als Iterator umschreiben, so dass Du einfach multiprocessing.Pool.map_async oder ähnliches verwenden kannst, und Du das ganzen parallel-Zeugs nicht selbst schreiben mußt.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Sirius3 hat geschrieben:@Serpens66: Deine ursprüngliche Frage habe ich irgendwie übersehen. Du mischst hier threading mit multiprocessing. Das ist keine gute Idee. Unter Posix sind Threads nichts anderes als Prozesse mit (teilweise) gemeinsam genutzten Resourcen (Speicher, Filehandles, etc). Von daher ist vieles, was multiprocessing nachbildet ziemlich ähnlich zu den Systemfunktionen die threading benutzt. Unter Windows ist das anders. Da muß multiprocessing viel mehr emulieren. Nutze also ausschließlich multiprocessing und schreibe Deine Skripte entsprechend um. (threading.Lock -> multiprocessing.Lock). Ich würde auch Deinen Limiter als Iterator umschreiben, so dass Du einfach multiprocessing.Pool.map_async oder ähnliches verwenden kannst, und Du das ganzen parallel-Zeugs nicht selbst schreiben mußt.
Ich danke für deine Antwort :)

Zuvor hatte ich ausschließlich threading verwendet, aber dadurch werden die verfügbaren CPUs ja nicht alle verwendet, sondern nur einer.
Da ich nun 2 CPUs habe, dachte ich dass das bedeutet ich kann auch nur 2 Prozesse sinnvoll nutzen. Will ich aber mehr als 2 Instanzen gleichzeitig laufen haben, dann dachte ich, dass diese 2 Prozesse dann eben Threads starten müssen.
Und du schreibst jetzt, dass ich stattdessen soviele Prozesse starten soll, wie ich Instanzen brauche? Das können 10-20 oder später sogar mehr werden. Funktioniert das (genauso gut/schlecht bzw besser, als mit 2 Prozessen+Threads)?

(wie man die Limiter Klasse in einen iterator umschreiben könnte ist mir schleierhaft, die Klasse ist doch viel zu komplex, oder nicht?, aber bleiben wir erstmal bei der prozess-frage)
Sirius3
User
Beiträge: 17737
Registriert: Sonntag 21. Oktober 2012, 17:20

@Serpens66: unter Linux ist es fast egal, ob man Prozesse oder Threads nimmt. Unter Windows ist das Starten von Threads deutlich schneller als das von Prozessen und verbraucht auch deutlich weniger Arbeitsspeicher. Wie groß ist eigentlich Deine CPU-Auslastung bei Threads?
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Sirius3 hat geschrieben:@Serpens66: unter Linux ist es fast egal, ob man Prozesse oder Threads nimmt. Unter Windows ist das Starten von Threads deutlich schneller als das von Prozessen und verbraucht auch deutlich weniger Arbeitsspeicher. Wie groß ist eigentlich Deine CPU-Auslastung bei Threads?
Der Wechsel von "nur (zb) 10 Threads" zu "2 Prozessen mit je 5 Threads" hat auf Linux einen erheblichen Geschwindigkeitsboost gebracht, da die Auswertung der API Antworten schon sehr rechenintensiv ist (in erster linie durchgehen von 10tausenden von listeneinträgen und umrechnen von diesen.) Windows lassen wir jetzt mal außen vor, es reicht sich mit einem System rumzuschlagen. Windows ist nicht so wichtig, deswegen will ich meine (und deine) Zeit damit nicht länger verschwenden.

Ich hab keine Ahnung wo ich die CPU Auslastung bei Threads sehe. Aber ich denke mal meine Antwort, dass es einen erheblichen Geschwindigkeitsboost gab, ist Antwort genug.

Es bleibt aber bei deiner Meinung, dass ich die instanzen in vielen Prozessen laufen lassen sollte (sofern (auch) rechenintensiv und keine "reine wartezeit" wie es bei API Calls der Fall ist) und das definitv nicht schlechter wäre als 2 Prozesee mit je 5 Threads?
Sirius3
User
Beiträge: 17737
Registriert: Sonntag 21. Oktober 2012, 17:20

@Serpens66: die Laufzeitunterschiede zwischen 2 Prozesse und 5 Threads zu 10 Prozesse sind nicht wahrnehmbar, die Reduktion des Programmieraufwands dagegen schon.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Sirius3 hat geschrieben:@Serpens66: die Laufzeitunterschiede zwischen 2 Prozesse und 5 Threads zu 10 Prozesse sind nicht wahrnehmbar, die Reduktion des Programmieraufwands dagegen schon.
gut, das wollte ich wissen, danke :)
Ich kam nur (schon wieder) mit Prozessen+cpus durcheinander und ging schonwieder davon aus, dass eine CPU auch nur einen Prozess aktiv verwenden kann (obwohl mir ineinem anderen Thread bereits erklärt wurde, dass das quatsch ist.. hab ich wohl noch nicht richtig verinnerlicht gehabt).
Dann werde ich baldmöglichst auf Prozesse wechseln :)
Ich denke damit ist dieser Thread erstmal gelöst. Zu Pipes und Co frage ich dann hier weiter, falls ich noch fragen hab:
viewtopic.php?p=314947#p314947
Benutzeravatar
kbr
User
Beiträge: 1487
Registriert: Mittwoch 15. Oktober 2008, 09:27

@Serpens66: Einen Prozess kannst Du Dir als geschlossenen Adressraum vorstellen, in dem Dein Programmcode als Main-Thread ausgeführt wird. Jeder zusätzliche Thread in einem Prozess ist eine Thread-Ausführung mehr, die alle Zugriff auf den gemeinsamen Adressraum des Prozesses haben. Das kann zu üblen Seiteneffekten und Problemen führen und deswegen sind threadsichere Programme auch nicht gerade trivial. Zudem läuft bei Python, dank GIL, jeweils nur ein Thread pro Prozess, auch bei mehreren Cores, was wieder ganz eigene Effekte mit sich bringt. Wenn Du hingegen mehrere Prozesse startest, dann kannst Du über die CPUs und Cores skalieren, was sich besonders bei rechenintensiven Anwendungen bemerkbar macht. Wie die Prozesse auf die Cores verteilt werden, bestimmt das OS.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

hallo zusammen :)

ich komme leider schon wieder nicht weiter, obwohl ich bereits einige Stunden grübel, kleine Fortschritte mache, aber mich letzlich doch nur im Kreis drehe =/

Also wie von Sirius empfohlen, möchte ich den Mischmasch aus threading und prozessen entfernen und stattdessen einfach nur Prozesse starten.
Dies könnte man mit concurrent.futures und dem Process Pool Executor machen, aber einmal gibt es da einen Python Bug, dass nicht mehr als 61 worker gehen und zweitens ist ein Pool hier meiner Ansicht nach eh fehl am Platz, da ich eine feste anzahl an Aufgaben habe, die alle gleichzeitig laufen sollen (und nicht abgearbeitet werden).
Da ein Pool also nicht in Frage kommt, ist meine einzige bekannte Lösung meine bisherige "self-made" Lösung:

Code: Alles auswählen

def parallel(tasks): 
    try:
        processes = list(map(MultiProcess, tasks))
        for process in processes:
            process.start()
            
        for process in processes:
            process.join()
    except Exception as err:
        print("Error in parallel: {}\n{}".format(err, traceback.format_exc()))
    return
    
class MultiProcess(multiprocessing.Process):
    def __init__(self, task=None):
        multiprocessing.Process.__init__(self)
        self.task_operation = task

    def run(self):
        if self.task_operation:
            self.task_operation[0](*self.task_operation[1])
        return
    
if __name__ == "__main__":

    Instanzen = []
    InstanzenDict = {}
    for i in range(4):
        InstanzenDict[i] = Test(i) # einmal aspeichern, damit immer dieselbe instanz genommen wird, und nicht mehrere verschiedene.
        Instanzen.append((InstanzenDict[i].testen,("")))
    parallel(Instanzen)
    print("Vollständig beendet!")
Dazu die Klassen aus dem ersten Post, wobei diese tatsächlich ignoriert werden können, da eine Änderungen des obigen Codes schon eine Lösung darstellt, ohne die Klassen zu ändern/berücksichtigen.
Der Code so wie er hier steht, führt wieder zu dem exakt selben Problem, was im Eingangspost und 2. Post beschrieben ist. Hat also nichts mit der Mischung von Threads und Prozessen zu tun, sondern nur mit Prozessen.

Im 3ten Post zeige ich einen Code, der mir von einem freelancer entwickelt wurde, welcher fehlerfrei funktioniert. Auch hier können die Klassen Limiter und Test ignoriert werden. Die Lösung verwendet irgendwie die Funktion "generate_instances".
Ich habe mich nun Stunden hingesetzt, und versucht diese Lösung sogut es geht nachzuvollziehen, damit ich sie auf mein aktuelles Problem anwenden kann. Doch mir wird einfach nicht klar, warum es mit dieser Funktion generate_instances funktioniert, aber nicht ohne.
Und ich komm auch nicht drauf, wie mein Code nun aussehen müsste, wenn ich keine Threads will, sondern einfach nur direkt Prozesse starten will.

Zur veranschaulichung, vorher die Threads/Prozess Lösung war ja: "Starte 2 Prozesse, welche jeweils 5 Threads laufen lassen" -> Summe 10.
Nun sollen direkt 10 Prozesse gestartet werden, keine Threads.
Wie ich "2 prozesse, welche jweils 5 Prozesse starten" mache, ist glaub ich relativ easy, aber nciht das was ich will.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Mit concurrent.futures ProcessPoolExecutor gehts übrigens auch nicht (auch wenn man unter 61 bleibt, ich versuchs grad mit 2 bis 4)
Und "starte 2 Prozesse, welche 5 Prozesse starten" scheint auch nciht zu gehen... (im code des 3. Post einfach die Threads mit weiteren Prozessen ersetzen)

Aktuell ist meine Beste Lösung, wenn ich zb 2 Instanzen will:
Starte 2 Prozesse, welche jeweils 1 Thread starten (und in diesem Thread läuft die instanz dann).
Das scheint zu funktionieren (eben mit dem generate_instances Code aus Post 3), aber ist doch ein wenig über viele Ecken und wenns besser ginge, wäre das natürlich schön :)
(wobei ich mir grad nicht sicher bin, ob es genau das macht... sieht anhand der prints eher da nach aus, als würde 1 process mit 2 threads draus werden... muss ich mir noch genauer angucken.... "multiprocessing.current_process().name" gibt einmal "2" und einmal "3" aus, also sinds wohl doch unterschiedliche Prozesse... nur aus irgendeinem Grund wird nur 1 Thread gestartet. Der zweite wird erst bei beendingung des ersten gestartet, obwohl es zeitgleich sein sollte... und dennoch wird der gesamte Code wie er soll ausgeführt... merkwürdig..)
Antworten