Hi Leute, nach langer Zeit komm ich mal wieder mit einer Frage her.
Ich suche nach einer Datenstruktur plus passende Algorithmen um folgendes Problem möglichst effizient zu lösen:
Ein System produziert sehr viele Events pro Sekunde. Ich möchte zählen, wie viele Events ungefähr im Laufe der letzten N Millisekunden erzeugt wurden.
- N ist fest (100 < N < 10000)
- Der Zähler muss nicht exakt sein, aber die "Auflösung" sollte einstellbar sein.
- Der Zähler wird extrem oft aktualisiert (ein mal pro Event), aber nur selten ausgewertet (etwa ein mal pro Sekunde).
Meine (naive) Lösung wäre eine Liste mit M Buckets, die jeweils ein Zeitfenster (mit der Länge N/M) repräsentieren. Bei jedem Event wird der letzte Bucket inkrementiert, wenn wir noch in dessen Zeitfenster liegen, oder ein neuer Bucket hinzu gefügt. Ab und zu werden alte Buckets gelöscht, um Speicher zu sparen. Bei der Auswertung werden alle Buckets summiert, die jünger als now()-N sind.
Meine Implementierung hat aber so hässliche Dinge wie list.pop(0) drin. Hat jemand eine Idee, wie man das möglichst effizient gestalten könnte?
Edit: Bei Kreispuffern habe ich das Problem, das ich bei jedem Update aufräumen müsste. Wobei... vieleicht... hmm...
Effizienter Timeframe Counter
- Defnull
- User
- Beiträge: 778
- Registriert: Donnerstag 18. Juni 2009, 22:09
- Wohnort: Göttingen
- Kontaktdaten:
Bottle: Micro Web Framework + Development Blog
@Defnull: Wenn Du ``pop(0)`` auf Listen vermeiden möchtest wäre vielleicht eine `collections.deque` geeignet(er)‽
Nach jedem Update nicht unbedingt. Du könntest erst dann aufräumen, sobald dein Puffer eine bestimmte Größe erreicht hat. In einer chronologisch geordneten Liste (die du ja vermutlich in irgendeiner Form vorliegen haben wirst) schmeißt du dann alle für den aktuellen Zeitpunkt zu alten Events in einem Schwung raus. Diese Maximalgröße würde ich übrigens schön hoch wählen, damit sich das Rausschmeißen auch lohnt. Und da du ja sagst, N ist fix, könntest du auch beim Abfragen für die Auswertung schon diejenigen Events rausschmeißen, die nicht mehr infrage kommen.Defnull hat geschrieben:Bei Kreispuffern habe ich das Problem, das ich bei jedem Update aufräumen müsste. Wobei... vieleicht... hmm...
Vermutlich wirst du deine Liste ja durchlaufen bis du den ersten infrage kommenden Timestamp gefunden hast und dann anfangen, für das Ergebnis zu zählen, oder wie machst du das? IMHO schreit das ganze ja nach einer verketteten Liste, wo man sehr schön nen Cut machen könnte ab der Stelle, an der die Events bereits zu alt sind. Ich weiß aber nicht, wie effizient so etwas in reinem Python ist - weniger hinsichtlich des Speicherverbrauchs, sondern eher in Bezug auf das Verhalten des GCs, wenn Events freigegeben werden - müsste man einfach mal ausprobieren. Mit `collections.deque()` müsste man gucken, ob es beim Löschen von Events nicht zu unhandlich wird. Kommt sicherlich auch darauf an, wie dein Algo implementiert ist.
Das waren jetzt nur so grundsätzliche Gedanken - keine Ahnung, ob sie hilfreich für dich sind. Für eine genauere Analyse wäre es wohl sinnvoll, wenn du weitere Details zu deinem geplanten Vorgehen schildern könntest...
EDIT: Mich würde übrigens auch mal interessieren, was genau es mit den Buckets auf sich hat. Gehst du einfach intervallmäßig vor und jedes Bucket steht für eine Zeiteinheit, oder wie? Ist das nicht unnötig kompliziert und ungenau...?

