In decorator an bestimmtes Argument der Funktion kommen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Ich schreibe in python 3.4

Ich würde gerne dieses ratelimit Skript für verschiedene Webseiten verwenden, um das API Call Limit nicht zu überschreiten.
https://github.com/tomasbasham/ratelimi ... _init__.py

So wie es aktuell aussieht, kann ich es nur für eine einzelne Seite verwenden.
Da ich das Skript ohnehin noch stark überarbeiten werde (aktuell funktioniert es ja mit frequency, aber das ist nicht zweckmäßig), dachte ich mir, ich bau das skript so um, dass mehrere Websiten mit derselben ratelimit Instanz möglich sind.

In meinem Hauptskript, welches die API Calls organisiert, sehen die Funktionen ungefähr so aus:

Code: Alles auswählen

def machcall_A(self,website,param1,param2...):
...
def machcall_B(self,website,param1,param2...):
usw.

Das einfachste wäre nun also, wenn ich das "last_called" im ratelimit skript in ein dictionary umwandle, welches last_called je nach website ändert.
Meine Idee war jetzt, direkt in der ratelimit "wrapper" Funktion die Argumente/Parameter von "func" auszulesen um so darauf zu kommen, was "website" ist.

Dafür habe ich bisher keine andere Möglichkeit gefunden als:
Mir "args" und "kwargs" anzucken und zusätzlich "list(inspect.signature(func).parameters)". Eben um rauszufinden welchen wert "website" hat.

Das scheint mir doch ziemlich umständlich. Deswegen möchte ich hier fragen, ob es da einen besseren Weg gibt?

edit:
mir fällt grad auf, dass es so nicht wirklich funktioniert, wenn die websiten alle ein anderes api rate limit haben, das muss ja auch irgendwie übergeben werden... hmmm
BlackJack

@Serpens66: Wieso kannst Du das nur für *eine* Webseite verwenden? Wenn Du verschiedene Funktionen hast, kannst Du doch jede davon dekorieren.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

BlackJack hat geschrieben:@Serpens66: Wieso kannst Du das nur für *eine* Webseite verwenden? Wenn Du verschiedene Funktionen hast, kannst Du doch jede davon dekorieren.
Die Funktionen sehen ja so aus wie im ersten Post.
Dh. ich habe eine einzige machcall_A Funktion für alle websiten.
Diese Funktion bearbeitet die parameter noch ein wenig und ruft dann das zu der website gehörende api modul auf um den api call zu machen (hinzu kommt noch, dass es viele verschiedene API Calls gibt. Die meisten werden alle zusammengezählt, also ein rate limit von zb 10 calls pro Minute, egal welcher Call).

Wenn ich nun über diese machcall_A Funktion das @rate_limited(x,y) schreibe, dann wird dadurch weder die website noch das website spezifische apirate limit beachtet, noch werden die machcall_B Aufrufe zu den gemachten Calls für dieselbe Website hinzugerechnet.

Mir fällt aktuell keine Lösung ein.
Oder kann ich eigentlich auch getrost auf den Decorator Kram verzichten und stattdessen in jeder meiner machcall_A / B Funktionen normal eine ratelimit Funktion aufrufen, welche dann gegebenenfalls sleep durchführt? Dieser Funktion kann ich ja dann direkt die website und das rate limit übergeben. Beim decorator scheint das nicht so leicht zu funktionieren...
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

@Serpens66: ich würde dafür gar keinen Dekorator benutzen, da man ja verschiedene Funktionen haben kann, die nicht zu oft aufgerufen werden sollen. Da wäre also eine Klasse besser, die pro Webseite einen Timer hat, den man halt vor einem Funktionsaufruf abfrägt.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Sirius3 hat geschrieben:@Serpens66: ich würde dafür gar keinen Dekorator benutzen, da man ja verschiedene Funktionen haben kann, die nicht zu oft aufgerufen werden sollen. Da wäre also eine Klasse besser, die pro Webseite einen Timer hat, den man halt vor einem Funktionsaufruf abfrägt.
Ja danke, denke ich auch :)

Tatsächlich habe ich so eine Funktion schon, nur hab ich sie vor langer Zeit geschrieben und sie ist entsprechend kompliziert unübersichtlich und schlecht geschrieben. Auch ist sie noch teilweise fehlerhaft, doch ich dachte mir anstatt da noch mehr rumzuwurschteln, wäre es an der Zeit eine professionelle Lösung dafür zu nehmen, deswegen das ratelimit skript.

Doch dies scheint nun keine sinnvolle Lösung zu sein, da es außer dem decorator und threading.RLock nicht viel enthält.

