Durchsuchen von Verzeichnisbäumen (os.walk Alternative)

Code-Stücke können hier veröffentlicht werden.
Antworten
Tom_H
User
Beiträge: 13
Registriert: Sonntag 3. Juli 2011, 10:12

Moin moin,

ich mache mittlerweile viele Datei-Sortier-, Aufräum- und Archivier-Arbeiten mit kleinen Python-Progrämmchen.
Dabei braucht man immer wieder Listen der Dateien und zughörige Metadaten. Mit os.walk und darauffolgendem os.stat kann das recht zeitaufwändig werden, wenn man mehrere Verzeichnisse mit vielen Dateien abgrasen muß.
Hier ist mal der erste Versuch das ein wenig zu beschleunigen:

Code: Alles auswählen

# -*- coding: utf-8 -*-
import os, stat, Queue, threading

def get_direntries(pathlist):
    '''scan [pathlist] and return all file-entries as a queue of tuples'''
    class PushUrls(threading.Thread):
        '''non-recursive walker'''
        def __init__(self, pathque, tuplque):
            threading.Thread.__init__(self)
            self.pathque = pathque
            self.tuplque = tuplque
        def run(self):
            while True:
                pathstr = self.pathque.get() # pop path from fifo
                try:
                    dirlist = os.listdir(pathstr)
                except os.error:
                    dirlist = []
                for direntry in dirlist:
                    urlstr = os.path.join(pathstr, direntry)
                    try:
                        stdict = os.stat(urlstr)
                        if not stat.S_ISLNK(stdict.st_mode): # don't follow links
                            if stat.S_ISDIR(stdict.st_mode): # if entry = subdir
                                self.pathque.put(urlstr)     # push subdir
                            else:
                                self.tuplque.put((urlstr, stdict))
                    except os.error:
                        self.tuplque.put((urlstr, None)) # onerror: stat = None
                self.pathque.task_done()

    def max_threads(jobs):
        try:
            from multiprocessing import cpu_count
            cores = cpu_count()
        except ImportError:
            cores = 2 # default: assume dual-core
        return min([jobs * 2, cores * 2])

    pathque, tuplque = Queue.Queue(), Queue.Queue()
    for threadcount in range(max_threads(len(pathlist))):
        scanins = PushUrls(pathque, tuplque)
        scanins.setDaemon(True)
        scanins.start()   # start scanner-thread(s)
    for path in pathlist:
        pathque.put(path) # push path(s)
    pathque.join()        # block until fifo == empty
    return tuplque        # queue of (urlstring, os.stat)-tuples

#sample-call
print get_direntries(['C:\\', 'D:\\']).qsize()
Da ich Python-Newbie bin, wollte ich dieses Routinchen mal zum "Korrekturlesen" hier posten.
Ich stamme aus der Computer-Steinzeit und bin mit Fortran-77 und Pascal / Modula2 aufgewachsen. Die ersten CP/M-Rechner habe ich noch selbst gelötet.
Leider hab' ich in den letzten zwei Jahrzehnten nicht viel dazugelernt - ich bitte daher allfällige "un-Pythonische" Konstrukte zu entschuldigen und ggf, zu korrigieren.

mfg, Tom
BlackJack

@Tom_H: Also ich würde auf jeden Fall mal überprüfen ob das tatsächlich etwas bringt, denn Threads laufen bei CPython nicht wirklich parallel und wenn man tatsächlich parallel auf verschiedene Bereiche einer Festplatte zugreift, kann es sogar deutlich langsamer als schneller werden.

Klassen innerhalb von Funktionen definieren ist IMHO kein guter Stil, wenn man das nicht wirklich benötigt. Bei jedem Aufruf der Funktion wird ein neues Klassenobjekt erstellt, aber die sind ja alle gleich, also ist das unnötig.

Abkürzungen und Datentypen in Namen sollte man vermeiden. Datentypen in Namen machen es schwieriger den tatsächlichen Typ zu ändern, weil man dann entweder überall den Namen ändern muss, oder aber einen irreführenden Namen im Programm hat. Und so etwas wie `tuplque` sagt überhaupt nichts darüber aus was die Bedeutung des Objekts ist, welches sich dahinter verbirgt. *Das* ist doch aber die Information die ein Leser braucht um ein Programm zu verstehen.
Tom_H
User
Beiträge: 13
Registriert: Sonntag 3. Juli 2011, 10:12

