Ergebnis im Multiprocessing

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
nunatak
User
Beiträge: 17
Registriert: Donnerstag 3. März 2011, 19:42

Hallo,

ich schreibe gerade eine Routine mit der ich sehr viele (ca. 100.000) XML Dateien verarbeiten muss. Im ersten Schritt, als Vorselektion will ich im Header nach einem bestimmten String schauen, da ich dadurch die tatsächlich per lxml zu verarbeitende Anzahl deutlich reduzieren kann. Dafür hatte ich gedacht multiprocessing zu nutzen. Leider gelingt es mir bisher nicht aus den Subprozessen irgendein Resultat heraus zu lesen. Ich habe gestern sehr viel auf StackOverflow und sonst im Netz nach Lösungen gesucht. Habe es aus multiprocessing mit Manager(), Queue() und Array() versucht, aber nie bekam ich das Resultat wieder eingesammelt.
Auch der Versuch bei positivem Ergebnis eine temporäre Dummydatei zu schreiben, die dann im Anschluss eingesammelt werden können gelang mir nicht. Obwohl dafür ja gar kein Variablenaustausch zwischen den Prozessen notwendig wäre. Wahrscheinlich steckt schon vorher ein grundsätzlicher Fehler drin, so dass die XMLs gar nicht erst wie erwartet gelesen werden.

Hier mal ein Codeschnipsel, den ich natürlich bereits vielfach modifiziert hatte:

Code: Alles auswählen

def processFile(file):
    xmlname = '{}.xml'.format(file[:-4])
    zip = zipfile.ZipFile('{}/{}'.format(pp, file))
    inputfile = zip.open(xmlname, 'r')
    inputfile = io.TextIOWrapper(inputfile, encoding='utf-8')
    for i in range(15):
        line = inputfile.readline()
        if '<nasoperation>NBA' in line:
            newfile = open(inputfile[:-4], 'w')
            newfile.close()
            

def multiFileLoop():
    start = time.time()
    p = Pool()
    for lauf, file in enumerate(files[:3000]):
        if lauf > 1 and lauf%100 == 0:
            print(lauf)
        p.apply_async(processFile, file)

    p.close()
    p.join()
    ende = time.time() - start
    print('Laufzeit: {} sek.'.format(ende))
    #print('{} x NBA Protokolle.'.format(counter))
multiFileLoop() ist im Prinzip die main() aus der dann die Subprozesse über Pool().apply_async() aufgerufen werden. Die Verarbeitung der XML findet dann in processFile() statt.
Zuletzt geändert von nunatak am Donnerstag 26. Januar 2017, 12:38, insgesamt 2-mal geändert.
nunatak
User
Beiträge: 17
Registriert: Donnerstag 3. März 2011, 19:42

Ah, danke BlackJack! Hatte mich schon gefragt, wieso das so unschön aussieht. ;)
BlackJack

@nunatak: Du musst halt entweder mit dem Ergebnis von dem `apply_async()` auch irgendwann mal etwas machen, also zum Beispiel das Ergebnis der Berechnung abfragen, oder eine Rückruffunktion übergeben, die dann mit dem Ergebnis aufgerufen wird wenn es fertig ist.

Sonstinge Anmerkungen: `file` ist der Name eines eingebauten Datentyps. Zudem passt er nicht weil Du gar keine Dateiobjekte daram bindest, sondern Datei*namen*.

`zip` ist der Name einer eingebauten Funktion.

Falls diese ominöse -4 beim slicen die Dateinamenserweiterung entfernen soll: dafür gibt es `os.path.splitext()`. Und Pfade sollte man mit `os.path.join()` zusammensetzen und nicht mit Zeichenkettenformatierung.

`inputfile` ist dann tatsächlich ein Dateiobjekt, und da frage ich mich was Du bei ``inputfile[:-4]`` da wohl als Ergebnis erwartest. :-)

`pp` und `files` entsprechen in der Schreibweise nicht Konstanten. Und `files` ist wahrscheinlich noch nicht einmal eine Konstante, sollte also nicht einfach so magisch von irgendwo her kommen. `files` ist ein unpassender Name aus dem gleichen Grund wie `file` in `processFile()`. `pp` sowieso, genau wie `p` weil zu kurz und uninformativ.

Die beiden Funktionsnamen sind IMHO auch zu generisch. `processFile()` (also eigentlich ja `process_file()`) würde vielleicht noch für eine Funktion passen die tatsächlich eine Datei verarbeitet, aber die hier soll ja nur testen ob die Datei ein bestimmtes Kriterium erfüllt.

`mulitFileLoop()` enthält einen Fipptehler und ist auch wenn man es zu `multi_file_loop()` korrigiert nicht so aussagekräftig. `loop` würde ich ausser bei Hauptschleifen in ereingnisgetriebenen Rahmenwerken auch eher nicht in einem Funktionsnamen erwarten, denn ob da nun eine Schleife drin steckt, oder das mit Funktionen implementiert ist in denen dann die Schleifen versteckt sind, ist ein Implemetierungsdetail das nichts im Namen verloren hat. Generischer Name wäre `process_files()` oder eben ein besserer Name der auf dem besseren Namen für `process_file()` basiert.

