Ich habe hier viewtopic.php?f=1&t=41146&start=15 mit eurer Hilfe eine "Limiter" Klasse erstellt, dessen Aufgabe es ist dafür zu sorgen, dass nicht mehr API Calls in einem Zeitraum gemacht werden, als erlaubt.
Und hier habe ich Code entwickelt, welcher vorhandene Aufgaben auf Prozesse und Threads veteilen soll viewtopic.php?f=1&t=40845&p=311902#p311899 . Also ich möchte zb 5 Instanzen von einem Programm laufen lassen. Habe 2 CPUs. Dann sollen also 2 Prozesse laufen, einer mit 3 Threads und einer mit 2 Threads, um diese 5 Programme möglichst effizient gleichzeitig laufen zu lassen.
Nun habe ich versucht das beides zu kombinieren. Also in mein Skript, welches mit 2 Prozessen und ein paar Threads in mehreren Instanzen gleichzeitig läuft, das Limiter() Projekt zu integrieren.
Einzeln laufen beide wunderbar und fehlerfrei.
Doch leider bekomme ich dabei einen Fehler den ich bisher nicht lösen konnte.
Der Code:
Code: Alles auswählen
import time
import threading
import concurrent.futures
import queue
import traceback
import multiprocessing
# zum limitieren von api calls.
class Limiter:
 
    def __init__(self, website, zeitlimit, calllimit, nosleepwhenmorethan=None, freihalten=0): 
        try:
            self.website = website
            self.zeitlimit = zeitlimit
            self.calllimit = calllimit # diese zahl an Calls darf innerhalb von zeitlimit sekunden gemacht werden.
            self.nosleepwhenmorethan = nosleepwhenmorethan # wenn mehr als x sekunden gesleept werden müsste, dann nicht sleepen sondern False returnen, wodurch der Call dann nicht gemacht wird
            self.freihalten = freihalten # soviele calls sollen im zeitlimit unverbraucht bleiben. nur sinnvoll bei hohen calllimit und hohem zeitlimit
            
            self.gemachte_anfragen = []
            self.anfragen = queue.Queue()
            self.thread = threading.Thread(target=self.run)
            self.thread.start()
        except Exception as err:
            print("Fehler in ratelimit skript init {}: {}\n{}".format(self.website,err,traceback.format_exc()))
            
    def limitkontrolle(self,art,aufrufer=""): # art kann "limit", "test"       
        try:
            e = threading.Event() # auch test muss über event und thread gemacht werden, damit liste nicht währenddessen geändert wird
            if art=="limit":
                wartezeit = []
            else: # bisher nur test. test returned [gemachte_anfragen in zeitlimit, calllimit, zeitlimit]. Kann also zur Anzeige, aber auch zum testen ob wieder aktiveren verwendet werden. sollte natürlich nicht >= callimit sein
                wartezeit = [art]
            self.anfragen.put((e, wartezeit))
            e.wait()
            if art=="limit":
                if self.nosleepwhenmorethan and wartezeit[0] > self.nosleepwhenmorethan:
                    return False 
                else:
                    if wartezeit[0] > 0:
                        time.sleep(wartezeit[0])
                    return True
            elif art=="test":    
                return wartezeit
        except Exception as err:
            print("Fehler in ratelimit skript limitkontrolle {}: {}\n{}".format(self.website,err,traceback.format_exc()))
    
    def wann_darfich(self,jetzt):
        try:    
            anfragencopy = self.gemachte_anfragen[:] # eine Kopie machen, damit einträge entfernt werden können (gehts auch weniger fehleranfällig?)
            for timestamp in anfragencopy:
                if timestamp < jetzt - self.zeitlimit:
                    self.gemachte_anfragen.remove(timestamp) # alle zu alten einträge entfernen.
                else: # sollten sortiert sein, dh sobald das nicht mehr zutrifft, kann es für einträge danach auch nicht mehr zutreffen. daher können wir breaken
                    break 
                    
            if len(self.gemachte_anfragen) < self.calllimit - self.freihalten:
                return jetzt
            relevantecalls = self.gemachte_anfragen[-self.calllimit:] # auch zukuenftige timestamps muessen beachtet werden
            if len(relevantecalls) < self.calllimit - self.freihalten: # !! es können viel mehr anfragen drin sein, weil auch zukünftige anfragen, die gerade warten, mit drin stehen!
                return jetzt
            else: # es wurden soviele wie erlaubt gemacht oder gar mehr, dann warten
                return relevantecalls[0] + self.zeitlimit # relevantecalls[0] ist der timestamp des ältesten relevanten calls
        except Exception as err:
            print("Fehler in ratelimit skript wann_darfich {}: {}\n{}".format(self.website,err,traceback.format_exc()))
    
    def stop(self):
        self.anfragen.put(None) # kill
    
    def run(self):
        try:
            while True:
                print("ratelimit skript {}: run vor get".format(self.website))
                ding = self.anfragen.get()
                print("ratelimit skript {}: run nach get".format(self.website))
                if ding is None: # stop
                    return
                else:
                    e,wartezeit = ding
                jetzt = time.time()
                if not wartezeit: # []
                    w = self.wann_darfich(jetzt) # absoluter ts
                    wartezeit.append(w-jetzt)
                    if not (self.nosleepwhenmorethan and wartezeit[0] > self.nosleepwhenmorethan): # ansonsten wird call nicht gemacht
                        self.gemachte_anfragen.append(w)
                elif wartezeit[0]=="test":
                    zahl = 0
                    for timestamp in self.gemachte_anfragen:
                        if not(timestamp < jetzt - self.zeitlimit):
                            zahl += 1
                    wartezeit[0] = zahl # die zahl der calls die innerhalb des zeitlimits bereits verplant sind
                    wartezeit.append(self.calllimit)
                    wartezeit.append(self.zeitlimit)
                    wartezeit.append(self.freihalten)
                e.set() # e.wait() in "darfich" beenden
        except Exception as err:
            print("Fehler in ratelimit skript run {}: {}\n{}".format(self.website,err,traceback.format_exc()))
            
            
            
            
            
            