Daher ist nun der Gedanke meine eigenen Funktion neu zu schreiben, sodass sie einfacher und übersichtlicher ist.
Dabei stelle ich nun fest, dass sie hautptsächlich deshalb so kompliziert geschrieben ist, weil ich sichergehen wollte, dass sie auch fehlerfrei funktioniert, wenn über threads diese ratelimit Funktion mehrfach gleichzeitig aufgerufen wird.

Funktionsweise meiner ratelimit Funktion (das kann sie und muss sie können):
1) Wenn zb 30 calls pro Minute erlaubt sind, dann soll dies NICHT dazu führen, dass 1 Call alle 2 Sekunden gemacht wird. Stattdessen wird eben dafür gesorgt, dass in 1 Minute nicht mehr als 30 Calls gemacht werden. Erreicht wird das über eine Liste, die bei jedem Call den aktuellen timestamp in milisekunden hinzugefügt bekommt. Auf die Weise kann ich erkennen, wieviele Calls in einem bestimmten Zeitraum gemacht wurden.
2) Die Funktion soll mir auch anzeigen können, wieviele Calls ich in einem bestimmten Zeitrau noch machen kann, bevor ich an das Limit stoße.
3) Zusätzlich soll sie mir anzeigen, wie lange ich nachdem das limit erreicht wurde, warten muss, um wieder x calls ohne pause machen zu können.

Das was meine Funktion so unübersichtlich und fehleranfällig macht, ist meine Maßnahme um es "thread safe" zu machen, also gleichzeitigen Aufruf.

Beispiel:
Eine Fkt wird 5 mal zurselben Zeit aufgerufen. Es sind aber nur noch 3 calls erlaubt, bevor 10 sekunden gewartet werden muss.
Nun muss sichergestellt werden, dass nicht alle 5 durchgeführt werden, sondern 2 davon erkennen, dass sie warten müssen.

Meine Funktion macht es aktuell so, dass diese 5 Funktionen mit den parametern 1 bis 5 aufgerufen werden, also sozusagen eine Reihenfolge festgelegt wird. Erst wenn die 1 das ratelimit geprüft hat, prüft die 2 es usw. Der Aufbau um das zu erreichen ist ziemlich hässlich.

Daher nun meine Frage:
Was würdet ihr empfehlen, um sicherzustellen, dass ich es gleichzeitig mehrfach aufrufen kann?

Als Stichworte fallen mir da queue und threading.RLock() ein.
Aber mit queue fällt mir nichts ein, wie ich das hier verwenden könnte.
Und threading.RLock() habe ich noch zu wenig Verständnis... es klingt so, als würde dadurch ein Codeblock ausgeführt und währenddessen alle anderen threads gestoppt? Doch was sind die "anderen threads" ? alle im skript existierenden? Oder kann ich das steuern? Denn eigentlich müssen ja nur diejnigen Threads pausieren, die einen API Call bei derselben website zur Folge haben. Alle anderen können problemlos weiterlaufen bzw müssen es sogar.

Falls meine Fragestellung bzw mein Ziel nicht deutlich genug für eine Antwort war, bitte ich um Rückmeldung, dann versuch ich es nochmal deutlicher zu machen.
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

@Serpens66: Du mußt eben alles, was Du Dir wünschst, Stück für Stück implementieren und hier besonders wichtig, jeden Teil gründlich testen. Was hast Du denn schon versucht?
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ich würde so vorgehen:

Du betrachtest deine Abrufe als Ereignisse. Diese speicherst du als Liste sortiert nach Zeitpunkt. Deine Rate ist dann die Anzahl der Ereignisse ein einem Zeitfenster.

Wenn du jetzt einen neuen Abruf tätigst, prüfst du wieviele Ereignisse im Zeitfenster (jetzt, jetzt - 1min) sind. Sind es weniger als die erlaubte Rate kommt der Abruf in eine queue mit einem timestamp "jetzt". Ein Pool von Arbeitsthreads puhlt sich den nächsten Job raus, und wartet erstmal bis der timestamp erreicht ist. Im einfachen Fall also gar nicht. Sobald er wirklich loslegt, fügt er ein Ereignis in die Liste ein.

Wenn aber schon zu viele Ereignisse drin sind, dann kannst du einfach herausbekommen, wann du den nächsten Abruf tätigen darfst: zum Zeitpunkt des letzten Timestamps + 1min. Und vll noch eine Marge von 1-10 Sekunden drauf (hängt etwas aber davon wie rigoros der Service ist)

