Mehrfach-Dateien (Duplikate) erkennen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Benutzeravatar
HarteWare
User
Beiträge: 69
Registriert: Samstag 23. Februar 2013, 21:16
Wohnort: localhost

Hallo zusammen,

ich habe versucht, in Python ein zuvor in C++ geschriebenes Programm zu entwickeln. Das Ergebnis ist folgendes:

Code: Alles auswählen

import filecmp
import os
import sys
from pathlib import Path
from timeit import default_timer as timer

def is_duplicate_file(file_a, file_b):
    return filecmp.cmp(file_a, file_b, shallow=False)

def recursive_directory_iterator(path):
    for entry in os.scandir(path):
        if entry.is_dir(follow_symlinks=False):
            yield from recursive_directory_iterator(entry);
        else:
            yield entry

def build_file_index(start_path):
    files_by_size = {}
    for entry in recursive_directory_iterator(start_path):
        if entry.is_file():
            files_by_size.setdefault(entry.stat().st_size, []).append(entry)
    return files_by_size
    
def find_duplicate_files(files):
    dupes = {}
    for size, paths in files.items():
        if len(paths) < 2:
            continue
        matched = [False] * len(paths)
        #print(f'Potential duplicates for size {size} bytes: {paths}')
        for idx_i, i in enumerate(paths):
            for idx_j, j in enumerate(paths[idx_i+1:], start=idx_i+1):
                #print(f'Checking [{idx_i}] = {i} and [{idx_j}] = {j}')
                if not matched[idx_i] and not matched[idx_j] and is_duplicate_file(i, j):
                    dupes.setdefault(i.path, []).append(j.path)
                    matched[idx_j] = True
    return dupes

def print_dupes(dupes):
    for a, b in dupes.items():
        print(f'\nReference: {a}')
        for dup in b:
            print(f'Duplicate: {dup}')
    print(f'\nFound a total of {len(dupes)} duplicate groups.\n')

def main():
    if len(sys.argv) != 2:
        print("Program expects one argument: <Starting path>")
        return 1
        
    start_path = Path(sys.argv[1])
    if not start_path.exists():
        print("Provided argument is not an existing path.")
        return 2
        
    start = timer()
    files = build_file_index(start_path)
    dupes = find_duplicate_files(files)
    print_dupes(dupes)
    stop = timer()

    print(f'\nDuration: {1000*(stop - start):.6f}ms')        
   
if __name__ == '__main__':
    main()
Mich würde interessieren, ob das gut ist, oder wie man es besser machen würde.
Ich habe bewusst auf fortgeschrittene Techniken wie Analyse der Dateiträgerart und Multithreading verzichtet.
Sirius3
User
Beiträge: 18274
Registriert: Sonntag 21. Oktober 2012, 17:20

Warum importierst Du ›pathlib.Path‹, benutzt aber trotzdem die low-level-Funktionen aus ›os‹?
›recursive_directory_iterator‹ ist einfach ›Path.rglob‹. Statt ›setdefault‹ würde man defaultdict verwenden. ›i‹ und ›j‹ sind zwei ganz schlechte Variablennamen für zwei Pfade. Statt der zwei for-Schleifen über den Index würde man ›itertools.combinations‹ verwenden.
Mit dem return-Wert von ›main‹ wird gar nichts gemacht.

Code: Alles auswählen

import sys
import filecmp
from collections import defaultdict
from itertools import combinations
from pathlib import Path
from time import perf_counter

def is_duplicate_file(file_a, file_b):
    return filecmp.cmp(file_a, file_b, shallow=False)

def build_file_index(start_path):
    files_by_size = defaultdict(list)
    for entry in start_path.rglob('*'):
        if entry.is_file():
            files_by_size[entry.stat().st_size].append(entry)
    return files_by_size
    
def find_duplicate_files(files):
    duplicates = defaultdict(list)
    for size, paths in files.items():
        found = set()
        for path_a, path_b in combinations(paths, 2):
            if path_a not in found:
                if is_duplicate_file(path_a, path_b):
                    duplicates[path_a].append(path_b)
                    found.add(path_b)
    return duplicates