BlackJack hat geschrieben:@Tom_H: Also ich würde auf jeden Fall mal überprüfen ob das tatsächlich etwas bringt, denn Threads laufen bei CPython nicht wirklich parallel und wenn man tatsächlich parallel auf verschiedene Bereiche einer Festplatte zugreift, kann es sogar deutlich langsamer als schneller werden.
in meinem Fall scheint's doch ein bisschen etwas zu bringen:

Code: Alles auswählen

>>> import os, time, ThreadedScanner
>>> def the_old_way(pathlist):
    fileentries = []
    for path in pathlist:
        for actualpath, dirlist, namelist in os.walk(path):
            for filename in namelist:
                fileurl = os.path.join(actualpath, filename)
                try:
                    filestat = os.stat(fileurl)
                    fileentries.append((fileurl, filestat))
                except os.error:
                    fileentries.append((fileurl, None))
    return fileentries

>>> def showtime(message, start, count):
    print message, count, 'elements, ', int(time.time() - start), 'sec'

>>> def test():
    pathlist = ['P:\\', '\\\BKUPNAS\\Public\\Daten']
    start = time.time()
    showtime('threads', start, ThreadedScanner.get_direntries(pathlist).qsize())
    start = time.time()
    showtime('classic', start, len(the_old_way(pathlist)))

>>> test()
threads 587463 elements,  206 sec
classic 587463 elements,  1535 sec
>>> 
Verschiedene Tests (mit verschiedenen Platten, Storage-Systemen, SMB-Freigaben und unterschiedlicher Thread-Anzahl) zeigen, daß das Optimum bei zwei bis vier Threads pro Laufwerk liegt - zumindestscheint das bei Python 2.7.1 unter Win-64 so zu sein. Obiges Beispiel ist übrigens genau der Auslöser für diesen Beschleunigungsversuch. Das ist ein Datei-basiertes Backup auf ein NAS, bei dem ich beim Kopieren gleich noch ein paar Korrekturen in den ACLs und beim Zeitstempel erledige.
Klassen innerhalb von Funktionen definieren ist IMHO kein guter Stil, wenn man das nicht wirklich benötigt. Bei jedem Aufruf der Funktion wird ein neues Klassenobjekt erstellt, aber die sind ja alle gleich, also ist das unnötig.

Abkürzungen und Datentypen in Namen sollte man vermeiden. Datentypen in Namen machen es schwieriger den tatsächlichen Typ zu ändern, weil man dann entweder überall den Namen ändern muss, oder aber einen irreführenden Namen im Programm hat. Und so etwas wie `tuplque` sagt überhaupt nichts darüber aus was die Bedeutung des Objekts ist, welches sich dahinter verbirgt. *Das* ist doch aber die Information die ein Leser braucht um ein Programm zu verstehen.
Danke für die Anmerkungen - ich hab's mal "verbessert" (ich lerne ja noch)...

Code: Alles auswählen

import os, stat, Queue, threading

class DirectoryScanner(threading.Thread):
    '''non-recursive walker'''
    def __init__(self, pathfifo, namefifo):
        threading.Thread.__init__(self)
        self.pathfifo = pathfifo
        self.namefifo = namefifo
    def run(self):
        while True:
            path = self.pathfifo.get() # pop path from fifo
            try:
                dirlist = os.listdir(path)
            except os.error:
                dirlist = []
            for filename in dirlist:
                fileurl = os.path.join(path, filename)
                try:
                    filestat = os.stat(fileurl)
                    if not stat.S_ISLNK(filestat.st_mode): # don't follow links
                        if stat.S_ISDIR(filestat.st_mode): # if entry = subdir
                            self.pathfifo.put(fileurl)     # push subdir
                        else:
                            self.namefifo.put((fileurl, filestat))
                except os.error:
                    self.namefifo.put((fileurl, None)) # onerror: stat = None
            self.pathfifo.task_done()

def get_direntries(pathlist):
    '''scan [pathlist] and return all file-entries as a queue of tuples'''
    def maxthreads(jobs):
        try:
            from multiprocessing import cpu_count
            cores = cpu_count()
        except ImportError:
            cores = 2 # default: assume dual-core
        return min([jobs * 2, cores * 2])

    pathfifo, namefifo = Queue.Queue(), Queue.Queue()
    for threadcount in range(maxthreads(len(pathlist))):
        scanner = DirectoryScanner(pathfifo, namefifo)
        scanner.setDaemon(True)
        scanner.start()    # start scanner-thread(s)
    for path in pathlist:
        pathfifo.put(path) # push path(s)
    pathfifo.join()        # block until fifo == empty
    return namefifo        # queue of (urlstring, os.stat)-tuples
Ich hoffe, das ist besser so?

TNX, Tom
Antworten