Damit wartet dann eben der nächste Worker bevor er loslegt.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Sirius3 hat geschrieben:@Serpens66: Du mußt eben alles, was Du Dir wünschst, Stück für Stück implementieren und hier besonders wichtig, jeden Teil gründlich testen. Was hast Du denn schon versucht?
Ist ja schon alles implementiert.

Was ich nun suche ist eine Alternative, wie das ganze eben "parallel sicher" ist, also 5 mal gleichzeitig ratelimit mit derselben Website aufgerufen wird, während nur noch 3 mal aktuell erlaubt wäre, sodass es korrekt umgesetzt wird.

Meine Funktion sieht so aus (wie gesagt ich hoffe es geht deutlich übersichtlicher und besser geschrieben):

Code: Alles auswählen

def limitkontrolle(self,website,Zeitlimit,CallLimit,Limitcount):
        anfangszeit = int(time.time() * 1000)

        #Zeitlimit  hier die Zeit in milisek eintragen, wie weit die CallLimit calls auseinander sein duerfen
        #CallLimit  Anzahl der Calls in in der Zeit Zeitlimit gemacht werden duerfen
        Ergebnisda = False
        timer = 0
        
        while not self.Datadict[website]["Limitdict"].get(Limitcount-1): # solange es den eintrag zuvor noch nicht gibt, nicht weitermachen
            if Limitcount==0: # wenn limitcount 0, dann ausnahme machen
                break
            timer += 1
            sleep(0.001)
            if timer>=10000: # notausstieg falls fehler in schleife und sonst unendlich (max 10 sek)
                print("unendlich LOOP schleife {} 1. Laenge:{}. Limitcount:{}".format(website,len(self.Datadict[website]["Limitdict"]),Limitcount))
                self.Datadict[website]["Schleifenposition"] = 0
                self.Datadict[website]["Limitdict"] = {}
                self.Datadict[website]["Limitcounter"] = 1
                #break
                return False
            continue  # unendliche oft im kreis drehen, bis eintrag exisitert
        

        while not Ergebnisda:
            sleepen = 0
            zeit = int(time.time() * 1000)
            if len(self.Datadict[website]["Limitdict"]): # wenn schon Eintraege enthalten sind 
                for position in range(self.Datadict[website]["Schleifenposition"],len(self.Datadict[website]["Limitdict"])+min(self.Datadict[website]["Limitdict"])): # min addieren heisst zu anfang null addieren. Aber wenn z.b alle eintraege von 0 bis 4 geloescht wurden, startet es bei 5, dann muss das hier auch bei 5 starten und bis laenge+5 gehen
                    differenz = zeit - self.Datadict[website]["Limitdict"][position] # jetzige Zeit mit erstem Eintrag vergleichen
                    if differenz <= Zeitlimit:  # hier die Zeit in milisek eintragen, wie weit die x calls auseinander sein duerfen
                        if Limitcount - position >= CallLimit:  # wenn die laenge der liste nun CallLimit oder mehr uebersteigt, dann sleepen
                            sleepen = self.Datadict[website]["Limitdict"][position] + Zeitlimit - zeit
                            self.Datadict[website]["Schleifenposition"] = position
                            break
                        else: # falls nicht,dann einfach weiter   
                            self.Datadict[website]["Schleifenposition"] = position # gibt an, bis zu welchem Punkt hinterher geloescht werden soll
                            break # und for schleife beenden, da alles fertig
                    else: # andernfalls alle listenelemte durchgehen, diese evlt rausschmeissen und wenn ueberall differenz <= y, dann liste loeschen und zeit als erten eintrag setzen
                        if position == len(self.Datadict[website]["Limitdict"])+min(self.Datadict[website]["Limitdict"])-1: # wenn das letzte listenelement erreicht ist und differenz offensichtlich immernoch groesser y, dann liste loeschen und nur noch neue zeit drin lassen
                            self.Datadict[website]["Schleifenposition"] = position # gibt an, bis zu welchem Punkt hinterher geloescht werden soll
                            break
                        else:
                            continue # naechste position durchgehen            
            if sleepen:
                sleep(sleepen/1000) # sleepen in sekunden
                continue # von vorne, damit zeit neu genommen wird

            if not sleepen:
                self.Datadict[website]["Limitdict"][Limitcount]=int(time.time() * 1000)
                Ergebnisda = True
                break
        endzeit = int(time.time() * 1000)
        dauer = (float(endzeit)-float(anfangszeit))/1000 # in sek
        if dauer>=0.5:
            self.history("Ein Apicall fuer {} wurde aufgrund des API Limits {} sekunden zurueckgehalten. Evlt. pruefen, ob limit optimaler genutzt werden kann, da durch das Warten viel Zeit verloren geht.".format(website,dauer),0,0,1)
        return True # Call kann gemacht werden