@snafu: Ich denke mal das Problem mit einer im Grunde naheliegenden Lösung ist das „System produziert sehr viele Events pro Sekunde”. Also das so eine Liste in der man tatsächlich einfach alle Events ablegt und dann bei der Auswertung zählt/bereinigt zu gross wird um speicher- und zeiteffizient zu sein. Da wäre auch meine erste Idee so etwas wie „buckets” in denen schon mal aggregiert wird.
Die Idee mit den Buckets erscheint mir gar nicht so schlecht. Hier noch ein naiver (Pseudocode-)Ansatz dazu:
Je nachdem wie deque intern mit dem Speicher umgeht, lässt sich das sicher noch verbessern.
Code: Alles auswählen
create deque
create bucket
while running:
if not in timeinterval:
deque.append(bucket)
if len(deque) > max_size:
deque.popleft()
create new bucket
increment bucket
Ah, also im Sinne von "jetzt schnappt sich für die nächsten 50 Millisekunden ein neues Bucket alles, was reinkommt"? Ok, dann würde das den Umständen entsprechend noch Sinn machen - das stimmt.BlackJack hat geschrieben:@snafu: Ich denke mal das Problem mit einer im Grunde naheliegenden Lösung ist das „System produziert sehr viele Events pro Sekunde”. Also das so eine Liste in der man tatsächlich einfach alle Events ablegt und dann bei der Auswertung zählt/bereinigt zu gross wird um speicher- und zeiteffizient zu sein. Da wäre auch meine erste Idee so etwas wie „buckets” in denen schon mal aggregiert wird.
- Defnull
- User
- Beiträge: 778
- Registriert: Donnerstag 18. Juni 2009, 22:09
- Wohnort: Göttingen
- Kontaktdaten:
So, meine Lösung zu der ich heute gekommen bin:
Code: Alles auswählen
from time import time as now
class RollingTimeCounter(tuple):
''' Counter to measure events over time in a rolling time window.
This counter provides statistics over events that happened within a
rolling time window. The window is 'rolling' in the sense that the
measurements are not nulled each X seconds, but continuously updated.
This is implemented as a circular ring buffer of buckets, each
representing a small fraction of the time window.
Example: If 10 buckets are used to measure a time window of one second,
each bucket covers 1/10 seconds (100ms). The counter represents
events measured during the last 900-1000ms. More buckets increase
accuracy.
The data structure is optimized for fast and lock-free updates as well
as constant and low memory usage. Read performance depends on the number
of buckets.
'''
def __new__(cls, window=1, buckets=10):
self = super().__new__(cls, [[0,0] for x in range(buckets)])
self.window = window
self.buckets = buckets
self.tfunc = lambda: int(now()*buckets/window)
return self
def increment(self, value=1):
''' Increment current bucket. '''
t = self.tfunc()
c = self[-t % len(self)]
if c[0] == t:
c[1] += value
else:
c[0] = t
c[1] = value
def set(self, value):
''' Change the value of the current bucket. '''
t = self.tfunc()
c = self[-t % len(self)]
c[0] = t
c[1] = value
def set_max(self, value):
''' Update current bucket if the new value is higher. '''
t = self.tfunc()
c = self[-t % len(self)]
if c[0] == t:
c[1] = max(c[1], value)
else:
c[0] = t
c[1] = value
def set_min(self, value):
''' Update current bucket if the new value is lower. '''
t = self.tfunc()
c = self[-t % len(self)]
if c[0] == t:
c[1] = min(c[1], value)
else:
c[0] = t
c[1] = value
def get_buckets(self):
''' Return the current values of all bucket. '''
tmin = self.tfunc() - len(self)
return [v for t,v in self if t > tmin or 0]
def sum(self):
''' Return the total number of events during the observed time window.
(equals: sum(buckets)) '''
return sum(self.get_buckets())
def rate(self):
''' Return the number of events per seconds. If the time window is
shorter than a second, the rate is interpolated. If it is longer,
than an average is returned. (equals: sum(buckets) / window) '''
return self.value() / self.window
def rate_max(self):
''' Return highest event rate (events per second) observed during the
time window. '''
return max(self.get_buckets()) * self.buckets / self.window
def rate_min(self):
''' Return lowest event rate (events per second) observed during the
time window. '''
return min(self.get_buckets()) * self.buckets / self.window
Bottle: Micro Web Framework + Development Blog
- Defnull
- User
- Beiträge: 778
- Registriert: Donnerstag 18. Juni 2009, 22:09
- Wohnort: Göttingen
- Kontaktdaten:
Etwa so hat es Hystrix (eine Java Library) implementiert. Das ist in Python denke ich aber nicht so effizient wie die ring-puffer Lösung.kbr hat geschrieben:Die Idee mit den Buckets erscheint mir gar nicht so schlecht. Hier noch ein naiver (Pseudocode-)Ansatz dazu:Je nachdem wie deque intern mit dem Speicher umgeht, lässt sich das sicher noch verbessern.Code: Alles auswählen
create deque create bucket while running: if not in timeinterval: deque.append(bucket) if len(deque) > max_size: deque.popleft() create new bucket increment bucket
Bottle: Micro Web Framework + Development Blog
Hystrix kannte ich nicht, aber deque hat sicher den Nachteil, dass gelegentlich ein malloc erfolgen wird, was bei einem Ringpuffer nicht der Fall ist. (Es könnte interessant sein, bei Gelegenheit mal den Geschwindigkeitsunterschied zu ermitteln.)Defnull hat geschrieben:Etwa so hat es Hystrix (eine Java Library) implementiert. Das ist in Python denke ich aber nicht so effizient wie die ring-puffer Lösung.
Wo aber holst Du bei
Code: Alles auswählen
def rate(self):
return self.value() / self.window
Code: Alles auswählen
return self.sum() / self.window
- Defnull
- User
- Beiträge: 778
- Registriert: Donnerstag 18. Juni 2009, 22:09
- Wohnort: Göttingen
- Kontaktdaten:
Jup, hatte das kurzfristig noch in sum() umbenannt und da wohl was vergessen.kbr hat geschrieben:Hystrix kannte ich nicht, aber deque hat sicher den Nachteil, dass gelegentlich ein malloc erfolgen wird, was bei einem Ringpuffer nicht der Fall ist. (Es könnte interessant sein, bei Gelegenheit mal den Geschwindigkeitsunterschied zu ermitteln.)Defnull hat geschrieben:Etwa so hat es Hystrix (eine Java Library) implementiert. Das ist in Python denke ich aber nicht so effizient wie die ring-puffer Lösung.
Wo aber holst Du beiself.value() her? Müsste dies nichtCode: Alles auswählen
def rate(self): return self.value() / self.window
sein?Code: Alles auswählen
return self.sum() / self.window
Die ring-puffer Version schafft auf meinem Laptop 600.000 Updates (increment) pro Sekunde. Das ist schon ziemlich ordentlich. Der tuple-subclass Trick hat die meiste Performance gebracht.
Bottle: Micro Web Framework + Development Blog
@Defnull: Du könntest `increment()` noch ein bißchen tunen. Die `self.tfunc` würde ich rausschmeißen und den `int()`-Cast kannst du dir ebenfalls sparen, wenn du eine Ganzzahldivision verwendest. Die erste Zeile in `increment()` wäre demnach:
Und in der Zeile darunter könntest du schreiben:
Vorausgesetzt natürlich, dass der Anwender die Instanzattribute nicht nachträglich verändert.
Sind zwar Mikro-Optimierungen, aber ich denke schon, dass man damit noch einige Updates pro Sekunde rausholen kann.
EDIT: Ach, Mist. Den `int()`-Cast brauchst du trotzdem, da der Indexzugriff ja nur mit Integern möglich ist...
Code: Alles auswählen
t = now() * self.buckets // self.window
Code: Alles auswählen
# anstatt
c = self[-t % len(self)]
# lieber
c = self[-t % self.buckets]
Sind zwar Mikro-Optimierungen, aber ich denke schon, dass man damit noch einige Updates pro Sekunde rausholen kann.
EDIT: Ach, Mist. Den `int()`-Cast brauchst du trotzdem, da der Indexzugriff ja nur mit Integern möglich ist...

