Code-Durchsicht

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Benutzeravatar
Dennis89
User
Beiträge: 1555
Registriert: Freitag 11. Dezember 2020, 15:13

Guten Morgen zusammen,

wie gestern angekündigt würde ich euch gerne mein kleines Projekt zeigen. Falls jemand sich die Zeit vertreiben will, würde ich mich über Kritik am Code freuen.
Es geht dabei um folgendes. Ein Raspberry Pi ist an einen Monitor angeschlossen und hat über die GPIO's einen RFID Reader angeschlossen. Damit möchte ich in meiner Werkstatt die Arbeitszeit pro Auftrag dokumentieren. Ich habe in der Werkstatt kein Internet, deswegen zähle ich einfach die Sekunden und berechne daraus die Stunden und Minuten. Pro Auftrag wird eine *.csv-Datei auf einem USB-Stick angelegt und darin werden die Zeiten dokumentiert.
Das machen die folgenden zwei Codes:

Code: Alles auswählen

#!/usr/bin/env python3

from RPi import GPIO
import tkinter as tk
from mfrc522 import SimpleMFRC522
from TimeRecording import TimeRecording
from pathlib import Path, PurePosixPath
import csv


READER = SimpleMFRC522()
ID = 1070737171454
ORDER_PATH = Path('/media/pi/')
ORDERS = ['Auftrag auswählen']