self.Datadict[website]["Limitdict"] ist ein dictionary, welches allerdings Zahlen als key verwendet.
Dies habe ich als nötig erachtet, damit ich die Reihenfolge überwachen kann.
self.Datadict[website]["Limitcounter"] ist eine Zahl, nämlich der aktuell neuste Key des Limitdicts.
self.Datadict[website]["Schleifenposition"] ist ebenfalls eine Zahl, ich weiß die Verwendung garnicht mehr so genau... aber anhand meiner Kommentare aus dem Code entnehme ich mal, dass es verwendet wird um das Limitdict außerhalb der limitkontrolle ab und zu auszumisten, sodass es nicht unendlich lang wird, sondern nur die relevanten Einträge enthält.

Es funktioniert nun so:
Es werden mithilfe von Threads 5 Funktionen aufgerufen, um 5 api calls gleichzeitig bei derselben Website zu machen. Diese 5 Funktionen bekommen den aktuellen Limitcounter + 1 bis + 5 übergeben. Die Funktionen rufen, bevor sie den API Call machen, die limitkontrolle auf.
Zuerst wird überprüft, ob es im Limitdict bereits den vorangegangenen Eintrag gibt. Dies soll sicherstellen, dass Limitcount+1 zuerst ausgeführt wird und danach die anderen. Sicherheitshalber ist dort ein notausstieg eingebaut.
Danach wird einfach überprüft, wieviele Calls in dem erlaubten Zeitlimit gemacht wurden, und ob noch Platz für weitere ist. Wenn nicht, dann wird entprechend sleep durchgeführt. Sobald dann wieder möglich wird ein neuer Eintrag in das Limitdict aufgenommen mit dem aktuellen timestamp.

Am liebsten würde ich nun den ganzen komplizierten Limitcount(er), Schleifenposition und den "unendlich Loop" Kram weglassen. Das Limitdict könnte man auch als Liste führen.
Das alles wegzulassen funktioniert, wenn ich die Aufrufe in Reihe, also nacheinander mache. Aber bei mehreren gleichzeitig funktioniert das halt nicht.

Daher ist die Frage nun, wie ich den Aufbau dennoch vereinfachen kann.

edit:
@__deets__:
Ich schick diesen Post jetzt erstmal ab, ohne deine Antwort zu lesen, schaue es mir aber sofort an (bzw vllt besser morgen, ist schon spät, danke :) )
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

__deets__ hat geschrieben:Ich würde so vorgehen:

Du betrachtest deine Abrufe als Ereignisse. Diese speicherst du als Liste sortiert nach Zeitpunkt. Deine Rate ist dann die Anzahl der Ereignisse ein einem Zeitfenster.

Wenn du jetzt einen neuen Abruf tätigst, prüfst du wieviele Ereignisse im Zeitfenster (jetzt, jetzt - 1min) sind. Sind es weniger als die erlaubte Rate kommt der Abruf in eine queue mit einem timestamp "jetzt". Ein Pool von Arbeitsthreads puhlt sich den nächsten Job raus, und wartet erstmal bis der timestamp erreicht ist. Im einfachen Fall also gar nicht. Sobald er wirklich loslegt, fügt er ein Ereignis in die Liste ein.

Wenn aber schon zu viele Ereignisse drin sind, dann kannst du einfach herausbekommen, wann du den nächsten Abruf tätigen darfst: zum Zeitpunkt des letzten Timestamps + 1min. Und vll noch eine Marge von 1-10 Sekunden drauf (hängt etwas aber davon wie rigoros der Service ist)

Damit wartet dann eben der nächste Worker bevor er loslegt.
Danke :-)

Aber sofern ich es richtig verstanden habe, löst das nicht meine Kernfrage:
Eine Fkt wird 5 mal zurselben Zeit aufgerufen. Es sind aber nur noch 3 calls erlaubt, bevor 10 sekunden gewartet werden muss.
Nun muss sichergestellt werden, dass nicht alle 5 durchgeführt werden, sondern 2 davon erkennen, dass sie warten müssen.
Wo wird in deinem Ansatz dies sichergestellt?
Wenn nun 5 aufträge zurselben Zeit reinflattern, dann sieht jeder einzlne Auftrag für sich ja "oh in der Ereignisliste ist noch Platz, also kann der Call sofort gemacht werden". In der queue wird also für alle 5 Fälle der aktuelle timestamp eingetragen und alle 5 Aufträge sofort durchgeführt.
Oder übersehe ich was?