Unter einem `lauf` stellt man sich eher etwas vor bei dem eine ganze Sammlung von Dateien verarbeitet wird und nicht nur eine Einzige.

Ich würde ja so etwas wie `Pool.imap()` oder `Pool.imap_undordered()` verwenden, je nach dem ob die Reihenfolge der Ergebnisse wichtig sind.

Macht es denn überhaupt Sinn den Test von der tatsächlichen Verarbeitung zu trennen?
nunatak
User
Beiträge: 17
Registriert: Donnerstag 3. März 2011, 19:42

Hallo BlackJack,

hast natürlich Recht, da gibt es aus PEP8-Sicht vieles zu kritistieren und einige Dinge habe ich bisher echt nicht auf dem Schirm. Danke z.B. für den Hinweis zu os.path.splitext() und os.path.join(). Werde ich in Zukunft einsetzen.

Aber wie gesagt, es handelt sich um Codeschnippsel, die ich nur zum Testen des multiprocessing auf die Schnelle geschrieben habe und huntermal hin und her angepasst habe. file war auch nicht der eigentliche Übergabename, normalerweise verwendet ich das aus den von dir genannten Gründen nicht als Variable. Das Slicen von inputfile[:-4], da hast du Recht. Ist auch dem vorherigen mehrmaligen Editieren des Codes geschuldet. Es gab also durchaus schon einen Zustand in dem nicht das Dateiobjekt sondern der Dateiname gesliced wurde. ;)

p = Pool(), files ist die Liste aller Dateien. Dateinamen, nicht Dateiobjekte. und pp ist der Pfad:

Code: Alles auswählen

pp = 'H:/pfad/zu/den/xml/dateien'
files = [f for f in os.listdir(pp) if f.startswith('prot_') and f.endswith('.zip')]

Aber wie sammle ich konkret ein Ergebnis des Subprozesses ein? Die von dir genannten Möglichkeiten Pool.imap() und Pool.imap_unordered() schaue ich mir an.

In diesem ersten Versuch wäre als Rückgabe ausreichend ein True oder False für die entsprechende Datei. Später für die tatsächliche Verarbeitung würde ich dann aber eher ein Dict() als Rückgabe benötigen.

Ob das Trennen Sinn macht? Das weiß ich auch noch nicht. Daher der Timer. Ich wollte einfach mal verschiedene Alternativen ausprobieren und schauen wie sich das auf die Performance auswirkt.
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

Was passiert denn, wenn Du processFile direkt aufrufst?
So wie es jetzt dasteht, kommt mindestens ein TypeError, weil processFile nur ein Argument erwartet, ein Dateiname aber sicher aus mehr als einem Zeichen besteht. Statt eine Datei anzulegen, wäre es besser, einen Rückgabewert zu haben und mit imap zu arbeiten.
nunatak
User
Beiträge: 17
Registriert: Donnerstag 3. März 2011, 19:42

Ok, jetzt hab ich dir Lösung. Danke BlackJack mit imap() funktioniert es super. Und 42 Sek. für alle 94.000 Dateien im Vergleich zu 1322 Sek. bei Single-Processing kann sich doch sehr gut sehen lassen!

Code: Alles auswählen

def processFile(filename):
    xmlname = '{}.xml'.format(os.path.splitext(filename)[0])
    gezippt = zipfile.ZipFile('{}/{}'.format(pp, filename))
    inputfile = gezippt.open(xmlname, 'r')
    inputfile = io.TextIOWrapper(inputfile, encoding='utf-8')
    for num, line in enumerate(inputfile):
        if num == 15:
            return False
        elif '<nasoperation>NBA' in line:
            #print(line)
            return True
            

def multiFileLoop():
    start = time.time()
    p = Pool()
    result = p.imap(processFile, files)
    counter = 0
    for i in result:
        if i: counter += 1
    p.close()
    p.join()
    ende = time.time() - start
    print('Laufzeit: {} sek.'.format(ende))
    print('{} x NBA Protokolle.'.format(counter))
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

@nunatak: wenn Du nur die ersten 15 Zeilen willst, nimm islice. Namen werden klein_mit_unterstrich geschrieben. pp taucht plötzlich auf.

Code: Alles auswählen

from itertools import islice

def process_file(filename):
    xmlname = '{}.xml'.format(os.path.splitext(filename)[0])
    gezippt = zipfile.ZipFile('{}/{}'.format(pp, filename))
    inputfile = gezippt.open(xmlname, 'r')
    inputfile = io.TextIOWrapper(inputfile, encoding='utf-8')
    return any('<nasoperation>NBA' in line for line in islice(inputfile, 15))
 