class App(tk.Frame):
    def __init__(self, master):
        tk.Frame.__init__(self, master)
        self.status = None
        self.check_time = TimeRecording()
        self.menu_choice = tk.StringVar(self)
        self.orders = self.search_orders(ORDERS)
        self.menu_choice.set(self.orders[0])
        self.order_menu = tk.OptionMenu(self,  self.menu_choice, *self.orders)
        self.order_menu.grid(row=0, column=0)
        self.menu_choice.trace('w', self.read_menu)
        self.information = tk.Label(self, text='Hey, mit was gehts los?')
        self.information.grid(row=0, column=1, rowspan=2)
        tk.Button(self, text='Kommen', command=self.start_time_recording).grid(row=2, column=0)
        tk.Button(self, text='Gehen', command=self.stop_time_recording).grid(row=2, column=1)
        tk.Button(self, text='Neuer Auftrag erstellen', command=self.creat_order).grid(row=2, column=2)
        self.input_ordernumber = tk.Entry(self)
        self.input_ordernumber.grid(row=2, column=3)
        self.change_year = tk.Entry(self)
        self.change_year.grid(row=0, column=3)
        self.change_year.insert(0, '21')


    def search_orders(self, orders):
        try:
            for folder in ORDER_PATH.iterdir():
                order_path = Path(f'{folder})
            for order in order_path.iterdir():
                orders.append(PurePosixPath(order).stem)
        except Exception as e:
            logging.basicConfig(filename='TimeClock.log')
            logging.error(e)
            orders = ['USB-Stick fehlt']
        return orders
    
    def read_menu(self, *args):
        self.information.configure(text=self.menu_choice.get())
        
    def creat_order(self):
        year = self.change_year.get()
        new_ordernumber = self.input_ordernumber.get()
        if new_ordernumber == '':
            self.information.configure(text='Neue Auftragsnummer eingeben')
        elif f'VG-{new_ordernumber}' in self.orders:
            self.information.configure(text='Sorry, Auftragsnummer ist schon vorhanden')
        else:
            new_order_name = self.input_ordernumber.get()
            with open(f'{ORDER_PATH}/VG-{year}{new_order_name}.csv', 'w', newline='') as csvfile:
                csvwriter = csv.writer(csvfile, delimiter=',', quoting=csv.QUOTE_MINIMAL)
                csvwriter.writerow(['hour, minute'])
            self.order_menu['menu'].delete(0, 'end')
            new_order = f'VG-{year}{new_order_name}'
            self.orders.append(new_order)
            self.order_menu['menu'].delete(0, 'end')
            for order in self.orders:
                self.order_menu['menu'].add_command(label=order, command=tk._setit(self.menu_choice, order))
        self.information.configure(text=f'Auftrag {new_order} erstellt.')
    
    def check_id(self):
        id, text = READER.read()
        if id == ID:
            return True
        else:
            self.information.configure(text='Falscher Chip')
            return False
            
    def start_time_recording(self):
        if self.menu_choice.get() == 'Auftrag auswählen' or self.menu_choice.get() == 'USB-Stick fehlt':
            self.information.configure(text='Erst einen Auftrag auswäheln')
        else:
            self.check_time = TimeRecording()
            if self.check_id() == True:        
                self.check_time.start_time()
                self.status = 1
                self.information.configure(text='Hey Dennis, schaffs gut!')
                                
    def stop_time_recording(self):
        if self.check_id() == True:
            if self.status == None:
                pass
            else:
                self.check_time.total_time()
                hour, minute = self.check_time.convert_time()
                self.information.configure(text=f'Anwesend: {hour} Stunden und {minute} Minuten \n Ciao!')
                with open(f'{ORDER_PATH}/{self.menu_choice.get()}.csv', 'a', newline='') as csvfile:
                    csvwriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
                    csvwriter.writerow([hour, minute])
                self.status = None
                
                
def main():
    try:
        root = tk.Tk()
        root.title('Zeiterfassung')
        app = App(root)
        app.pack()
        app.mainloop()
    finally:
        GPIO.cleanup()


if __name__ == "__main__":
    main()
und TimeRecording.py :

Code: Alles auswählen

#!/usr/bin/env python3

from time import time


class TimeRecording():       
    def start_time(self):
        self.start = time()
        
    def total_time(self):
        self.end = time()
        self.total_time = (self.end - self.start) / 3600
        
    def convert_time(self):
        self.total_time = str(round(self.total_time, 2)).split('.')
        minute = round(float(self.total_time[1]) * 0.6)
        hour = self.total_time[0]
        return hour, minute
Wenn ich mit einem Auftrag fertig bin, nehme ich den USB-Stick und stecke ihn Daheim in einen Pi-zero. Dieser erkennt das ein USB-Stick hinzugefügt wurde, liest die Dateien aus und fügt die Arbeitszeiten einer Excel-Datei hinzu. Die Excel-Datei liegt auf unseren privaten Cloud (nicht im öffentlichen Netz erreichbar) und von dort aus, kann meine Freundin mit den Stunden weiter arbeiten. Da der Pi-zero kein Display hat, habe ich eine LED angeschlossen, die mir signalisieren soll, wann ich den Stick wieder abziehen kann.
Zum Code: @Sirius3 hat mir gesagt bei subprocess.run benutzt man kein 'shell=True' gilt das allgemein? Wenn ich das weglasse bekomme ich eine Meldung, dass der Befehl nicht ausgeführt werden kann. (Sorry die genau Fehlermeldung habe ich gerade nicht mehr im Kopf, kann ich aber später nochmal reproduzieren und nachreichen) Mit 'shell=True' funktioniert es.
Der Code an sich funktioniert auch. Zur Zeit starte ich ihn noch per SSH und erhalte diese Ausgabe:

Code: Alles auswählen

umount: /media/USB: not mounted.
obwohl ich keinen 'print'-Befehl im Code habe. Die Kopiervorgänge funktionieren aber, also der Stick war wohl gemountet.
Eigentlich steuert man ein Python-Programm in der 'main'-Funktion, das habe ich hier nicht hinbekommen, wegen der ständigen Kontrolle, ob ein Stick an- oder abgesteckt wird.

Code: Alles auswählen

#!/usr/bin/env python3

import pyudev
import csv
import logging
from subprocess import run
from pathlib import Path
from EditTimes import EditTimes
from gpiozero import LED

MOUNT_PATH = Path('/media/USB/')
LED = LED(7)


def device_action():
    context = pyudev.Context()
    monitor = pyudev.Monitor.from_netlink(context)
    for device in iter(monitor.poll, None):
        if 'ID_FS_TYPE' in device:
            if '{0}'.format(device.action) == 'add':
                new_device = '{0}'.format(device.get('ID_FS_UUID'))
                mount_device(new_device)
            if '{0}'.format(device.action) == 'remove':
                LED.off()


def mount_device(new_device):
        context = pyudev.Context()
    monitor = pyudev.Monitor.from_netlink(context)
    for device in iter(monitor.poll, None):
        if 'ID_FS_TYPE' in device:
            if '{0}'.format(device.action) == 'add':
                new_device = '{0}'.format(device.get('ID_FS_UUID'))
                mount_device(new_device)
            if '{0}'.format(device.action) == 'remove':
                LED.off()


def mount_device(new_device):
    try:
        run([f'mount -t ntfs -o umask=007,gid=046 UUID={new_device} {MOUNT_PATH}'], check=True, shell=True)
    except Exception as e:
        logging.basicConfig(filename='CheckDevice.log')
        logging.error(e)
    start_edit = EditTimes()
    csv_files = start_edit.search_files()
    for csv_file in csv_files:
        total_minutes = start_edit.read_time(csv_file)
        working_time = start_edit.calculate_minutes(total_minutes)
        start_edit.write_in_excel(csv_file.name, working_time)
    try:
        run([f'umount UUID={new_device} {MOUNT_PATH}'], check=True, shell=True)
    except Exception as e:
        logging.basicConfig(filename='CheckDevice.log')
        logging.error(e)
    LED.on()


def main():
    device_action()


if __name__ == '__main__':
    main()
Die EditTimes.py:

Code: Alles auswählen

#!/usr/bin/env python3

import csv
from openpyxl import load_workbook
from pathlib import Path


CSV_PATH = Path('/media/USB/')
EXCEL_PATH = Path('/media/Dennis/Arbeitszeiten/Arbeitszeit.xlsx')

class EditTimes():
    def read_time(self, csv_file):
        with open(csv_file, newline='') as csvfile:
            csvreader = csv.DictReader(csvfile)
            total_minutes = sum(
                int(row['hour']) * 60 + int(row['minute'])
                for row in csvreader)
            return total_minutes


    def calculate_minutes(self, minutes):
        return divmod(minutes, 60)


    def search_files(self):
        files = []
        for file in CSV_PATH.iterdir():
            files.append(file)
        return files


    def write_in_excel(self, csv_file, working_time):
        order = Path(csv_file).stem
        working_time = f'{working_time[0]}:{working_time[1]}'
        book = load_workbook(EXCEL_PATH)
        sheet = book['2021']
        for index, row in enumerate(sheet.values, 1):
            if row[0] == order:
                if row[1] == working_time and row[2] == 'X':
                    (CSV_PATH / csv_file).unlink()
                else:
                    sheet[f'B{index}'] = working_time
                break
        else:
            sheet.append([order, working_time])
        book.save(EXCEL_PATH)


    def delete_csv(self, csv_file):
        run([f'rm {CSV_PATH}/{csv_file}'], shell=True, check=True)
Ich bin gespannt, was ihr davon haltet.

Vielen Dank und Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Sirius3
User
Beiträge: 18274
Registriert: Sonntag 21. Oktober 2012, 17:20

Ich geh einfach mal von oben nach unten.
erste Datei:
Zeile 11: READER ist keine Konstante, sollte also in main angelegt werden und dann durchgereicht werden.
Zeile 43: folder ist bereits an Path-Objekt, das erst in einen String und dann wieder in einen Path zu verwandeln ist unsinn.
Zeile 44: ist die Einrückung so gewollt? Die Reihenfolge der Pathes ist nicht fix, also der letzte order_path ist nicht immer der letzte.
Zeile 45: und auch hier wieder, order ist bereits ein Path.
Vielleicht willst Du auch gar nicht iterdir verwenden, sondern glob('*/*')?

Code: Alles auswählen

            for order in ORDER_PATH.glob('*/*'):
                orders.append(order.stem)
Später schreibst Du aber Dateien nur in ORDER_PATH, das Suchen nach Dateien ist also inkonsistent.
Zeile 55: Schreibfehler.
Zeile 60 und 64: einmal wird die Datei als VG-ordernumber und einmal als VG-year&ordernumber gebildet. Das ist ein Fehler.
Das liegt wohl auch daran, dass Du einmal das Eingabefeld input_ordernumber an new_ordernumber und einmal an new_order_name bindest.
Zeile 64/101: Pfade setzt man nicht mit Stringformatierung zusammen, dafür gibt es ja Path-Objekte!
Zeile 68: nächster Fehler ist, dass Du diesen VG-String drei mal erzeugst, statt einmal und dann in einer Variable zu speichern.
Zeile 67 und 70: zweimal löschen?
Zeile 84: wie sollen in menu_choice die Werte 'Auftrag auswählen' oder 'USB-Stick fehlt' kommen? Das sind doch Ordernummern?
Zeile 85: Schreibfehler
Zeile 88/94: man prüft nicht explizit auf True
Zeile 90: Es macht keinen Sinn eine Variable status die Werte None oder 1 annehmen zu lassen. Das ist ein Boolean mit True und False!
Zeile 96: Wenn in einem Block nur pass steht, willst Du eigentlich die Bedingung negieren.
Zeile 98: dass die Funktion total_time die Zeit stoppt ist verwirrend, stop_time wäre ein besserer Name.
Zeile 102: Du benutzt ein ungewöhnliches Quote-Zeichen. Warum???


Nächste Datei:
Das ist eigentlich zu kurz für eine eigene Datei.
Statt time.time benutze time.monotonic.
convert_time benutzt ja immer noch dieses unsinnige Umrechnen. Nein, so rechnet man nicht!


Nächste Datei:
Zeile 12: LED überschreibt gpiozero.LED.
Zeile 20 & 23 & 32 & 35: Das ist eine ziemlich umständliche Art str(device.action) zu schreiben, vor allem weil device.action wahrscheinlich eh schon ein String ist.
Zeile 21 & 33: s.o.
Zeile 27: die Funktionen device_action und mount_device sind identisch!
Zeile 28: Einrückfehler.
Zeile 39: die Funktion mount_device überschreibt die andere mount_device-Funktion.
Zeile 41: man benutzt kein shell=True.

Code: Alles auswählen

run(['mount' ,'-t', 'ntfs', '-o', 'umask=007,gid=046', f'UUID={new_device}', str(MOUNT_PATH)], check=True)
Nach einer Exception macht das weitere Abarbeiten keinen Sinn.
Zeile 47ff: wenn Du viele Updates hast, ist es unsinnig, jedesmal die komplette Excel-Datei zu lesen und wieder zu schreiben. Das solltest Du außerhalb der Schleife tun.
Logging wird einmal am Anfang konfiguriert.

Nächste Datei:
Die beiden letzten Dateien sind so kurz, dass es keinen Sinn macht, die in zwei aufzuspalten.
Die Klasse EditTimes macht überhaupt keinen Sinn, weil nirgends self benutzt wird. Das sind einfach nur ein paar Funktionen.
Zu search_files habe ich Dir ja schon was geschrieben.
`delete`csv` hab ich ja inzwischen komplett überflüssig gemacht.

Das ganze mal logischer umstrukturiert:

Code: Alles auswählen

#!/usr/bin/env python3
import csv
import logging
from pathlib import Path
from subprocess import run
from openpyxl import load_workbook
from gpiozero import LED
import pyudev

CSV_PATH = Path('/media/USB/')
EXCEL_PATH = Path('/media/Dennis/Arbeitszeiten/Arbeitszeit.xlsx')
STATUS_LED = LED(7)

def read_time(csv_file):
    with csv_file.open(newline='') as csvfile:
        csvreader = csv.DictReader(csvfile)
        total_minutes = sum(
            int(row['hour']) * 60 + int(row['minute'])
            for row in csvreader)
        return divmod(total_minutes, 60)


def process_data(sheet, csv_file):
    order = csv_file.stem
    working_hours, working_minutes = read_time(csv_file)
    working_time = f'{working_hours}:{working_minutes}'
    for index, row in enumerate(sheet.values, 1):
        if row[0] == order:
            if row[1] == working_time and row[2] == 'X':
                csv_file.unlink()
            else:
                sheet[f'B{index}'] = working_time
            break
    else:
        sheet.append([order, working_time])


def copy_data():
    book = load_workbook(EXCEL_PATH)
    sheet = book['2021']
    for csv_file in CSV_PATH.iterdir():
        process_data(sheet, csv_file)
    book.save(EXCEL_PATH)


def copy_data_to_device(device):
    try:
        run(['mount' ,'-t', 'ntfs', '-o', 'umask=007,gid=046', f'UUID={device}', str(CSV_PATH)], check=True)
    except Exception as e:
        logging.error(e)
    else:
        copy_data()
        try:
            run(['umount', f'UUID={new_device}', str(CSV_PATH)], check=True)
        except Exception as e:
            logging.error(e)
        STATUS_LED.on()


def main():
    logging.basicConfig(filename='CheckDevice.log')
    context = pyudev.Context()
    monitor = pyudev.Monitor.from_netlink(context)
    for device in iter(monitor.poll, None):
        if 'ID_FS_TYPE' in device:
            if device.action == 'add':
                new_device = device.get('ID_FS_UUID')
                copy_data_to_device(new_device)
            elif device.action == 'remove':
                STATUS_LED.off()


if __name__ == '__main__':
    main()
Benutzeravatar
Dennis89
User
Beiträge: 1555
Registriert: Freitag 11. Dezember 2020, 15:13

Hallo Sirius3,

vielen Dank für deine Zeit und Bemühungen.
Sirius3 hat geschrieben: Freitag 5. März 2021, 10:55 Zeile 44: ist die Einrückung so gewollt? Die Reihenfolge der Pathes ist nicht fix, also der letzte order_path ist nicht immer der letzte.
Zeile 45: und auch hier wieder, order ist bereits ein Path.
Vielleicht willst Du auch gar nicht iterdir verwenden, sondern glob('*/*')?
Wird der USB-Stick in den Pi gesteckt, dann wird er automatisch in 'ORDER_PATH' gemountet. Dabei entsteht dort ein neuer Ordner mit der UUID des Sticks. Ich wollte eigentlich nur den Name des Ordners rausfinden um dann die Daten des Sticks zugreifen zu können. 'ORDER_PATH' ist da wohl ein schlechter Name.
Sirius3 hat geschrieben: Freitag 5. März 2021, 10:55 Später schreibst Du aber Dateien nur in ORDER_PATH, das Suchen nach Dateien ist also inkonsistent.
Das war falsch von mir, habe nach Änderungen nicht mehr richtig getestet. Die Dateien wurden in den falschen Ordner geschrieben.
Sirius3 hat geschrieben: Freitag 5. März 2021, 10:55 Zeile 64/101: Pfade setzt man nicht mit Stringformatierung zusammen, dafür gibt es ja Path-Objekte!
Dann soll ich das so machen:

Code: Alles auswählen

file_path = Path(self.order_path / 'VG-'year / new_ordernumber'.csv')
with open(file_path, 'w'...) 
?
Sirius3 hat geschrieben: Freitag 5. März 2021, 10:55 Zeile 84: wie sollen in menu_choice die Werte 'Auftrag auswählen' oder 'USB-Stick fehlt' kommen? Das sind doch Ordernummern?
'Auftrag auswählen' schreibe ich zu Programmstart in das OptionsMenü um mir ein Label zu sparen. Wenn kein USB-Stick vorhanden ist, dann schreibe ich 'USB-Stick fehlt' in 'orders'.
Sirius3 hat geschrieben: Freitag 5. März 2021, 10:55 Zeile 102: Du benutzt ein ungewöhnliches Quote-Zeichen. Warum???
Hm das habe ich aus der Python-Dokumentation übernommen.
Sirius3 hat geschrieben: Freitag 5. März 2021, 10:55 convert_time benutzt ja immer noch dieses unsinnige Umrechnen. Nein, so rechnet man nicht!
Ja, weil wenn ich zum Beispiel 5000 Sekunden habe und dann:

Code: Alles auswählen

total_time = divmod(5000, 3600)
mache, dann erhalte ich '(1, 1400)'. Dann muss ich 'total_time[1]' wieder in einen String umwandeln und die Nullen wegformatieren, damit ich meine 14 Minuten habe.
Das Problem konnte ich nicht lösen und deswegen ist das da noch so drin. An der Stelle, an der du mir 'divmod' gezeigt hast, funktioniert es super. Wenn du mir auf die Sprünge hilftst, dann ändere ich das sofort.
Sirius3 hat geschrieben: Freitag 5. März 2021, 10:55 Zeile 27: die Funktionen device_action und mount_device sind identisch!
Sorry, das war ein Kopierfehler von mir. In meiner Datei gibt es nur eine 'mount_devices' und die ist nicht identisch mit 'device_action'.

Bis auf die gestellten Fragen bin ich soweit mal durch, ausführlich testen werde ich das ab heute Abend. Wenn ich jetzt auf den Balkon stehe kommt bestimmt Rauch aus meinen Ohren :lol:


Vielen Dank nochmals!

Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Sirius3
User
Beiträge: 18274
Registriert: Sonntag 21. Oktober 2012, 17:20

Dann soll ich das so machen:

Code: Alles auswählen

file_path = Path(self.order_path / 'VG-'year / new_ordernumber'.csv')
with open(file_path, 'w'...) 
Das ist doch nicht einmal gültiges Python.

Code: Alles auswählen

filename = f'VG-{year}{new_order_name}.csv'
with (self.order_path / filename).open('w', newline="") as output:
Und zum anderen Punkt. Du mußt natürlich die Zeit in Minuten umrechnen, und nicht in Stunden:

Code: Alles auswählen

import time

class TimeRecording():       
    def start(self):
        self.start = time.monotonic()
        
    def stop(self):
        self.end = time.monotonic()
        
    def convert_time(self):
        return divmod(round((self.end - self.start)/60), 60)
Benutzeravatar
Dennis89
User
Beiträge: 1555
Registriert: Freitag 11. Dezember 2020, 15:13

Hallo Sirius3,

oh man, wenn ich deine Lösung sehe, werden mir meine Fragen ganz peinlich.

Vielen Dank für deine Unterstützung.

Jetzt wird mal jeder Unsinn zu Testzwecken gestempelt.

Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Antworten