Und genau das ist die Problematik für die ich eine alternative Lösung suche. Alles andere ist fertig. Meine aktuelle Lösung sieht man in meinem letzten Post.
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Kommen die aus 5 Threads gleichzeitig? Wenn ja, muss man das halt synchronisieren über ein lock. Wie immer, wenn man was nebenläufig macht.
nezzcarth
User
Beiträge: 1634
Registriert: Samstag 16. April 2011, 12:47

Ich fand die Ausgangsfrage zu den Dekoratoren-Argumenten ganz interessant. Ich hatte Lust, mal auszuprobieren, wie das mit Keyword-only-Argumenten in Python 3 gehen würde (für die praktische Anwendung würde ich aber auch eher in die von __deets__ vorgeschlagene Richtung schauen). Ein paar Nachteile hat das schon (z.B. wäre die Reihenfolge bei weiteren Parametern unnatürlich)-- aber gehen würde es :) (hier nur mit einer "Aufrufsbegrenzung"; ein Zeitlimit wäre aber auch mit derselben Vorgehensweise realisierbar):

Nachfolgend mal ein Beispiel mit

Code: Alles auswählen

In [1]: from collections import defaultdict

In [2]: import requests

In [3]: class limit:
   ...:     def __init__(self, target, limit):
   ...:         self.target = target
   ...:         self.limit = limit
   ...:         self.cache = defaultdict(int)
   ...:     
   ...:     def __call__(self, func):
   ...:         def inner(*args, **kwargs):
   ...:             target = kwargs[self.target]
   ...:             issued = self.cache.get(target, 0)
   ...:             if issued >= self.limit:
   ...:                 raise ValueError('Limit exceeded for parameter "{}"'.format(target))
   ...:             self.cache[target] += 1
   ...:             return func(*args, **kwargs)
   ...:         return inner
   ...:     

In [4]: @limit(limit=2, target='website')
   ...: def ping(*, website):
   ...:     result = requests.head(website)
   ...:     return result.status_code
   ...: 

In [5]: ping('http://127.0.0.1:8080')
---------------------------------------------------------------------------
KeyError                                  Traceback (most recent call last)
<ipython-input-5-5179bb6d5a6e> in <module>()
----> 1 ping('http://127.0.0.1:8080')

<ipython-input-3-a9f00dc00351> in inner(*args, **kwargs)
      7     def __call__(self, func):
      8         def inner(*args, **kwargs):
----> 9             target = kwargs[self.target]
     10             issued = self.cache.get(target, 0)
     11             if issued >= self.limit:

KeyError: 'website'

In [6]: ping(website='http://127.0.0.1:8080')
Out[6]: 200

In [7]: ping(website='http://127.0.0.1:8080')
Out[7]: 200

In [8]: ping(website='http://127.0.0.1:8080')
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-8-cf3339e8a9a6> in <module>()
----> 1 ping(website='http://127.0.0.1:8080')

<ipython-input-3-a9f00dc00351> in inner(*args, **kwargs)
     10             issued = self.cache.get(target, 0)
     11             if issued >= self.limit:
---> 12                 raise ValueError('Limit exceeded for parameter "{}"'.format(target))
     13             self.cache[target] += 1
     14             return func(*args, **kwargs)

ValueError: Limit exceeded for parameter "http://127.0.0.1:8080"

In [9]: ping(website='http://127.0.0.1:8080/test')
Out[9]: 404

In [10]: ping(website='http://127.0.0.1:8080/test')
Out[10]: 404

In [11]: ping(website='http://127.0.0.1:8080/test')
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-11-380d42fc74c0> in <module>()
----> 1 ping(website='http://127.0.0.1:8080/test')

<ipython-input-3-a9f00dc00351> in inner(*args, **kwargs)
     10             issued = self.cache.get(target, 0)
     11             if issued >= self.limit:
---> 12                 raise ValueError('Limit exceeded for parameter "{}"'.format(target))
     13             self.cache[target] += 1
     14             return func(*args, **kwargs)

ValueError: Limit exceeded for parameter "http://127.0.0.1:8080/test"

__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Nochmal drüber nachgedacht: ganz so einfach ist es nicht. Ein simples lock garantiert nicht, das nicht mehr Threads etwas eintüten, das dann in der Summe Zuviel wird.

