Effizienter Timeframe Counter

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Benutzeravatar
Defnull
User
Beiträge: 778
Registriert: Donnerstag 18. Juni 2009, 22:09
Wohnort: Göttingen
Kontaktdaten:

Hi Leute, nach langer Zeit komm ich mal wieder mit einer Frage her.

Ich suche nach einer Datenstruktur plus passende Algorithmen um folgendes Problem möglichst effizient zu lösen:

Ein System produziert sehr viele Events pro Sekunde. Ich möchte zählen, wie viele Events ungefähr im Laufe der letzten N Millisekunden erzeugt wurden.

- N ist fest (100 < N < 10000)
- Der Zähler muss nicht exakt sein, aber die "Auflösung" sollte einstellbar sein.
- Der Zähler wird extrem oft aktualisiert (ein mal pro Event), aber nur selten ausgewertet (etwa ein mal pro Sekunde).

Meine (naive) Lösung wäre eine Liste mit M Buckets, die jeweils ein Zeitfenster (mit der Länge N/M) repräsentieren. Bei jedem Event wird der letzte Bucket inkrementiert, wenn wir noch in dessen Zeitfenster liegen, oder ein neuer Bucket hinzu gefügt. Ab und zu werden alte Buckets gelöscht, um Speicher zu sparen. Bei der Auswertung werden alle Buckets summiert, die jünger als now()-N sind.

Meine Implementierung hat aber so hässliche Dinge wie list.pop(0) drin. Hat jemand eine Idee, wie man das möglichst effizient gestalten könnte?

Edit: Bei Kreispuffern habe ich das Problem, das ich bei jedem Update aufräumen müsste. Wobei... vieleicht... hmm...
Bottle: Micro Web Framework + Development Blog
BlackJack

@Defnull: Wenn Du ``pop(0)`` auf Listen vermeiden möchtest wäre vielleicht eine `collections.deque` geeignet(er)‽
Benutzeravatar
snafu
User
Beiträge: 6738
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Defnull hat geschrieben:Bei Kreispuffern habe ich das Problem, das ich bei jedem Update aufräumen müsste. Wobei... vieleicht... hmm...
Nach jedem Update nicht unbedingt. Du könntest erst dann aufräumen, sobald dein Puffer eine bestimmte Größe erreicht hat. In einer chronologisch geordneten Liste (die du ja vermutlich in irgendeiner Form vorliegen haben wirst) schmeißt du dann alle für den aktuellen Zeitpunkt zu alten Events in einem Schwung raus. Diese Maximalgröße würde ich übrigens schön hoch wählen, damit sich das Rausschmeißen auch lohnt. Und da du ja sagst, N ist fix, könntest du auch beim Abfragen für die Auswertung schon diejenigen Events rausschmeißen, die nicht mehr infrage kommen.

Vermutlich wirst du deine Liste ja durchlaufen bis du den ersten infrage kommenden Timestamp gefunden hast und dann anfangen, für das Ergebnis zu zählen, oder wie machst du das? IMHO schreit das ganze ja nach einer verketteten Liste, wo man sehr schön nen Cut machen könnte ab der Stelle, an der die Events bereits zu alt sind. Ich weiß aber nicht, wie effizient so etwas in reinem Python ist - weniger hinsichtlich des Speicherverbrauchs, sondern eher in Bezug auf das Verhalten des GCs, wenn Events freigegeben werden - müsste man einfach mal ausprobieren. Mit `collections.deque()` müsste man gucken, ob es beim Löschen von Events nicht zu unhandlich wird. Kommt sicherlich auch darauf an, wie dein Algo implementiert ist.

Das waren jetzt nur so grundsätzliche Gedanken - keine Ahnung, ob sie hilfreich für dich sind. Für eine genauere Analyse wäre es wohl sinnvoll, wenn du weitere Details zu deinem geplanten Vorgehen schildern könntest...

EDIT: Mich würde übrigens auch mal interessieren, was genau es mit den Buckets auf sich hat. Gehst du einfach intervallmäßig vor und jedes Bucket steht für eine Zeiteinheit, oder wie? Ist das nicht unnötig kompliziert und ungenau...? :o
BlackJack

@snafu: Ich denke mal das Problem mit einer im Grunde naheliegenden Lösung ist das „System produziert sehr viele Events pro Sekunde”. Also das so eine Liste in der man tatsächlich einfach alle Events ablegt und dann bei der Auswertung zählt/bereinigt zu gross wird um speicher- und zeiteffizient zu sein. Da wäre auch meine erste Idee so etwas wie „buckets” in denen schon mal aggregiert wird.
Benutzeravatar
kbr
User
Beiträge: 1487
Registriert: Mittwoch 15. Oktober 2008, 09:27

Die Idee mit den Buckets erscheint mir gar nicht so schlecht. Hier noch ein naiver (Pseudocode-)Ansatz dazu:

