Ich habe hier viewtopic.php?f=1&t=41146&start=15 mit eurer Hilfe eine "Limiter" Klasse erstellt, dessen Aufgabe es ist dafür zu sorgen, dass nicht mehr API Calls in einem Zeitraum gemacht werden, als erlaubt.
Und hier habe ich Code entwickelt, welcher vorhandene Aufgaben auf Prozesse und Threads veteilen soll viewtopic.php?f=1&t=40845&p=311902#p311899 . Also ich möchte zb 5 Instanzen von einem Programm laufen lassen. Habe 2 CPUs. Dann sollen also 2 Prozesse laufen, einer mit 3 Threads und einer mit 2 Threads, um diese 5 Programme möglichst effizient gleichzeitig laufen zu lassen.
Nun habe ich versucht das beides zu kombinieren. Also in mein Skript, welches mit 2 Prozessen und ein paar Threads in mehreren Instanzen gleichzeitig läuft, das Limiter() Projekt zu integrieren.
Einzeln laufen beide wunderbar und fehlerfrei.
Doch leider bekomme ich dabei einen Fehler den ich bisher nicht lösen konnte.
Der Code:
Code: Alles auswählen
import time
import threading
import concurrent.futures
import queue
import traceback
import multiprocessing
# zum limitieren von api calls.
class Limiter:
def __init__(self, website, zeitlimit, calllimit, nosleepwhenmorethan=None, freihalten=0):
try:
self.website = website
self.zeitlimit = zeitlimit
self.calllimit = calllimit # diese zahl an Calls darf innerhalb von zeitlimit sekunden gemacht werden.
self.nosleepwhenmorethan = nosleepwhenmorethan # wenn mehr als x sekunden gesleept werden müsste, dann nicht sleepen sondern False returnen, wodurch der Call dann nicht gemacht wird
self.freihalten = freihalten # soviele calls sollen im zeitlimit unverbraucht bleiben. nur sinnvoll bei hohen calllimit und hohem zeitlimit
self.gemachte_anfragen = []
self.anfragen = queue.Queue()
self.thread = threading.Thread(target=self.run)
self.thread.start()
except Exception as err:
print("Fehler in ratelimit skript init {}: {}\n{}".format(self.website,err,traceback.format_exc()))
def limitkontrolle(self,art,aufrufer=""): # art kann "limit", "test"
try:
e = threading.Event() # auch test muss über event und thread gemacht werden, damit liste nicht währenddessen geändert wird
if art=="limit":
wartezeit = []
else: # bisher nur test. test returned [gemachte_anfragen in zeitlimit, calllimit, zeitlimit]. Kann also zur Anzeige, aber auch zum testen ob wieder aktiveren verwendet werden. sollte natürlich nicht >= callimit sein
wartezeit = [art]
self.anfragen.put((e, wartezeit))
e.wait()
if art=="limit":
if self.nosleepwhenmorethan and wartezeit[0] > self.nosleepwhenmorethan:
return False
else:
if wartezeit[0] > 0:
time.sleep(wartezeit[0])
return True
elif art=="test":
return wartezeit
except Exception as err:
print("Fehler in ratelimit skript limitkontrolle {}: {}\n{}".format(self.website,err,traceback.format_exc()))
def wann_darfich(self,jetzt):
try:
anfragencopy = self.gemachte_anfragen[:] # eine Kopie machen, damit einträge entfernt werden können (gehts auch weniger fehleranfällig?)
for timestamp in anfragencopy:
if timestamp < jetzt - self.zeitlimit:
self.gemachte_anfragen.remove(timestamp) # alle zu alten einträge entfernen.
else: # sollten sortiert sein, dh sobald das nicht mehr zutrifft, kann es für einträge danach auch nicht mehr zutreffen. daher können wir breaken
break
if len(self.gemachte_anfragen) < self.calllimit - self.freihalten:
return jetzt
relevantecalls = self.gemachte_anfragen[-self.calllimit:] # auch zukuenftige timestamps muessen beachtet werden
if len(relevantecalls) < self.calllimit - self.freihalten: # !! es können viel mehr anfragen drin sein, weil auch zukünftige anfragen, die gerade warten, mit drin stehen!
return jetzt
else: # es wurden soviele wie erlaubt gemacht oder gar mehr, dann warten
return relevantecalls[0] + self.zeitlimit # relevantecalls[0] ist der timestamp des ältesten relevanten calls
except Exception as err:
print("Fehler in ratelimit skript wann_darfich {}: {}\n{}".format(self.website,err,traceback.format_exc()))
def stop(self):
self.anfragen.put(None) # kill
def run(self):
try:
while True:
print("ratelimit skript {}: run vor get".format(self.website))
ding = self.anfragen.get()
print("ratelimit skript {}: run nach get".format(self.website))
if ding is None: # stop
return
else:
e,wartezeit = ding
jetzt = time.time()
if not wartezeit: # []
w = self.wann_darfich(jetzt) # absoluter ts
wartezeit.append(w-jetzt)
if not (self.nosleepwhenmorethan and wartezeit[0] > self.nosleepwhenmorethan): # ansonsten wird call nicht gemacht
self.gemachte_anfragen.append(w)
elif wartezeit[0]=="test":
zahl = 0
for timestamp in self.gemachte_anfragen:
if not(timestamp < jetzt - self.zeitlimit):
zahl += 1
wartezeit[0] = zahl # die zahl der calls die innerhalb des zeitlimits bereits verplant sind
wartezeit.append(self.calllimit)
wartezeit.append(self.zeitlimit)
wartezeit.append(self.freihalten)
e.set() # e.wait() in "darfich" beenden
except Exception as err:
print("Fehler in ratelimit skript run {}: {}\n{}".format(self.website,err,traceback.format_exc()))
class Test:
def __init__(self,instanz):
self.instanz = instanz
self.websites = ["abc","efg"]
self.Limiter = {}
for website in self.websites:
self.Limiter[website] = Limiter(website,2,3)#,None,0,self.loggen)
def loggen(self,txt,q="",w="",e="",r=""):
print("{}: Log {}: {}".format(time.asctime(),self.instanz,txt)) # write into txt file
def testen(self,a=""):
for website in self.websites:
if self.Limiter[website].limitkontrolle("limit","testcall "+website):
print("{}: {} mache call zu {}".format(time.asctime(),self.instanz,website))
for website in self.websites:
self.Limiter[website].stop()
def parallel(aufgabenliste): # aufgabenliste enthaelt die funktionen, argsliste die zugehoerigen argumente ... wird auch nur noch für den start gebraucht, alles andere macht nun concurrent
threads = list(map(Multithread, aufgabenliste)) # uebergibt nacheinander die Elemente der aufgaben an die Klasse Multithread
for thread in threads: # nun werden alle threads gestartet und machen parallel jeweils eine der aufgaben
thread.start()
for thread in threads: # es wird geweartet bis alle threads fertig sind, bevor es hier weitergeht
thread.join()
return(list(thread.result for thread in threads)) #ergebnisse als liste printen
def parallelBeides(aufgabenliste): # nur zum starten der botinstanzen. Erstelle soviele Prozesse, wie cpu vorhanden, und teile die botinstanzen auf die prozesse auf
threadaufgaben = {}
for c in range(cpus):
threadaufgaben[c] = [] # zahlen 0 bis 7 , bei 8 cpus
while aufgabenliste:
for c in range(cpus):
if aufgabenliste:
threadaufgaben[c].append(aufgabenliste.pop()) # gleichmäßige Verteilung der aufgaben auf die cpus
processes = []
for c in range(cpus):
if threadaufgaben[c]:
processes.append(MultiProcessBeides(threadaufgaben[c]))
for process in processes: # nun werden alle processes gestartet und machen parallel jeweils eine der aufgaben
process.start()
for process in processes: # es wird geweartet bis alle processes fertig sind, bevor es hier weitergeht
process.join()
return # brauchen kein ergebnis fur die botinstanzen
class Multithread(threading.Thread):
def __init__(self, aufgabe=0):
if aufgabe:
threading.Thread.__init__(self)
self.aufgabe = aufgabe[0] #ist ja ein Tupel aus funktion und argumenten
self.argumente = aufgabe[1]
self.result = None
return
def run(self):
self.result = self.aufgabe(*self.argumente) # hier wird das * verwendet, um eine beliebige Anzahl an argumenten zuzufuegen
return
class MultiProcessBeides(multiprocessing.Process):
def __init__(self, aufgaben=0):
if aufgaben!=0:
multiprocessing.Process.__init__(self)
self.aufgaben = aufgaben
return
def run(self):
machen = []
for aufgabe in self.aufgaben:
machen.append(aufgabe)
parallel(machen) # nun jeden der prozesse die aufgaben mithilfe von threads erledigen lassen
return
if __name__ == "__main__":
cpus = 2 # multiprocessing.cpu_count()
Instanzen = []
InstanzenDict = {}
for i in range(4):
InstanzenDict[i] = Test(i) # einmal aspeichern, damit immer dieselbe instanz genommen wird, und nicht mehrere verschiedene.
Instanzen.append((InstanzenDict[i].testen,("")))
if cpus and cpus > 1:
parallelBeides(Instanzen) # fuer jede cpu einen prozess und dort dann threads
else:
parallel(Instanzen) # nur mit threads
print("Vollständig beendet!")
time.sleep(1)
Traceback (most recent call last):
File "ratelimittest.py", line 198, in <module>
parallelBeides(Instanzen) # fuer jede cpu einen prozess und dort dann threads
File "ratelimittest.py", line 155, in parallelBeides
process.start()
File "C:\Python34\lib\multiprocessing\process.py", line 105, in start
self._popen = self._Popen(self)
File "C:\Python34\lib\multiprocessing\context.py", line 212, in _Popen
return _default_context.get_context().Process._Popen(process_obj)
File "C:\Python34\lib\multiprocessing\context.py", line 313, in _Popen
return Popen(process_obj)
File "C:\Python34\lib\multiprocessing\popen_spawn_win32.py", line 66, in __init__
reduction.dump(process_obj, to_child)
File "C:\Python34\lib\multiprocessing\reduction.py", line 59, in dump
ForkingPickler(file, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_thread.lock'>: attribute lookup lock on _thread failed
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Python34\lib\multiprocessing\spawn.py", line 106, in spawn_main
exitcode = _main(fd)
File "C:\Python34\lib\multiprocessing\spawn.py", line 116, in _main
self = pickle.load(from_parent)
EOFError: Ran out of input