def print_dupes(duplicates):
    for reference_path, paths in duplicates.items():
        print(f'\nReference: {reference_path}')
        for duplicate in paths:
            print(f'Duplicate: {duplicate}')
    print(f'\nFound a total of {len(duplicates)} duplicate groups.\n')

def main():
    if len(sys.argv) != 2:
        print("Program expects one argument: <Starting path>")
        return 1
        
    start_path = Path(sys.argv[1])
    if not start_path.exists():
        print("Provided argument is not an existing path.")
        return 2
        
    start = perf_counter()
    files = build_file_index(start_path)
    duplicates = find_duplicate_files(files)
    print_dupes(duplicates)
    stop = perf_counter()

    print(f'\nDuration: {1000*(stop - start):.6f}ms')
    return 0       
   
if __name__ == '__main__':
    sys.exit(main())
Benutzeravatar
HarteWare
User
Beiträge: 69
Registriert: Samstag 23. Februar 2013, 21:16
Wohnort: localhost

@Sirius3: Vielen Dank für die Rückmeldung.
So Sachen wie defaultdict hatte ich nicht mehr im Sinn, aber genau das brauche ich hier, danke.
Das set ist natürlich etwas eleganter, ich war leider gar nicht draufgekommen.
Das mit dem Returnwert hatte ich nicht so drüber nachgedacht, aber das ist natürlich auch ein wenig sauberer.
Variablenamen -> Da war ich einfach faul, stimmt schon^^
Das combinations-Zeug muss ich mir mal noch anschauen. In C++ gibt es "nur" `std::next_permutation` und mir fehlte eine saubere Lösung wie (was ich mir vorstellte) `std::next_pair`, es sieht so aus, als würde das `combinations(paths, 2)` aber genau das machen.
Bzgl. der "walk"-Fuktionalität, ich hatte Schwierigkeiten bei der Recherche (man findet etliche Varianten) und habe meine eigenen Benchmarks durchgeführt. In diesem Fall bin ich der Meinung, dass die Lösung mit os.scandir deutlich besser (schneller) ist, außer ich vergleiche hier Äpfel mit Birnen:

Code: Alles auswählen

from time import perf_counter
from pathlib import Path
import os
import sys

def test_rglob(path):
    files = []
    for entry in path.rglob('*'):
        if entry.is_file():
            files.append(Path(entry).absolute())
    return files
            
def recursive_directory_iterator(path):
    for entry in os.scandir(path):
        if entry.is_dir(follow_symlinks=False):
            yield from recursive_directory_iterator(entry);
        else:
            yield entry
    
def test_scandir(path):
    files = []
    for entry in recursive_directory_iterator(path):
        if entry.is_file():
            files.append(Path(entry).absolute())
    return files

def main():
    path = Path('D:/Programming')
    
    start = perf_counter()
    files_glob = test_rglob(path)
    print(f'{perf_counter()-start}s')
    print(f'Sanity check file count: {len(files_1)}')
    
    start = perf_counter()
    files_scandir = test_scandir(path)
    print(f'{perf_counter()-start}s')
    print(f'Sanity check file count: {len(files_2)}')
    
    set_scandir = set(files_scandir)
    set_glob = set(files_glob)
    
    print('Files found by scandir but not glob:')
    print(list(set_scandir - set_glob))
    
    print('Files found by glob but not scandir:')
    print(list(set_glob - set_scandir))

if __name__ == '__main__':
    main()
Außerdem ist mir duch eine Plausibilitätsprüfung aufgefallen, dass `glob` hier drei Dateien einfach "übersieht", aus einem mit unerklärlichen Grund:

Code: Alles auswählen

$ python3 bench.py
52.18136180000147s
Sanity check file count: 391611
8.468215000000782s
Sanity check file count: 391614
Files found by scandir but not glob:
[WindowsPath('D:/Programming/09_SourceCode/linux/drivers/gpu/drm/nouveau/nvkm/subdev/i2c/aux.c'), WindowsPath('D:/Programming/09_SourceCode/linux/include/soc/arc/aux.h'), WindowsPath('D:/Programming/09_SourceCode/linux/drivers/gpu/drm/nouveau/nvkm/subdev
/i2c/aux.h')]
Files found by glob but not scandir:
[]
Antworten