Code: Alles auswählen

create deque
create bucket
while running:
    if not in timeinterval:
        deque.append(bucket)
        if len(deque) > max_size:
            deque.popleft()
        create new bucket
    increment bucket
Je nachdem wie deque intern mit dem Speicher umgeht, lässt sich das sicher noch verbessern.
Benutzeravatar
snafu
User
Beiträge: 6738
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

BlackJack hat geschrieben:@snafu: Ich denke mal das Problem mit einer im Grunde naheliegenden Lösung ist das „System produziert sehr viele Events pro Sekunde”. Also das so eine Liste in der man tatsächlich einfach alle Events ablegt und dann bei der Auswertung zählt/bereinigt zu gross wird um speicher- und zeiteffizient zu sein. Da wäre auch meine erste Idee so etwas wie „buckets” in denen schon mal aggregiert wird.
Ah, also im Sinne von "jetzt schnappt sich für die nächsten 50 Millisekunden ein neues Bucket alles, was reinkommt"? Ok, dann würde das den Umständen entsprechend noch Sinn machen - das stimmt.
Benutzeravatar
Defnull
User
Beiträge: 778
Registriert: Donnerstag 18. Juni 2009, 22:09
Wohnort: Göttingen
Kontaktdaten:

So, meine Lösung zu der ich heute gekommen bin:

Code: Alles auswählen

from time import time as now

class RollingTimeCounter(tuple):
    ''' Counter to measure events over time in a rolling time window.

        This counter provides statistics over events that happened within a
        rolling time window. The window is 'rolling' in the sense that the
        measurements are not nulled each X seconds, but continuously updated.
        This is implemented as a circular ring buffer of buckets, each
        representing a small fraction of the time window.
        
        Example: If 10 buckets are used to measure a time window of one second,
            each bucket covers 1/10 seconds (100ms). The counter represents
            events measured during the last 900-1000ms. More buckets increase
            accuracy.
        
        The data structure is optimized for fast and lock-free updates as well
        as constant and low memory usage. Read performance depends on the number
        of buckets.
    '''

    def __new__(cls, window=1, buckets=10):
        self = super().__new__(cls, [[0,0] for x in range(buckets)])
        self.window = window
        self.buckets = buckets
        self.tfunc = lambda: int(now()*buckets/window)
        return self

    def increment(self, value=1):
        ''' Increment current bucket. '''
        t = self.tfunc()
        c = self[-t % len(self)]
        if c[0] == t:
            c[1] += value
        else:
            c[0] = t
            c[1] = value

    def set(self, value):
        ''' Change the value of the current bucket. '''
        t = self.tfunc()
        c = self[-t % len(self)]
        c[0] = t
        c[1] = value

    def set_max(self, value):
        ''' Update current bucket if the new value is higher. '''
        t = self.tfunc()
        c = self[-t % len(self)]
        if c[0] == t:
            c[1] = max(c[1], value)
        else:
            c[0] = t
            c[1] = value

    def set_min(self, value):
        ''' Update current bucket if the new value is lower. '''
        t = self.tfunc()
        c = self[-t % len(self)]
        if c[0] == t:
            c[1] = min(c[1], value)
        else:
            c[0] = t
            c[1] = value

    def get_buckets(self):
        ''' Return the current values of all bucket. '''
        tmin = self.tfunc() - len(self)
        return [v for t,v in self if t > tmin or 0]

    def sum(self):
        ''' Return the total number of events during the observed time window.
            (equals: sum(buckets)) '''
        return sum(self.get_buckets())

    def rate(self):
        ''' Return the number of events per seconds. If the time window is
            shorter than a second, the rate is interpolated. If it is longer,
            than an average is returned. (equals: sum(buckets) / window) '''
        return self.value() / self.window

    def rate_max(self):
        ''' Return highest event rate (events per second) observed during the
            time window. '''
        return max(self.get_buckets()) * self.buckets / self.window

    def rate_min(self):
        ''' Return lowest event rate (events per second) observed during the
            time window. '''
        return min(self.get_buckets()) * self.buckets / self.window
Bottle: Micro Web Framework + Development Blog
Benutzeravatar
Defnull
User
Beiträge: 778
Registriert: Donnerstag 18. Juni 2009, 22:09
Wohnort: Göttingen
Kontaktdaten:

kbr hat geschrieben:Die Idee mit den Buckets erscheint mir gar nicht so schlecht. Hier noch ein naiver (Pseudocode-)Ansatz dazu:

Code: Alles auswählen

create deque
create bucket
while running:
    if not in timeinterval:
        deque.append(bucket)
        if len(deque) > max_size:
            deque.popleft()
        create new bucket
    increment bucket
