Filter mit multiprocessing ist langsamer als ohne

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Benutzeravatar
Judge
User
Beiträge: 129
Registriert: Mittwoch 13. Juni 2012, 22:27
Wohnort: Ratingen
Kontaktdaten:

Hallo zusammen,

ich habe noch nie einen Anwendungsfall gehabt, bei dem ich dachte, das multiprocessing mir einen signifikanten Geschwindigkeitsvorteil bieten könnte, daher fange ich gerade das erste Mal an mich damit zu beschäftigen. Allerdings führen meine bisherigen Bemühungen dazu das mein multiprocessing code langsamer läuft als mein normaler.

Ich habe eine mehrere GB (~17) große logdatei. Ich möchte nun einen filter bauen, der nur solche Zeilen ausgibt, welche sich in einem bestimmten Zeitbereich befinden. Exemplarisch sehen diese Logzeilen so aus:

Code: Alles auswählen

2018-10-31T16:12:01+01:00 apphost96 TSWITCH-APP08: 2018-10-31 16:11:56,755 [INFO] ...
Der 2. Zeitstempel ist der für mich interessante (2018-10-31 16:11:56,755) und ist in CEST. Ich leite diese Logzeilen per Pipe unter Linux an mein Script weiter (zcat logfile.gz | python meinscript.py). Um dabei nur solche Zeilen auszugeben, bei denen sich dieser besagte 2. Zeitstempel zwischen 17:00:00 und 17:30:00 JST befindet, sieht mein single-process Ansatz folgendermaßen aus (Python 3.6):

Code: Alles auswählen

from datetime import datetime, timedelta
from pytz import timezone
import sys
import re

def filter_line(from_dt_obj, to_dt_obj, cest_tz_obj, line):
    line = line.strip()

    try:
            line_time = cest_tz_obj.localize(datetime.strptime(f'{line.split(" ")[3]}:{line.split(" ")[4].split(",")[0]}',
                                                               '%Y-%m-%d:%H:%M:%S'))
    except (IndexError, ValueError):
        return None

    if from_dt_obj <= line_time <= to_dt_obj:
        print(line)

jst_tz = timezone('Japan')
cest_tz = timezone('Europe/Berlin')

from_dt = jst_tz.localize(datetime(year=2018, month=11, day=1))
to_dt   = from_dt + timedelta(minutes=30)

for line in sys.stdin:
    filter_line(from_dt, to_dt, cest_tz, re_proxy_pattern, line)
Das ganze funktioniert auch und dauert bei 5.930.000 Zeilen (stark gekürztes Test-Log) auf meinem Rechner ca. 7 Minuten.

Mein multiprocessing - Ansatz sieht folgendermaßen aus:

Code: Alles auswählen

from datetime import datetime, timedelta
from pytz import timezone
from multiprocessing import Pool
from functools import partial
import sys
import re

def filter_line(from_dt_obj, to_dt_obj, cest_tz_obj, line):
    line = line.strip()

    try:
            line_time = cest_tz_obj.localize(datetime.strptime(f'{line.split(" ")[3]}:{line.split(" ")[4].split(",")[0]}',
                                                               '%Y-%m-%d:%H:%M:%S'))
    except (IndexError, ValueError):
        return None

    if from_dt_obj <= line_time <= to_dt_obj:
        print(line)

jst_tz = timezone('Japan')
cest_tz = timezone('Europe/Berlin')

from_dt = jst_tz.localize(datetime(year=2018, month=11, day=1))
to_dt   = from_dt + timedelta(minutes=30)

func = partial(filter_line, from_dt, to_dt, cest_tz, re_proxy_pattern)

with Pool() as pool:
    for x in pool.imap(func, sys.stdin):
        pass
In der Prozessübersicht meines Testsystemes (4 cores) sehe ich auch das dieser Code insgesamt 8 Prozesse forked. Allerdings benötigt dieser zur Auswertung derselben Eingabedatei ca 12 Minuten; also fast doppelt so lange. Ich habe zwar nicht erwartet das es genau 1/4 des single-processing Ansatzes dauert, jedoch schon das es bei 4 Prozessen schneller geht.

Was mache ich falsch?
Sirius3
User
Beiträge: 17747
Registriert: Sonntag 21. Oktober 2012, 17:20

Das Problem ist keines, das man mit Multiprocessing beschleunigen könnte. Eine if-Abfrage ist um Größenordnungen schneller als das kopieren einer Zeile von einem Prozess zu einem anderen. Multiprocessing ist dann gut, wenn wenig Daten übertragen werden müssen und die Berechnung lange dauert.