def multi_file_loop():
    start = time.time()
    p = Pool()
    counter = sum(p.imap_unordered(processFile, files))
    p.close()
    p.join()
    ende = time.time() - start
    print('Laufzeit: {} sek.'.format(ende))
    print('{} x NBA Protokolle.'.format(counter))
BlackJack

Ich würde mir hier noch das schliessen und joinen des Pools sparen, denn der ist ja nur lokal und wir sind an der Stelle ja sicher das es nicht möglich ist, das noch jemand von wo anders eine Aufgabe hinzufügt und da wir alle Ergebnisse verarbeitet haben, braucht man auch nicht auf ausstehende Aufgaben warten.
nunatak
User
Beiträge: 17
Registriert: Donnerstag 3. März 2011, 19:42

Hab eure Vorschläge mal eingebaut. Mit dem islice sieht das auf jeden Fall deutlich eleganter aus. Allerdings ist die Laufzeit jetzt bei 172 sek. Zuvor waren es nur 42. Werde noch prüfen woran genau das liegt.

EDIT: Beim zweiten Mal war's wieder schneller. 43 sek.

Code: Alles auswählen

import io, os
import sys
import shutil
import zipfile
import time
from multiprocessing import Pool
from itertools import islice

verzeichnis = 'H:/Pfad/zum/File'
files = [f for f in os.listdir(verzeichnis) if f.startswith('prot_') and f.endswith('.zip')]

def process_file(filename):
    """
    Funktioniert: 42 Sekunden, Ergebnis stimmt!
    """
    xmlname = '{}.xml'.format(os.path.splitext(filename)[0])
    gezippt = zipfile.ZipFile('{}/{}'.format(verzeichnis, filename))
    inputfile = gezippt.open(xmlname, 'r')
    inputfile = io.TextIOWrapper(inputfile, encoding='utf-8')
    return any('<nasoperation>NBA' in line for line in islice(inputfile, 15))
            

def multifile():
    start = time.time()
    p = Pool()
    counter = sum(p.imap_unordered(process_file, files))
    #result = p.imap(processFile, files)
    #counter = 0
    #for i in result:
    #    if i: counter += 1
    ende = time.time() - start
    print('Laufzeit: {} sek.'.format(ende))
    print('{} x NBA Protokolle.'.format(counter))


def singleFileLoop():
    start = time.time()
    counter = 0
    for lauf, filename in enumerate(files):
        if lauf > 1 and lauf%10000 == 0:
            print(lauf)
        xmlname = '{}.xml'.format(filename[:-4])
        gezippt = zipfile.ZipFile('{}/{}'.format(verzeichnis, filename))
        inputfile = gezippt.open(xmlname, 'r')
        inputfile = io.TextIOWrapper(inputfile, encoding='utf-8')
        for i in range(15):
            line = inputfile.readline()
            if '<nasoperation>NBA' in line:
                counter += 1
    ende = time.time() - start
    print('Laufzeit: {} sek.'.format(ende))
    print('{} x NBA Protokolle.'.format(counter))


if __name__ == '__main__':
    multifile()
Benutzeravatar
pyHoax
User
Beiträge: 84
Registriert: Donnerstag 15. Dezember 2016, 19:17

42 Sek. für alle 94.000 Dateien im Vergleich zu 1322
Dann hast du eventuell mit Multiprozessing über das Ziel hinausgeschossen...

Besitzt du wirklich mehr als 30 Cores die dein Problem derart beschleunigen, oder ist es nicht eher so das du jetzt gleichzeitig auf die I/O vom Betriebsystem wartest ?

Wenn du Python3 verwendest könntest du die neuen 'async' Funktionalitäten verwenden und.. und Pythons Single-core-multithreathing würde vermutlich auch gut beschleunigen.
nunatak
User
Beiträge: 17
Registriert: Donnerstag 3. März 2011, 19:42

Ja, ich denke auch, dass diese Beschleunigung auch auf anderen Effekten beruht. Eigentlich sollte es max. 8x so schnell sein mit den 8 Cores. Mit "async" meinst du das asyncio Modul, das seit Python 3.4 in der Standardbibliothek ist? Habe auch schon gehört, dass es gut sein soll. Steht noch auf der Todo-Liste, damit "herumzuspielen".
Benutzeravatar
pyHoax
User
Beiträge: 84
Registriert: Donnerstag 15. Dezember 2016, 19:17

Steht noch auf der Todo-Liste, damit "herumzuspielen".
Das kenne ich,
Ich hänge derart auf einer veralteten Plattform fest, das ich nicht von Python 2.7 wegkomme (wir sind froh das der Code den Ugrade von 2.5 unbeschadet überlebt hat)

Das nächste Projekt wird in Python 3+ umgesetzt.. ich schwör ;)

asyncio Modul
Aye .. und das hier ist der neuste async synatx: https://www.python.org/dev/peps/pep-0530/
result = [await fun() for fun in funcs]
Die Liste hier wird asyncron befüllt.
Antworten