Je nachdem wie deque intern mit dem Speicher umgeht, lässt sich das sicher noch verbessern.
Etwa so hat es Hystrix (eine Java Library) implementiert. Das ist in Python denke ich aber nicht so effizient wie die ring-puffer Lösung.
Bottle: Micro Web Framework + Development Blog
Benutzeravatar
kbr
User
Beiträge: 1487
Registriert: Mittwoch 15. Oktober 2008, 09:27

Defnull hat geschrieben:Etwa so hat es Hystrix (eine Java Library) implementiert. Das ist in Python denke ich aber nicht so effizient wie die ring-puffer Lösung.
Hystrix kannte ich nicht, aber deque hat sicher den Nachteil, dass gelegentlich ein malloc erfolgen wird, was bei einem Ringpuffer nicht der Fall ist. (Es könnte interessant sein, bei Gelegenheit mal den Geschwindigkeitsunterschied zu ermitteln.)
Wo aber holst Du bei

Code: Alles auswählen

def rate(self):
    return self.value() / self.window
self.value() her? Müsste dies nicht

Code: Alles auswählen

return self.sum() / self.window
sein?
Benutzeravatar
Defnull
User
Beiträge: 778
Registriert: Donnerstag 18. Juni 2009, 22:09
Wohnort: Göttingen
Kontaktdaten:

kbr hat geschrieben:
Defnull hat geschrieben:Etwa so hat es Hystrix (eine Java Library) implementiert. Das ist in Python denke ich aber nicht so effizient wie die ring-puffer Lösung.
Hystrix kannte ich nicht, aber deque hat sicher den Nachteil, dass gelegentlich ein malloc erfolgen wird, was bei einem Ringpuffer nicht der Fall ist. (Es könnte interessant sein, bei Gelegenheit mal den Geschwindigkeitsunterschied zu ermitteln.)
Wo aber holst Du bei

Code: Alles auswählen

def rate(self):
    return self.value() / self.window
self.value() her? Müsste dies nicht

Code: Alles auswählen

return self.sum() / self.window
sein?
Jup, hatte das kurzfristig noch in sum() umbenannt und da wohl was vergessen.

Die ring-puffer Version schafft auf meinem Laptop 600.000 Updates (increment) pro Sekunde. Das ist schon ziemlich ordentlich. Der tuple-subclass Trick hat die meiste Performance gebracht.
Bottle: Micro Web Framework + Development Blog
Benutzeravatar
snafu
User
Beiträge: 6738
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

@Defnull: Du könntest `increment()` noch ein bißchen tunen. Die `self.tfunc` würde ich rausschmeißen und den `int()`-Cast kannst du dir ebenfalls sparen, wenn du eine Ganzzahldivision verwendest. Die erste Zeile in `increment()` wäre demnach:

Code: Alles auswählen

t = now() * self.buckets // self.window
Und in der Zeile darunter könntest du schreiben:

Code: Alles auswählen

# anstatt
c = self[-t % len(self)]

# lieber
c = self[-t % self.buckets]
Vorausgesetzt natürlich, dass der Anwender die Instanzattribute nicht nachträglich verändert.

Sind zwar Mikro-Optimierungen, aber ich denke schon, dass man damit noch einige Updates pro Sekunde rausholen kann.