Gibt es einen Grund warum du mehrere Threads brauchst? Auf caller Seite? Wenn ja, kann man das immer noch machen, indem man eine Thread nimmt der die Anfragen prozessiert, und die aufrufenden Threads per Event aufweckt, wenn sie was tun dürfen. Bzw. die Worker Threads tun das. Die bekommen den Auftrag und das Event, auf das der originale Thread wartet. Aber da it sollte dann alles schön seriell ablaufen bezüglich des limiting.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

@nezzcarth:
Danke :) vllt brauch ich das ja doch irgendwann nochmal, dann schau ich hier nochmal rein .
__deets__ hat geschrieben:Nochmal drüber nachgedacht: ganz so einfach ist es nicht. Ein simples lock garantiert nicht, das nicht mehr Threads etwas eintüten, das dann in der Summe Zuviel wird.

Gibt es einen Grund warum du mehrere Threads brauchst? Auf caller Seite? Wenn ja, kann man das immer noch machen, indem man eine Thread nimmt der die Anfragen prozessiert, und die aufrufenden Threads per Event aufweckt, wenn sie was tun dürfen. Bzw. die Worker Threads tun das. Die bekommen den Auftrag und das Event, auf das der originale Thread wartet. Aber da it sollte dann alles schön seriell ablaufen bezüglich des limiting.
Also es geht darum, 5 API calls gleichzeitig aufderselben website zu machen. Das geht nicht auf allen websiten wegen dem nonce den man mit übergibt, aber auf manchen geht das, weil die einen erlaubten Spielraum für den nonce wert haben.

Durchführen tue ich das, indem ich dafür 5 Threads öffne, welche dann alle 5 die limitkontrolle machen und dann schließlich den Call.
Es wird gewartet, bis alle 5 threads fertig sind und dann das ergebnis ausgewertet.

Zusätzlich soll es aber auch möglich sein, direkt von Beginn an 2 oder mehr Threads laufen zu lassen, die fast unabhängig voneinander das Hauptskript ausführen. Diese würden dennoch beide Calls bei derselben Website machen, sodass dadurch ebenfalls mehrere Calls gleichzeitig gemacht werden könnten. In diesem Fall hätte ich deutlich weniger Kontrolle, als im ersten Fall mit den 5 gleichzeitigen Calls. Das wird aber auch noch nicht von meinem aktuellen limitkontrolle Aufbau berücksichtigt. Dafür hab ich noch keine Lösung und ist dann die "advanced" Aufgabe, sobald die erste gelöst ist :D

Falls du in deinem Post für eins oder beides davon bereits eine Lösung genannt hast, dann wiederhole sie bitte. Ich habe nämlich leider nicht verstanden worauf du hinaus wolltest =/
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ich denke ja das ist alles viel zu kompliziert gemacht. Das du nebenläufig in der Generierung der Abfragen bist ist meines Erachtens völlig unnötig - das macht es kompliziert & erhöht nicht den Durchsatz. Einen Pool von Worker Threads zu haben um dann bei der eigentlichen Abfrage zu warten bis was kommt, ist schon ok. Du würdest dir also wirklich einen Gefallen tun die Parallelität rauszunehmen bei der generierung.

Aber trotzdem ginge mit meinem Ansatz. Ein Thread wird gestartet mit dem Limiter-Code. Alle Abfrage-Threads stellen an den die Frage "wann darf ich". Diese Frage (sprich : Methode) blockiert so lange, bis die Abfrage ok geht.

Der Limiter könnte skizziert so aussehen:

Code: Alles auswählen

class Limiter():

    def __init__(self, rate):
          self.rate = rate
          self.laufende_fragen =[]
          self.anfragen = Queue()
          threading.Thread(target=self.run)


     def darfich(self):
           e = Event()
           wartezeit = []
           self.anfragen.put((e, wartezeit))
           e.wait()
           time.sleep(wartezeit)
           

    def run(self):
         while True:
               e, wartezeit = self.anfragen.get()
               w = self.wann_darf_ich() # absoluter ts
               self.laufende_fragen.append(w)
               wartezeit.append(w-jetzt)
               e.notify()
Da fehlt natürlich noch was, aber die Idee wird hoffentlich klar.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

Vielen vielen Dank, habe die Idee dahinter verstanden und sie ist brilliant :)

Ich habe den Code denke ich nun vervollständigt, bin mir aber sicher, dass man das noch besser/effizienter machen kann. Bitte um Korrekturen :)

Code: Alles auswählen

import time
import threading
import concurrent.futures
import queue

def print2(txt,*args):
    txt = str(time.time()) + ": " + txt
    print(txt,*args)