- Defnull
- User
- Beiträge: 778
- Registriert: Donnerstag 18. Juni 2009, 22:09
- Wohnort: Göttingen
- Kontaktdaten:
snafu hat geschrieben:@Defnull: Du könntest `increment()` noch ein bißchen tunen. Die `self.tfunc` würde ich rausschmeißen und den `int()`-Cast kannst du dir ebenfalls sparen, wenn du eine Ganzzahldivision verwendest. Die erste Zeile in `increment()` wäre demnach:Und in der Zeile darunter könntest du schreiben:Code: Alles auswählen
t = now() * self.buckets // self.window
Vorausgesetzt natürlich, dass der Anwender die Instanzattribute nicht nachträglich verändert.Code: Alles auswählen
# anstatt c = self[-t % len(self)] # lieber c = self[-t % self.buckets]
Sind zwar Mikro-Optimierungen, aber ich denke schon, dass man damit noch einige Updates pro Sekunde rausholen kann.
EDIT: Ach, Mist. Den `int()`-Cast brauchst du trotzdem, da der Indexzugriff ja nur mit Integern möglich ist...
Bei tupeln ist len(self) sogar schneller als ein dot-lookup hab ich raus gefunden. Und die tfunc() funktion hatte ich gebaut, weil man dann statt zwei dot-lookups nur einen hat. Da habe ich allerdings noch nicht getestet, ob der overhead des Funktionsaufrufes dann den Vorteil wieder weg frisst. Wäre noch eine idee, ja.
Bottle: Micro Web Framework + Development Blog
Krass, ich dachte bisher, dass Aufrufe deutlich mehr Zeit fressen als Attributzugriffe.Defnull hat geschrieben:Bei tupeln ist len(self) sogar schneller als ein dot-lookup hab ich raus gefunden.
EDIT: Aber da zeigt sich mal wieder, dass Fakten deutlich zielführender sind als Vermutungen...
EDIT2: Wobei das natürlich "Lowlevel-Scheisse" ist, die implementierungsabhängig ist und rein theoretisch bei der nächsten Python-Version wieder ganz anders aussehen kann.