Zum Code:
`filter_line` sollte genau das tun, was der Name sagt, eine Iterable mit Zeilen bekommen und Zeilen zurückgeben. Mehrfaches splitten der selben Zeile macht das Programm nicht nur langsam sondern auch unleserlich, vor allem wenn das alles in einem Formatstring passiert.
Das Datumsformat ist schon in der Form, dass man auch einen einfachen Stringvergleich machen könnte, ohne es vorher parsen zu müssen, was wiederum Zeit spart. Dazu müsstest Du natürlich vorher die Grenz-Zeiten von Japan nach Europa-Zeit umrechnen.

Code: Alles auswählen

def filter_lines(lines, from_cest_iso, to_cest_iso):
    for line in lines:
        timestamp = line.split(None, 3)[-1]
        if from_cest_iso <= timestamp <= to_cest_iso:
            yield line

from_cest_iso = from_dt.astimezone(cest_tz).strftime('%Y-%m-%d %H:%M:%S')
to_cest_iso = to_dt.astimezone(cest_tz).strftime('%Y-%m-%d %H:%M:%S')
for line in filter_lines(sys.stdin, from_cest_iso, to_cest_iso):
    print(line)
Benutzeravatar
__blackjack__
User
Beiträge: 13100
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Man könnte hier vielleicht noch mit `chunksize` beim `Pool.imap()`-Aufruf spielen um den Aufwand für die Kommunikation zwischen den Prozessen und das geordnet weiterverarbeiten im Hauptprozess zu senken. Da, zumindest wenn man den Orginalcode anschaut, die Reihenfolge der Ergebniszeilen keine Rolle spielt, kann man sich den unnötigen „Ergebnissereihenfolge beachten“-Mehraufwand auch durch `Pool.imap_unordered()` ganz ersparen.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Benutzeravatar
Judge
User
Beiträge: 129
Registriert: Mittwoch 13. Juni 2012, 22:27
Wohnort: Ratingen
Kontaktdaten:

__blackjack__ hat geschrieben: Mittwoch 21. November 2018, 13:46 Man könnte hier vielleicht noch mit `chunksize` beim `Pool.imap()`-Aufruf spielen um den Aufwand für die Kommunikation zwischen den Prozessen und das geordnet weiterverarbeiten im Hauptprozess zu senken.
Das verstehe ich nicht - in wiefern senkt das den Aufwand?
__blackjack__ hat geschrieben: Mittwoch 21. November 2018, 13:46 Da, zumindest wenn man den Orginalcode anschaut, die Reihenfolge der Ergebniszeilen keine Rolle spielt, kann man sich den unnötigen „Ergebnissereihenfolge beachten“-Mehraufwand auch durch `Pool.imap_unordered()` ganz ersparen.
Das stimmt, die Reihenfolge ist nicht wichtig. Da der Aufwand zur Anpassung zwischen `Pool.imap()` und `Pool.imap_unordered()` ja gegen 0 geht, habe ich das mal mit beiden Varianten und gleicher Eingabedatei ausprobiert. Das Ergebnis ist exakt identisch zur `Pool.imap()` Variante. Daher bringt mich das nicht weiter, aber trotzdem vielen Dank für die Anregung!
Sirius3 hat geschrieben: Mittwoch 21. November 2018, 12:01 Das Problem ist keines, das man mit Multiprocessing beschleunigen könnte. Eine if-Abfrage ist um Größenordnungen schneller als das kopieren einer Zeile von einem Prozess zu einem anderen. Multiprocessing ist dann gut, wenn wenig Daten übertragen werden müssen und die Berechnung lange dauert.
Das bedeutet das man in diesem Fall von Multiprocessing garnicht profitieren kann? Deine Lösung sieht das ja scheinbar auch nicht vor.

Ich habe Deinen Lösungsansatz mal implementiert und gegen die Testdaten laufen lassen. Ich bin total baff vom Ergebnis: Das ist etwa 15 mal schneller als mein single-processing Ansatz!!
Phantastisch, vielen Dank für die Hilfe!
Benutzeravatar
__blackjack__
User
Beiträge: 13100
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Judge: Der Unterschied ist ob man jede Zeile einzeln zu einem Prozess übermittelt, die einzeln verarbeitet, und das Ergebnis einzeln wieder zum Hauptprozess übermittelt, oder ob man mehrere Zeilen übermittelt, die alle verarbeitet, bevor man dann einmal die Ergebnisse für mehrere Zeilen zurück kommuniziert. Das wird sicher keine 15-fache Beschleunigung bringen. :-)
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Antworten