class Limiter():
 
    def __init__(self, zeitlimit, calllimit):
        self.zeitlimit = zeitlimit
        self.calllimit = calllimit # diese zahl an Calls darf innerhalb von zeitlimit sekunden gemacht werden.
        self.gemachte_anfragen =[]
        self.anfragen = queue.Queue()
        self.thread = threading.Thread(target=self.run)
 
 
    def darfich(self,a):
        e = threading.Event()
        wartezeit = []
        self.anfragen.put((e, wartezeit))
        e.wait()
        print2("darfich {} sleep {}".format(a,wartezeit[0]))
        if wartezeit[0] > 0:
            time.sleep(wartezeit[0])
           
    
    def wann_darfich(self):
        anfragencopy = self.gemachte_anfragen[:] # eine Kopie machen, damit einträge entfernt werden können (gehts auch weniger fehleranfällig?)
        jetzt = time.time()
        for timestamp in anfragencopy:
            if timestamp < jetzt - self.zeitlimit:
                self.gemachte_anfragen.remove(timestamp) # alle zu alten einträge entfernen, damit len() hinterher gleich den bereits gemachten calls in unserm zeitlimit entspricht
        if len(self.gemachte_anfragen) < self.calllimit:
            return jetzt #darf also sofort gemacht werden
        else: # es wurden soviele wie erlaubt gemacht oder gar mehr, dann warten
            return self.gemachte_anfragen[0] + self.zeitlimit # self.gemachte_anfragen[0] ist der timestamp des ältesten calls
            
    
    def run(self):
        count = 0
        while True:
            count += 1
            e, wartezeit = self.anfragen.get() # wartet, bis etwas vorhanden ist ... nur wie beende ich das, wenn ich es nicht mehr brauche?
            w = self.wann_darfich() # absoluter ts
            self.gemachte_anfragen.append(w)
            wartezeit.append(w-time.time())
            e.set()
            if count>=5: # notmaßnahme damit der thread beendet wird
                return
            
limit = Limiter(5,2) # -> alle 5 sekunden werden 2 calls erlaubt

def test(a):
    limit.darfich(a)
    print2("test {}".format(a))
    return a
     
               
if __name__ == '__main__':  
    limit.thread.start()
    pool = concurrent.futures.ThreadPoolExecutor(5) 
    futures = []
    futures.append(pool.submit(test, 1))
    futures.append(pool.submit(test, 2))
    futures.append(pool.submit(test, 3))
    futures.append(pool.submit(test, 4))
    futures.append(pool.submit(test, 5))
    concurrent.futures.wait(futures) 
    print2("Vollständig beendet!")
Das ganze funktioniert leider nur fast.
Wenn man den Code so wie er ist aufruft, erhält man sowas:
1503666689.090744: darfich 1 sleep 0.0
1503666689.090744: test 1
1503666689.090744: darfich 2 sleep 0.0
1503666689.090744: darfich 3 sleep 5.0
1503666689.090744: test 2
1503666689.090744: darfich 5 sleep 5.0
1503666689.090744: darfich 4 sleep 5.0
1503666694.106423: test 3
1503666694.122043: test 4
1503666694.122043: test 5
1503666694.122043: Vollständig beendet!
Eigentlich müsste der Aufruf, der als letztes durch die Queue kommt, ja insg 10 sekunden warten, nicht 5.
Ich vermute, dass dieser fehler durch meinen Umgang mit "self.gemachte_aufrufe" zusammenhängt?

edit:
ich glaub das Problem mit den 5 statt 10 sekunden bekomm ich selbst raus.
Aber ihr dürft euch jetzt darüber auslassen, was man allgemein besser coden sollte :)
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

So, hab das Problem gelöst, der Code funktioniert jetzt.

Was sollte man noch verbessern?

Code: Alles auswählen

import time
import threading
import concurrent.futures
import queue

def print2(txt,*args):
    txt = str(time.time()) + ": " + txt
    print(txt,*args)