EDIT: Ach, Mist. Den `int()`-Cast brauchst du trotzdem, da der Indexzugriff ja nur mit Integern möglich ist... :(
Benutzeravatar
Defnull
User
Beiträge: 778
Registriert: Donnerstag 18. Juni 2009, 22:09
Wohnort: Göttingen
Kontaktdaten:

snafu hat geschrieben:@Defnull: Du könntest `increment()` noch ein bißchen tunen. Die `self.tfunc` würde ich rausschmeißen und den `int()`-Cast kannst du dir ebenfalls sparen, wenn du eine Ganzzahldivision verwendest. Die erste Zeile in `increment()` wäre demnach:

Code: Alles auswählen

t = now() * self.buckets // self.window
Und in der Zeile darunter könntest du schreiben:

Code: Alles auswählen

# anstatt
c = self[-t % len(self)]

# lieber
c = self[-t % self.buckets]
Vorausgesetzt natürlich, dass der Anwender die Instanzattribute nicht nachträglich verändert.

Sind zwar Mikro-Optimierungen, aber ich denke schon, dass man damit noch einige Updates pro Sekunde rausholen kann.

EDIT: Ach, Mist. Den `int()`-Cast brauchst du trotzdem, da der Indexzugriff ja nur mit Integern möglich ist... :(

Bei tupeln ist len(self) sogar schneller als ein dot-lookup hab ich raus gefunden. Und die tfunc() funktion hatte ich gebaut, weil man dann statt zwei dot-lookups nur einen hat. Da habe ich allerdings noch nicht getestet, ob der overhead des Funktionsaufrufes dann den Vorteil wieder weg frisst. Wäre noch eine idee, ja.
Bottle: Micro Web Framework + Development Blog
Benutzeravatar
snafu
User
Beiträge: 6738
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Defnull hat geschrieben:Bei tupeln ist len(self) sogar schneller als ein dot-lookup hab ich raus gefunden.
Krass, ich dachte bisher, dass Aufrufe deutlich mehr Zeit fressen als Attributzugriffe.

EDIT: Aber da zeigt sich mal wieder, dass Fakten deutlich zielführender sind als Vermutungen...

EDIT2: Wobei das natürlich "Lowlevel-Scheisse" ist, die implementierungsabhängig ist und rein theoretisch bei der nächsten Python-Version wieder ganz anders aussehen kann. :o
Benutzeravatar
kbr
User
Beiträge: 1487
Registriert: Mittwoch 15. Oktober 2008, 09:27

Defnull hat geschrieben:Bei tupeln ist len(self) sogar schneller als ein dot-lookup hab ich raus gefunden.
Ja, mir ist auch schon einmal aufgefallen, dass in Subklassen von Python-Datentypen Referenzierungen auf 'self' extrem schnell sind.
Benutzeravatar
kbr
User
Beiträge: 1487
Registriert: Mittwoch 15. Oktober 2008, 09:27

Jetzt war ich doch neugierig, ob ein Ringbuffer (RollingTimeCounter) oder eine deque (EventCounter) effizienter ist und habe einen Vergleich vorgenommen:

http://pastebin.com/msjPLj96

Und das folgende Ergebnis erhalten (rate, rate_max, rate_min):

RollingTimeCounter:
373970.0
402690.0
141610.0
EventCounter:
1082862.0
1085100.0
1079370.0

D.h. die deque-Variante wäre ca. 2.5 mal so schnell wie der Ringbuffer.
Oder habe ich da einen Fehler übersehen?
Benutzeravatar
Defnull
User
Beiträge: 778
Registriert: Donnerstag 18. Juni 2009, 22:09
Wohnort: Göttingen
Kontaktdaten:

kbr hat geschrieben:Jetzt war ich doch neugierig, ob ein Ringbuffer (RollingTimeCounter) oder eine deque (EventCounter) effizienter ist und habe einen Vergleich vorgenommen:

http://pastebin.com/msjPLj96

D.h. die deque-Variante wäre ca. 2.5 mal so schnell wie der Ringbuffer.
Oder habe ich da einen Fehler übersehen?
Du hattest zwei kleine Fehler drin:
- sum() muss self.bucket mit drauf rechnen und vorher ein increment(0) machen, sonst bezieht sich sum auf das letzte Inkrement und nicht auf die Gegenwart.
- Du hast in increment() buckets übersprungen, wenn länger als self.dt nichts passiert ist. Dann waren zu alte buckets in der deque und das Erebnis verfälscht. Mit while statt if in der äußeren Bedingung/Schleife und self.bucket_lifetime += self.dt weiter hinten war das aber schnell gelöst. Dann werden 0-Buckets als Lückenfüller hinzu gefügt.

Nach den Korrekturen war deine Lösung trotzdem deutlich schneller. Ich vermute mal, das das berechnen des Bucket-index bei mir zu teuer ist (das fällt bei dir ja weg) aber das ist nur geraten. Ich klau mir deine Implementierung mal, ja? ;)


Edit: Sehr hier? Genau deshalb habe ich hier gefragt! Die Optimierungs-wettkämpfe hier im Forum sind legendär ;)
Bottle: Micro Web Framework + Development Blog
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

Defnull hat geschrieben:sum() muss self.bucket mit drauf rechnen und vorher ein increment(0) machen, sonst bezieht sich sum auf das letzte Inkrement und nicht auf die Gegenwart
Mir gefiele es besser, den letzten, vielleicht nur halb gefüllten Bucket, nicht mit in die Statistik aufzunehmen.

Durch Angabe einer »maxlen« der »deque« kann man sich den »popleft« auch noch sparen.
Benutzeravatar
kbr
User
Beiträge: 1487
Registriert: Mittwoch 15. Oktober 2008, 09:27

Defnull hat geschrieben:Du hattest zwei kleine Fehler drin:
Ja, stimmt. Diese Implementierungsdetails hatte ich wohl auf die Schnelle übersehen.
Defnull hat geschrieben:Ich vermute mal, das das berechnen des Bucket-index bei mir zu teuer ist (das fällt bei dir ja weg) aber das ist nur geraten. Ich klau mir deine Implementierung mal, ja?
Ich vermute ähnlich, und ja — nimm' diese Variante ruhig :wink:

Sirius3 hat geschrieben:Durch Angabe einer »maxlen« der »deque« kann man sich den »popleft« auch noch sparen.
Stimmt gleichfalls — das ist dann das Finetuning ...
Antworten