ich würde gerne Feedback haben ob man den unten stehenden Code so lassen kann ( funktionieren tut er soweit ).
Ich danke euch
Code: Alles auswählen
from pathlib import Path
from shutil import copy2
import configparser
from datetime import date
import logging
from glob import glob
from module import config
from datetime import datetime
import tarfile
def ordner_struktur(benoetigte_ordner, speicherort):
"""
Falls benötigte ordner nicht vorhanden sind werden diese erzeugt.
exist_ok -> Schaltet Exception aus
:param speicherort liste mit zur verfügung stehenden speicherpfaden
:param benoetigte_ordner liste mit namen
"""
for ordner_name in benoetigte_ordner:
ordner_pfad = speicherort.joinpath(ordner_name)
if not ordner_pfad.exists():
ordner_pfad.mkdir(parents=True, exist_ok=True)
def setup_logger(name, log_file, level=logging.INFO):
"""
Hier werden verschiedene logger vorbereitet.
:return:logger
"""
formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s', datefmt='%d/%m/%Y %I:%M:%S %p')
handler = logging.FileHandler(log_file)
handler.setFormatter(formatter)
logger = logging.getLogger(name)
logger.setLevel(level)
logger.addHandler(handler)
return logger
def configdatei_laden():
"""
DUMMY wird später eigene datei
ANMERKUNG:Man könnte das auslesen auch in die Config datei verlagern.
:return: Dictonary mit pfaden
"""
if not Path("config/config.ini").exists():
config.main()
config_einstellung = configparser.ConfigParser()
config_einstellung.sections()
config_einstellung.read("config/config.ini")
zu_speichernde_dateien_pfad = Path(config_einstellung['PATH']["fertigdaten_pfad"])
medien_pfade = [Path(config_einstellung['PATH']["nas_pfad"]),
Path(config_einstellung['PATH']["externefestplatte_pfad"])]
muster_fuer_dateien = {"muster_datei_jahr": config_einstellung["DATEI_MUSTER"]["muster_jahres_dateien"],
"muster_datei_monat": config_einstellung["DATEI_MUSTER"]["muster_monats_dateien"]}
backup_tag = datetime.strptime((config_einstellung['BACKUP_TAG']["backup_tag"]), "%Y-%m-%d").date()
return zu_speichernde_dateien_pfad, medien_pfade, muster_fuer_dateien, backup_tag
def speichermediumpfade_pruefen(medien_pfade):
"""
Es wird geguckt ob jeder Pfad in dem Dictonary existiert,sollte kein Pfad exestieren wird ein Fehler geworfen
:param medien_pfade: dictonary mit pfaden Beispiel{WindowsPath('C:/Users/Marcel/Desktop/Nas')
"""
# info_logger.info(f"start:{speichermediumpfade_pruefen.__name__}")
vorhandene_pfade = []
for pfad in medien_pfade:
if pfad.exists():
vorhandene_pfade.append(pfad)
if vorhandene_pfade:
# info_logger.info(f"{speichermediumpfade_pruefen.__name__},pass")
return vorhandene_pfade
else:
# info_logger.info(f"{speichermediumpfade_pruefen.__name__},faild")
# critical_logger.critical("Keine der angegebenen Pfade existiert")
raise FileExistsError
def dateien_ermitteln(such_pfad, muster_fuer_dateien):
"""
:param such_pfad:
:param muster_fuer_dateien:
:return:
"""
gefundene_dateien = [path.name for path in such_pfad.glob(muster_fuer_dateien)]
return gefundene_dateien
def vorhandene_dateien_auf_medium_speichern(zu_speichernde_dateien_pfad, medien_pfade, zu_speichernde_dateien_jahre,
zu_speichernde_dateien_monate):
"""
:param zu_speichernde_dateien_pfad:
:param medien_pfade:
:param zu_speichernde_dateien_jahre:
:param zu_speichernde_dateien_monate:
:return:
"""
for pfad in medien_pfade:
dateien_kopieren(source_path=zu_speichernde_dateien_pfad, destination_path=pfad,
existing_files=zu_speichernde_dateien_jahre)
dateien_kopieren(source_path=zu_speichernde_dateien_pfad, destination_path=pfad,
existing_files=zu_speichernde_dateien_monate)
def neuen_pfad_erzeugen(datei_namen, directory_path):
"""
Es wird die länge des übergebenen Datei_namens geprüft sollte der namen zum Beispiel Fertigdaten_2018 sein wird
dieser in 2 teile gespilltet somit weiß man das es sich hierbei um eine Jahresdatei handelt,im umkehrschluss wissen
wir das es sonst eine Monatsdatei ist abhängig um was für eine Datei es sich handelt extrahieren wir das Jahr aus
dem Dateinamen und legen einen Unterordner an in das jeweilige Verzeichnis und geben am Ende einen neu
zusammengesetzen Pfad zurück
:param datei_namen:Fertigdaten_2018_07.wrt
:param directory_path: C:\\Users\\Marcel\\Desktop\\Nas
:return: Pathobjekt
"""
'Fertigdaten_2020.wrt'
if len(datei_namen.split("_")) == 2:
directory_path = Path(directory_path / "Jahre" / datei_namen.split("_")[1])
directory_path = directory_path.with_suffix('')
if not directory_path.exists():
directory_path.mkdir(parents=True, exist_ok=True)
return directory_path.joinpath(datei_namen)
else:
directory_path = Path(directory_path / "Monate" / datei_namen.split("_")[1])
directory_path = directory_path.with_suffix('')
if not directory_path.exists():
directory_path.mkdir(parents=True, exist_ok=True)
return directory_path.joinpath(datei_namen)
def dateien_kopieren(source_path, destination_path, existing_files):
"""
FUNKTIONSERKLÄRUNG:
Wir bekommen eine Liste mit den existierenden Dateien übergeben,(sollten keine vorhanden sein ist die Liste leer),
sollte die Liste leer sein wird die kopier Funktion direkt beendet.Sollten Dateien vorhanden sein iterieren wir
einzelnd über diese Dateien , wir erzeugen einen neuen destination pfad sowie einen neuen source pfad diese
setzen sich aus dem alten pfade "basispfad" + die bezeichnung der datei zusammen lediglich bei dem neuendestination
pfad wird ebenfalls eine ordner struktur gleich mit angelegt.Sollte der neue Destination pfad nicht existieren(also
die Datei gibt es auf dem Medium noch nicht) und es sich bei dem neuen Source Path um eine Datei handeln wird der
kopiervorgang gestartet , es ist eine kleine Pause zum puffern eingebaut.
:param source_path: Pathobjekt mit dem pfad zur Verzeichnis wo die Dateien vorhanden sind,
C:\\Users\\Marcel\\Desktop\\Fertigdaten
:param destination_path: Pathobjekt mit dem pfad zur Verzeichnis wo die Dateien fehlen,
C:\\Users\\Marcel\\Desktop\\Nas
:param existing_files: Liste mit den fehlenden Dateinamen,Fertigdaten_2018_07.wrt
:return:
"""
if existing_files: # geht nur rein wenn nicht leer
for file in existing_files:
# dest
neuer_destination_path = neuen_pfad_erzeugen(file, destination_path)
# search
neuer_source_path = source_path / file
if not neuer_source_path.exists():
neuer_source_path = neuen_pfad_erzeugen(file, source_path)
# datei vorhanden und gleich
if neuer_destination_path.exists() and (neuer_destination_path.stat().st_size
== neuer_source_path.stat().st_size):
continue
# nicht vorhanden -> kopieren
elif not neuer_destination_path.exists() and neuer_source_path.is_file():
copy2(src=neuer_source_path, dst=neuer_destination_path)
# vorhanden aber ungleiche datei
elif neuer_destination_path.exists() and (neuer_destination_path.stat().st_size
!= neuer_source_path.stat().st_size):
copy2(src=neuer_source_path, dst=neuer_destination_path)
def speichermedien_synchronisieren(medien_pfade, muster_fuer_dateien):
"""
:param medien_pfade:[WindowsPath('C:/Users/Marcel/Desktop/Nas'), WindowsPath('C:/Users/Marcel/Desktop/Externefestplatte')]
:return:
"""
if medien_pfade[0].exists() and medien_pfade[1].exists():
files_medium_a_jahre = [file.name for file in
medien_pfade[0].glob("**/" + muster_fuer_dateien.get("muster_datei_jahr"))]
files_medium_a_monate = [file.name for file in
medien_pfade[0].glob("**/" + muster_fuer_dateien.get("muster_datei_monat"))]
files_medium_b_jahre = [file.name for file in
medien_pfade[1].glob("**/" + muster_fuer_dateien.get("muster_datei_jahr"))]
files_medium_b_monate = [file.name for file in
medien_pfade[1].glob("**/" + muster_fuer_dateien.get("muster_datei_monat"))]
# prints the missing elements in medium_b
fehlene_dateien_b_jahre = list((set(files_medium_a_jahre).difference(files_medium_b_jahre)))
if fehlene_dateien_b_jahre:
dateien_kopieren(source_path=medien_pfade[0],
destination_path=medien_pfade[1],
existing_files=fehlene_dateien_b_jahre)
fehlene_dateien_b_monate = list((set(files_medium_a_monate).difference(files_medium_b_monate)))
if fehlene_dateien_b_jahre:
dateien_kopieren(source_path=medien_pfade[0],
destination_path=medien_pfade[1],
existing_files=fehlene_dateien_b_monate)
# prints the missing elements in medium_a
fehlene_dateien_a_jahre = list((set(files_medium_b_jahre).difference(files_medium_a_jahre)))
if fehlene_dateien_a_jahre:
dateien_kopieren(source_path=medien_pfade[1],
destination_path=medien_pfade[0],
existing_files=fehlene_dateien_b_jahre)
fehlene_dateien_a_monate = list((set(files_medium_b_monate).difference(files_medium_a_monate)))
if fehlene_dateien_a_jahre:
dateien_kopieren(source_path=medien_pfade[1],
destination_path=medien_pfade[0],
existing_files=fehlene_dateien_a_monate)
def nas_backup_einmal_im_jahr(backup_tag, medien_pfade, muster_fuer_dateien):
"""
--------------------------------------------------------------------------------------------------------------------
Hinweise für mich:
#alternative modi zum packen
'w:gz'
# Open for gzip compressed writing.(best speed/filesize ratio)
# 'w:bz2'
# Open for bzip2 compressed writing.(best compression ratio)
# 'w:xz'
# Open for lzma compressed writing.
"""
# if date.today() == backup_tag:
ordner_struktur(["Backup", "Archiv"],
medien_pfade[0])
archiv_pfad = Path(medien_pfade[0].joinpath("Archiv","Archiv " + str(backup_tag) + ".tar.gz"))
with tarfile.open(archiv_pfad, "w:gz") as tar:
for name in [medien_pfade[0]/"Jahre", medien_pfade[0]/"Monate"]:
tar.add(name)
def main():
ordner_struktur(benoetigte_ordner=["logs", "config", "module"], speicherort=Path(__file__).parents[0])
# first file logger
info_logger = setup_logger('info_logger', 'logs/info.log')
info_logger.info(f"-------------------------------Neuer Eintrag-----------------------------")
# second file logger
critical_logger = setup_logger('criticallogger', 'logs/critical.log')
critical_logger.critical(f"-------------------------------Neuer Eintrag-----------------------------")
# ausgelesene paramter der config datei
zu_speichernde_dateien_pfad, medien_pfade, muster_fuer_dateien, backup_tag = configdatei_laden()
# ------------------------------------------------------------------------------------------------------------------
# der Teil wird ausgeführt wenn mindestens 1 medienpfad vorhanden ist.
medien_pfade = speichermediumpfade_pruefen(medien_pfade)
zu_speichernde_dateien_jahre = dateien_ermitteln(zu_speichernde_dateien_pfad,
muster_fuer_dateien.get("muster_datei_jahr"))
zu_speichernde_dateien_monate = dateien_ermitteln(zu_speichernde_dateien_pfad,
muster_fuer_dateien.get("muster_datei_monat"))
vorhandene_dateien_auf_medium_speichern(zu_speichernde_dateien_pfad,
medien_pfade,
zu_speichernde_dateien_jahre,
zu_speichernde_dateien_monate)
# ------------------------------------------------------------------------------------------------------------------
# der Teil wird nur ausgeführt wenn mindestens 2 medienpfade vorhanden sind.
speichermedien_synchronisieren(medien_pfade, muster_fuer_dateien)
# ------------------------------------------------------------------------------------------------------------------
# der Teil wird nur am ende vom Jahr ausgeführt.
nas_backup_einmal_im_jahr(backup_tag, medien_pfade, muster_fuer_dateien)
if __name__ == "__main__":
main()