class Limiter():
 
    def __init__(self, zeitlimit, calllimit):
        self.zeitlimit = zeitlimit
        self.calllimit = calllimit # diese zahl an Calls darf innerhalb von zeitlimit sekunden gemacht werden.
        self.gemachte_anfragen =[]
        self.anfragen = queue.Queue()
        self.thread = threading.Thread(target=self.run)
 
 
    def darfich(self,a):
        e = threading.Event()
        wartezeit = []
        self.anfragen.put((e, wartezeit))
        e.wait()
        print2("darfich {} sleep {}".format(a,wartezeit[0]))
        if wartezeit[0] > 0:
            time.sleep(wartezeit[0])
           
    
    def wann_darfich(self):
        jetzt = time.time()

        anfragencopy = self.gemachte_anfragen[:] # eine Kopie machen, damit einträge entfernt werden können (gehts auch weniger fehleranfällig?)
        for timestamp in anfragencopy:
            if timestamp < jetzt - self.zeitlimit:
                self.gemachte_anfragen.remove(timestamp) # alle zu alten einträge entfernen.

        if len(self.gemachte_anfragen) < self.calllimit:
            return jetzt
            
        relevantecalls = self.gemachte_anfragen[-self.calllimit:] # auch zukünftige timestamps muessen beachtet werden

        if len(relevantecalls) < self.calllimit:
            return jetzt
        else: # es wurden soviele wie erlaubt gemacht oder gar mehr, dann warten
            return relevantecalls[0] + self.zeitlimit # relevantecalls[0] ist der timestamp des ältesten relevanten calls
            
    
    def run(self):
        count = 0
        while True:
            count += 1
            e, wartezeit = self.anfragen.get() # wartet, bis etwas vorhanden ist ... nur wie beende ich das, wenn ich es nicht mehr brauche?
            w = self.wann_darfich() # absoluter ts
            self.gemachte_anfragen.append(w)
            wartezeit.append(w-time.time())
            e.set() # e.wait() in "darfich" beenden
            if count>=7: # notmaßnahme damit der thread beendet wird
                return

                
limit = Limiter(5,2)

def test(a):
    limit.darfich(a)
    print2("test {}".format(a))
    return a
     
               
if __name__ == '__main__':  
    limit.thread.start()
    pool = concurrent.futures.ThreadPoolExecutor(7) 
    futures = []
    futures.append(pool.submit(test, 1))
    futures.append(pool.submit(test, 2))
    futures.append(pool.submit(test, 3))
    futures.append(pool.submit(test, 4))
    futures.append(pool.submit(test, 5))
    futures.append(pool.submit(test, 6))
    futures.append(pool.submit(test, 7))
    concurrent.futures.wait(futures) 
    print2("Vollständig beendet!")
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Nur kurz: du kannst dem queue.get ein timeout-Argument mitgeben. Das fuehrt dann zu einer Ausnahme wenn innerhalb der Zeit keine Anfrage kam. Dann kannst du checken, ob der Limiter noch laufen soll, und wenn nicht, run verlassen.
Serpens66
User
Beiträge: 259
Registriert: Montag 15. Dezember 2014, 00:31

__deets__ hat geschrieben:Nur kurz: du kannst dem queue.get ein timeout-Argument mitgeben. Das fuehrt dann zu einer Ausnahme wenn innerhalb der Zeit keine Anfrage kam. Dann kannst du checken, ob der Limiter noch laufen soll, und wenn nicht, run verlassen.
danke :)
Ja das wäre auch meine nächste bessere Notlösung gewesen.
Hatte gehofft es gibt noch einen "queue auflösen" befehl oderso, aber finde auch nichts in der Doku.
Dann werd ich es mit dem timeout machen.

Was ist eigentlich "schlimmer", eine while schleife die sich ständig wiederholt? Oder ein Warten dass durch Queue.get() verursacht wird?
Also kann ich problemlos den timeout auf 0.001 setzen? Oder dreht mir die while schleife sonst durch und frisst zuviele Ressourcen?

sieht jetzt so aus:

Code: Alles auswählen

def run(self):
        while True:
            try:
                e, wartezeit = self.anfragen.get(timeout=0.001) # wartet, bis etwas vorhanden ist
            except queue.Empty:
                if not self.wirdgebraucht:
                    return
                continue # das warten von get() muss ab und zu unterbrochen werden, um zu schauen, ob dieser Thread noch gebraucht wird, oder nicht. Zb. wenn sonst alles fertig ist, sollte das hier auch beendet werden

            w = self.wann_darfich() # absoluter ts
            self.gemachte_anfragen.append(w)
            wartezeit.append(w-time.time())
            e.set() # e.wait() in "darfich" beenden
            if not self.wirdgebraucht:
                return
self.wirdgebraucht wird dann halt vor "vollständig beendet" auf False gesetzt.
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Schleife ist schlimm. Das verbraet Rechenzeit.

Du kannst auch stattdessen einen sentinel in die Queue tun: implementiere eine stop-Methode auf dem Limiter, und die stopft ein None-Objekt in die Queue, und ruft dann ein join auf dem self.thread auf. Das wird dann bei naechstbester Gelegenheit ausgepackt, und wenn du vorher pruefst, ob das Argument None ist statt ein Tupel das ausgepackt werden will, beendest du dich. So geht's auch ohne timeout.
Antworten