class Test:
    
    def __init__(self,instanz):
        self.instanz = instanz
        self.websites = ["abc","efg"]
        self.Limiter = {}
        for website in self.websites:
            self.Limiter[website] = Limiter(website,2,3)#,None,0,self.loggen)
        
    def loggen(self,txt,q="",w="",e="",r=""):
        print("{}: Log {}: {}".format(time.asctime(),self.instanz,txt)) # write into txt file
    
    def testen(self,a=""):
        for website in self.websites:
            if self.Limiter[website].limitkontrolle("limit","testcall "+website):
                print("{}: {} mache call zu {}".format(time.asctime(),self.instanz,website))
        
        for website in self.websites:
            self.Limiter[website].stop()
            
            
            
            
def parallel(aufgabenliste): # aufgabenliste enthaelt die funktionen, argsliste die zugehoerigen argumente ... wird auch nur noch für den start gebraucht, alles andere macht nun concurrent
    
    threads = list(map(Multithread, aufgabenliste)) # uebergibt nacheinander die Elemente der aufgaben an die Klasse Multithread
    for thread in threads: # nun werden alle threads gestartet und machen parallel jeweils eine der aufgaben
        thread.start()
   
    for thread in threads: # es wird geweartet bis alle threads fertig sind, bevor es hier weitergeht
        thread.join()
    return(list(thread.result for thread in threads))  #ergebnisse als liste printen
def parallelBeides(aufgabenliste): # nur zum starten der botinstanzen. Erstelle soviele Prozesse, wie cpu vorhanden, und teile die botinstanzen auf die prozesse auf
    
    threadaufgaben = {}
    for c in range(cpus):
        threadaufgaben[c] = [] # zahlen 0 bis 7 , bei 8 cpus
    while aufgabenliste:
        for c in range(cpus):
            if aufgabenliste:
                threadaufgaben[c].append(aufgabenliste.pop()) # gleichmäßige Verteilung der aufgaben auf die cpus
    
    processes = []
    for c in range(cpus):
        if threadaufgaben[c]:
            processes.append(MultiProcessBeides(threadaufgaben[c]))
    
    for process in processes: # nun werden alle processes gestartet und machen parallel jeweils eine der aufgaben
        process.start()
   
    for process in processes: # es wird geweartet bis alle processes fertig sind, bevor es hier weitergeht
        process.join()
    
    return # brauchen kein ergebnis fur die botinstanzen
class Multithread(threading.Thread):
    def __init__(self, aufgabe=0):
        if aufgabe:
            threading.Thread.__init__(self)
            self.aufgabe = aufgabe[0]  #ist ja ein Tupel aus funktion und argumenten
            self.argumente = aufgabe[1]   
            self.result = None  
        return
        
    def run(self):
        self.result = self.aufgabe(*self.argumente)  # hier wird das * verwendet, um eine beliebige Anzahl an argumenten zuzufuegen
        return
class MultiProcessBeides(multiprocessing.Process):
    def __init__(self, aufgaben=0):
        if aufgaben!=0:
            multiprocessing.Process.__init__(self)
            self.aufgaben = aufgaben
        return
        
    def run(self):
        machen = []
        for aufgabe in self.aufgaben:
            machen.append(aufgabe)
        parallel(machen) # nun jeden der prozesse die aufgaben mithilfe von threads erledigen lassen
        return 
        
if __name__ == "__main__":  
    cpus = 2 # multiprocessing.cpu_count()
    Instanzen = [] 
    InstanzenDict = {}
    for i in range(4):
        InstanzenDict[i] = Test(i) # einmal aspeichern, damit immer dieselbe instanz genommen wird, und nicht mehrere verschiedene.
        Instanzen.append((InstanzenDict[i].testen,("")))
    if cpus and cpus > 1:
        parallelBeides(Instanzen) # fuer jede cpu einen prozess und dort dann threads
    else:
        parallel(Instanzen) # nur mit threads
    print("Vollständig beendet!")
    time.sleep(1)
Traceback (most recent call last):
File "ratelimittest.py", line 198, in <module>
parallelBeides(Instanzen) # fuer jede cpu einen prozess und dort dann threads
File "ratelimittest.py", line 155, in parallelBeides
process.start()
File "C:\Python34\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Python34\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Python34\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Python34\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Python34\lib\multiprocessing\spawn.py", line 106, in spawn_main
exitcode = _main(fd)
File "C:\Python34\lib\multiprocessing\spawn.py", line 116, in _main
self = pickle.load(from_parent)
EOFError: Ran out of input