Ja, mir ist auch schon einmal aufgefallen, dass in Subklassen von Python-Datentypen Referenzierungen auf 'self' extrem schnell sind.Defnull hat geschrieben:Bei tupeln ist len(self) sogar schneller als ein dot-lookup hab ich raus gefunden.
Jetzt war ich doch neugierig, ob ein Ringbuffer (RollingTimeCounter) oder eine deque (EventCounter) effizienter ist und habe einen Vergleich vorgenommen:
http://pastebin.com/msjPLj96
Und das folgende Ergebnis erhalten (rate, rate_max, rate_min):
RollingTimeCounter:
373970.0
402690.0
141610.0
EventCounter:
1082862.0
1085100.0
1079370.0
D.h. die deque-Variante wäre ca. 2.5 mal so schnell wie der Ringbuffer.
Oder habe ich da einen Fehler übersehen?
http://pastebin.com/msjPLj96
Und das folgende Ergebnis erhalten (rate, rate_max, rate_min):
RollingTimeCounter:
373970.0
402690.0
141610.0
EventCounter:
1082862.0
1085100.0
1079370.0
D.h. die deque-Variante wäre ca. 2.5 mal so schnell wie der Ringbuffer.
Oder habe ich da einen Fehler übersehen?
- Defnull
- User
- Beiträge: 778
- Registriert: Donnerstag 18. Juni 2009, 22:09
- Wohnort: Göttingen
- Kontaktdaten:
Du hattest zwei kleine Fehler drin:kbr hat geschrieben:Jetzt war ich doch neugierig, ob ein Ringbuffer (RollingTimeCounter) oder eine deque (EventCounter) effizienter ist und habe einen Vergleich vorgenommen:
http://pastebin.com/msjPLj96
D.h. die deque-Variante wäre ca. 2.5 mal so schnell wie der Ringbuffer.
Oder habe ich da einen Fehler übersehen?
- sum() muss self.bucket mit drauf rechnen und vorher ein increment(0) machen, sonst bezieht sich sum auf das letzte Inkrement und nicht auf die Gegenwart.
- Du hast in increment() buckets übersprungen, wenn länger als self.dt nichts passiert ist. Dann waren zu alte buckets in der deque und das Erebnis verfälscht. Mit while statt if in der äußeren Bedingung/Schleife und self.bucket_lifetime += self.dt weiter hinten war das aber schnell gelöst. Dann werden 0-Buckets als Lückenfüller hinzu gefügt.
Nach den Korrekturen war deine Lösung trotzdem deutlich schneller. Ich vermute mal, das das berechnen des Bucket-index bei mir zu teuer ist (das fällt bei dir ja weg) aber das ist nur geraten. Ich klau mir deine Implementierung mal, ja?

Edit: Sehr hier? Genau deshalb habe ich hier gefragt! Die Optimierungs-wettkämpfe hier im Forum sind legendär

Bottle: Micro Web Framework + Development Blog
Mir gefiele es besser, den letzten, vielleicht nur halb gefüllten Bucket, nicht mit in die Statistik aufzunehmen.Defnull hat geschrieben:sum() muss self.bucket mit drauf rechnen und vorher ein increment(0) machen, sonst bezieht sich sum auf das letzte Inkrement und nicht auf die Gegenwart
Durch Angabe einer »maxlen« der »deque« kann man sich den »popleft« auch noch sparen.
Ja, stimmt. Diese Implementierungsdetails hatte ich wohl auf die Schnelle übersehen.Defnull hat geschrieben:Du hattest zwei kleine Fehler drin:
Ich vermute ähnlich, und ja — nimm' diese Variante ruhigDefnull hat geschrieben:Ich vermute mal, das das berechnen des Bucket-index bei mir zu teuer ist (das fällt bei dir ja weg) aber das ist nur geraten. Ich klau mir deine Implementierung mal, ja?

Stimmt gleichfalls — das ist dann das Finetuning ...Sirius3 hat geschrieben:Durch Angabe einer »maxlen« der »deque« kann man sich den »popleft« auch noch sparen.