Labeldruck und was draus folgt

Du hast eine Idee für ein Projekt?
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Eigentlich hast Du ja nur eine Oberfläche und eine Datenbank, und verschiedene Dinge, die im Hintergrund laufen. Erstens, laden von Daten und zweitens Drucken.
Ob man das jetzt auf verschiedene Module (vier Stück) aufteilt, oder alles in einer Datei stehen hat, ist erst einmal nebensächlich.
Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
Ja, das habe ich jetzt eine ganze Weile probiert.
Mein Problem ist, dass ich das Laden der Daten nicht in die Klasse mit reingebracht hab. Die Funkion dafür ist außerhalb und benötigt die Connection.
Wenn ich jetzt versuche das MainWindow mit update() von einem anderen Modul aufzurufen geht ihm entweder "self" ab oder die Connection.
Blick da jetzt nicht mehr so wirklich durch, denn die Connection kriegt es ja in Main zugewiesen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Nachtrag.
Es ist verhext. Jetzt kann ich von dem einen Modul wenn der Stick eingesteckt wird die Ladefunktion des anderen Moduls starten ohne die Connection übergeben zu müssen, aber das MainWindow aktualisieren? Das will mir nicht gelingen.
Heut geh ich erst mal ins Bett. Wird gescheiter sein.
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Dann zeig doch nochmal komplett, was Du jetzt hast.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Hab jetzt noch mal versucht - ich finde einfach keine Möglichkeit das update() so unterzubringen dass es auch ausgeführt wird.
Also, ich hab jetzt die beiden Dateien und eine kleine Testdatei dazu gebaut, dass ich nicht immer den USB-Stick einstecken muss.
Das hat einen timer der nach ein paar Sekunden den Befehl in das andere Modul rüberschickt. Währenddessen aktualisiere ich, dass ich was seh, die DB manuell. Ist umständlich aber einfacher als die Sache mit dem Stick.
Wie auch immer, das muss ja so gesehen beides funktionieren.
Tut aber nichts. Und ich suche nebenbei immer wieder mal, finde zig Seiten, aber ich kann das nicht auf meine Knöpfe anwenden.
Bin für jeden Tipp dankbar.

Also, ich rufe mit dem Modul das Knopfdruckmodul auf: (könnte ich mir vermutlich sparen, weil ja der Thread im Knopfdruckmodul gestartet wird.)

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import knopfdruck as knd

def main():

        knd.main()
        

if __name__ == '__main__':
    main()
Dann kommt das Test-Modul:

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import knopfdruck as knd
import time

def teste_aufruf():
        time.sleep(10)
        print("10 sekunden geschlafen")
        
        knd.MainWindow.antwort_von_usb("komme vom Schläferfenster", "info")
        

        
def main():
        teste_aufruf()
        

if __name__ == '__main__':
    main()
Hier ist das Modul das dann letztlich das Update vom MainWindow bräuchte.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import sqlite3
from sqlite3 import Error
from pathlib import Path
#import csv  #bleibt erst mal drin weil ggf. doch die Dateien gebraucht werden
import tkinter as tk
#from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
#from datetime import datetime as DateTime  #ggf. noch gebraucht
import subprocess
import threading
import configusbdb as uconf
import teste as test

PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
	                   AND knd.ID = num.ID"""



def prepare_rows_for_print(count, row, first_line):
    
    text = "\n".join(
        [
            "^XA",
            "^FXUnicode:",
            "^CI28",
            "^FXBox oben:",
            "^FO690,20^GB0,1150,8,^FS",
            "^FXgroßeBox:",
            "^FO15,20^GB790,1150,8^FS",
            "^FXZwischenlinie",
            "^FO495,20^GB0,1150,8,^FS",
            "^FXZwischenlinie unten",
            "^FO125,20^GB0,1150,8,^FS",
            "^FO720,200^A0R,50,50^FB800,1,0,C^FD",
            first_line,
            "^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
            str(row[1]),
            "^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
            str(row[2]),
            "^FS^FO0,180^BY3",
            "^BCR,90,Y,N,N",
            "^FO385,370^BY4^FD",
            str(count),
            "^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
            str(row[3]),
            "^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
            str(row[4]),
            "^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
            str(row[5]),
            "^FS",
            "^XZ",
            ""
        ]
    )
    #PFAD.mkdir(exist_ok=True)   #ggf. noch gebraucht
    #string_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"   
    #(string_zum_druck).write_text(text, "utf-8")
    #hier bin ich mir nicht sicher, ob es nicht schlauer wäre
    #auf das speichern zu verzichten und die Datei gleich mit
    #subporcess an den Drucker zu schicken. Wenn das geht...
    return text
    
    

def lade_daten():
               
    try:
        conn = sqlite3.connect(f"{DATABASE}")        
    except Error as e:
        print(e)
        
    curs2 = conn.cursor()
    config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
    curs2.execute(config_sql)
    configdaten = curs2.fetchall()
    print("daten geladen")    
    return  configdaten, conn

 
def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode

def datei_drucken(text):   
    
    subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
    



class MainWindow(tk.Tk):
    def __init__(self, conn, configdaten):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        button_frame = tk.Frame(self, width=1400, height=600)
        button_frame.grid(row=0, column=0, padx=0, pady=0)
        
        for index, entry in enumerate(configdaten):
            row_index, column_index = divmod(index, 4)
            tk.Button(
                button_frame,
                text="{}\n{}\n{}".format(entry[1], entry[2], entry[3]),
                bg="#f2c618",
                width=22,
                height=11,
                command=partial(self.on_click, entry, conn),
                ).grid(row=row_index, column=column_index, padx=0, pady=0)
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
            print("schalte jetzt aus")
            self.ExitApplication()
    
    def ExitApplication(self):
        MsgBox = tk.messagebox.askokcancel(
        title="Ausschalten",
        message="Soll der Rechner ausgeschaltet werden?")
        print(MsgBox)
        if MsgBox == True:
           self.destroy()
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
                )
        
    def fensterfrisch():
        print("hier war ich im fensterfrisch")
        lade_daten()
        #MainWindow.update()
        
    def antwort_von_usb(antwort, icon):
        print("vom anderen Teil")          
        tk.messagebox.showinfo(
                message=antwort,
                icon=icon)
        MainWindow.fensterfrisch()
        
        
    def on_click(self, row, conn):
        count = zaehl_ausdrucke(conn, row[0])
        print(count)
        string_zum_druck = prepare_rows_for_print(count, row, ORT)
        datei_drucken(string_zum_druck)


def main():
    
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print("starte main")

    configdaten, conn = lade_daten()  #lädt die Daten aus der csv nach
    print(configdaten)
    
    root = MainWindow(conn, configdaten)
    faden = threading.Thread(target=uconf.main)
    faden.start()
    faden2 = threading.Thread(target=test.main)
    faden2.start()

    root.mainloop()

        
if __name__ == '__main__':
    configdaten, conn = lade_daten()  #lädt die Daten aus der csv nach
    main()
Das USB-Teil:

Code: Alles auswählen

#!/usr/bin/env python3


import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import tkinter as tk
from tkinter import messagebox
import knopfdruck as knd
import threading


PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")

SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick

    raise AssertionError("unreachable code")


def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )


def config_arbeitsdb(connection, name_of_stick):
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    #
    # TODO Auf die Spalten einschränken die tatsächlich benötigt werden.
    #
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        print("es waren keine 8 Datensätze")
        antwort = "es waren keine 8 Datensätze"
    else:
        datensaetze.to_sql(
            "knopfdaten", connection, if_exists="replace", index=False
        )
        sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
        connection.execute(sql_ident)
        antwort = "Daten erfolgreich übertragen, \n Programm startet neu"

    return antwort


def auswerfen(name_of_stick):
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

def erfolgsmeldung(ergebnis, icon):
    knd.MainWindow.antwort_von_usb(ergebnis, icon)
    
##    window = tk.Tk()
##    window.eval('tk::PlaceWindow %s center' % window.winfo_toplevel())
##    window.withdraw
##    messagebox.showinfo(message=ergebnis, icon=icon)
##    window.deiconify()
##    window.destroy()
##    window.quit()  

def neuladen_config():        
        print("schicke zu knd")
        knd.MainWindow.fensterfrisch()


    
def main():
    udev_context = pyudev.Context()
    db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        try:
            ergebnis = config_arbeitsdb(db_engine, name_of_stick)
            anzahl_drucke_dokumentieren(
                db_engine, name_of_stick, CSV_DATEI_TO_STICK
            )
            ####
            print(ergebnis)
            erfolgsmeldung(ergebnis, 'info')
            neuladen_config()
        except SQLAlchemyError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except ValueError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except OSError as error:
            print("Fehler beim kopieren:", error)
            erfolgsmeldung(error, 'error')

        try:
            auswerfen(name_of_stick)
            
        except subprocess.CalledProcessError as error:
            print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
            error_lang = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
            erfolgsmeldung(error_lang, 'error')


if __name__ == "__main__":
    main()
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Das erste Skript ist unnötig, da das ja nur das main vom dritten Skript aufruft. Das zweite Skript macht nichts sinnvolles.
In `lade_daten` ist die „Fehlerbehandlung” unsinnig, da im Fehlerfall einfach nur ein weiterer Fehler auftritt. Fehler sollte man nur behandeln, die man auch sinnvoll behandeln kann.

In Mainwindow.__init__ ist configdata falsch. Da Du ja auch während das Programm läuft, die Knopfbeschriftung ändern willst. Also nur Knöpfe erstellen und die Beschriftung in einer extra Methode, in der Du auch gleich die Datenbankabfrage unterbringen kannst.
`ExitApplication` ist falsch geschrieben.
thread sollte man nicht ins deutsche übersetzt. Der zweite Thread macht irgendwie nichts sinnvolles. Und der erste Thread muß ja irgendwie mit dem Fenster kommunizieren können, zum einen für Statusmeldungen, die man ja irgendwie auch im Fenster sehen will, und nicht auf der Konsole, zum anderen, um bei neuen Daten die Ansicht zu aktualisieren. Dafür braucht es zum einen eine Queue in die diese ganzen Daten gefüttert werden, zum anderen im MainWindow eine regelmäßige Abfrage, ob etwas in der Queue steckt.
Die Aufrufe von knd.Mainwindow sind allesamt falsch, weil Du da direkt Funktionen der Klassendefinition aufrufst, zum anderen, weil aus einem Thread nicht direkt mit der GUI gesprochen werden darf, das muß, wie schon geschrieben, über eine Queue erfolgen. Zum Dritten, benutze keine Kryptischen Abkürzungen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Und der erste Thread muß ja irgendwie mit dem Fenster kommunizieren können, zum einen für Statusmeldungen, die man ja irgendwie auch im Fenster sehen will, und nicht auf der Konsole, zum anderen, um bei neuen Daten die Ansicht zu aktualisieren. Dafür braucht es zum einen eine Queue in die diese ganzen Daten gefüttert werden, zum anderen im MainWindow eine regelmäßige Abfrage, ob etwas in der Queue steckt.
Drum funktioniert das also nicht.
Wenn ich das recht verstehe, dann muss ich eine Queue ins MainWindow und in das USB-Modul einbauen.

Würde vermutlich auch nötig sein, wenn ich den Code in dem anderen Modul mit aufnehme?

Das Thema Queue scheint nicht ganz ohne zu sein...
Danke.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

configdata falsch
noch eine Frage die ich jetzt nicht ganz verstehe. configdata als Ausdruck falsch? das ist ja ein Konstrukt, das von lade_daten() kommt und wird dem Mainwindow übergeben. Irgendwie muss ich das doch da reinbringen, wenn das von den Knöpfen gebraucht wird. Geht das dann mit der Queue?
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Es geht nicht weiter, ich versteh das Ganze mit dem Thread nicht so recht.
Es ist aber eigentlich, so denke ich naiv, einfacher, wenn ich die Daten dann mit einem Knopfdruck neu laden lasse.
Dazu habe ich mir einen "Restarter" zusammengegooglet, der dann auf Knopfdruck die Oberfläche neu startet.
Das klappt, dann werden auch die Daten neu gezogen.
Das würde denke ich reichen. Dumm ist nur, wenn ich Threads verwende, wo dann in dem Teil der Thread gestartet wird und ich den Ausschalte, was ja schon funktioniert, müsste ich doch auch den anderen Thread killen.
Das ist jetzt so wie ich es habe nicht sauber, aber momentan stehe ich auf 5 Schläuchen.
Wenn mir jemand runter helfen könnte, wäre ich sehr dankbar.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import sqlite3
from sqlite3 import Error
from pathlib import Path
import csv
import tkinter as tk
from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
from datetime import datetime as DateTime
import subprocess
import sys
import os
import usbconfig
import concurrent.futures
import queue
import threading


PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
	                   AND knd.ID = num.ID"""



def prepare_rows_for_print(count, row, first_line):
    
    text = "\n".join(
        [
            "^XA",
            "^FXUnicode:",
            "^CI28",
            "^FXBox oben:",
            "^FO690,20^GB0,1150,8,^FS",
            "^FXgroßeBox:",
            "^FO15,20^GB790,1150,8^FS",
            "^FXZwischenlinie",
            "^FO495,20^GB0,1150,8,^FS",
            "^FXZwischenlinie unten",
            "^FO125,20^GB0,1150,8,^FS",
            "^FO720,200^A0R,50,50^FB800,1,0,C^FD",
            first_line,
            "^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
            str(row[1]),
            "^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
            str(row[2]),
            "^FS^FO0,180^BY3",
            "^BCR,90,Y,N,N",
            "^FO385,370^BY4^FD",
            str(count),
            "^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
            str(row[3]),
            "^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
            str(row[4]),
            "^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
            str(row[5]),
            "^FS",
            "^XZ",
            ""
        ]
    )
    #PFAD.mkdir(exist_ok=True)
    #string_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"   
    #(string_zum_druck).write_text(text, "utf-8")
    #hier bin ich mir nicht sicher, ob es nicht schlauer wäre
    #auf das speichern zu verzichten und die Datei gleich mit
    #subporcess an den Drucker zu schicken. Wenn das geht...
    return text


def restart_program():   #das ist jetzt nicht schön, vermute ich, aber wirkungsvoll

    python = sys.executable
    os.execl(python, python, * sys.argv)    

    
    

def lade_daten():
               
    try:
        conn = sqlite3.connect(f"{DATABASE}")        
    except Error as e:
        print(e)
        
    curs2 = conn.cursor()
    config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
    curs2.execute(config_sql)
    configdaten = curs2.fetchall()
    
    return  configdaten, conn

 
def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode

def datei_drucken(text):   
    
    subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
    



class MainWindow(tk.Tk):
    def __init__(self, conn, configdaten):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        button_frame = tk.Frame(self, width=1400, height=600)
        button_frame.grid(row=0, column=0, padx=0, pady=0)
        
        for index, entry in enumerate(configdaten):
            row_index, column_index = divmod(index, 4)
            tk.Button(
                button_frame,
                text="{}\n{}\n{}".format(entry[1], entry[2], entry[3]),
                bg="#f2c618",
                width=22,
                height=11,
                command=partial(self.on_click, entry, conn),
                ).grid(row=row_index, column=column_index, padx=0, pady=0)
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
            print("schalte jetzt aus")
            self.ExitApplication()
    
    def ExitApplication(self):
        MsgBox = tk.messagebox.askyesnocancel(
        title="Ausschalten/Neustarten",
        message="Soll der Rechner ausgeschaltet werden?\n Mit »Ja» ausschalten, Mit »Abbruch« Neustarten, mit »Nein« zurück zum Programm")
        print(MsgBox)
        if MsgBox == True:
            
        
            self.destroy()  #das geht
            faden.join()  #das nicht, behauptet das Terminal in dem das läuft
            sys.exit(0)  #das auch
            
        elif MsgBox == None:
            restart_program()  #das klappt ganz gut

           
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
                )
        

    def on_click(self, row, conn):
        count = zaehl_ausdrucke(conn, row[0])
        print(count)
        string_zum_druck = prepare_rows_for_print(count, row, ORT)
        datei_drucken(string_zum_druck)


def main():
    
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print(f"{BARCODE_DB_FILENAME}/config8.db")

    root.mainloop()

        
if __name__ == '__main__':
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print(f"{BARCODE_DB_FILENAME}/config8.db")

    configdaten, conn = lade_daten()  #lädt die Daten aus der csv nach
    print(configdaten)
    
    root = MainWindow(conn, configdaten)
    faden = threading.Thread(target=usbconfig.main)
    faden.start()
 
    
    #holer.usb_tausch
    root.mainloop()
    
    main(pipeline, event)
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

os.execl zu benutzen ist nicht schön und auch völlig unnötig. Der einfache Weg, einfach die Knöpfe zu aktualisieren, ist wohl zu offensichtlich.
Dass `lade_daten` auch eine Connection zur Datenbank erstellt, ist überraschend, weil man das von einer Funktion diesen Namens nicht erwartet. Und wie schon vorher geschrieben, ist die Fehlerbehandlung in dieser Funktion auch fehlerhaft.
Wenn man sich die Knöpfe in einer Liste merken würde, und die configdaten nicht gerade direkt in __init__ übergeben, dann könnte man auch einfach die Beschriftung ändern. Ist aber wohl zu einfach gedacht. messagebox wird als eigentständiges Modul importiert, das über tk.messagebox anzusprechen ist ungewöhnlich.
Ist es so schwer, sich an eine einfache Schreibregel zu halten? `ExitApplication` ist falsch geschrieben. MsgBox auch, aber dazu kommt, dass das ja das Ergebnis der MessageBox ist und nicht die Messagebox selbst.
`main` ist irgendwie überflüssig, weil das selbe unter dem if steht. Obwohl, das was unter dem if steht sollte eigentlich in main stehen. Und zum Schluß wird noch ein main mit falschen Parametern aufgerufen.
In `on_click` wird dann noch auf das globale ´conn` zugegriffen; fällt Dir nicht auf die Füße, weil ja keine richtige main-Funktion benutzt wird. Ist trotzdem unsauber.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-
import csv
import queue
import threading
import sqlite3
import time
import subprocess
import tkinter as tk
from collections import namedtuple
from tkinter import messagebox
from functools import partial
from contextlib import closing
from pathlib import Path
from datetime import datetime as DateTime
import pandas as pd
import usbconfig
import pyudev


PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"

MEDIA_PFAD = Path("/media/earl/")
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")

ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
                       AND knd.ID = num.ID"""

SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick
    raise AssertionError("unreachable code")  # das soll auch nie in Aktion sein, wenn es richtig funktioniert

def usb_stick_loop(queue):
    udev_context = pyudev.Context()
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        queue.put(name_of_stick)

def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )

def config_arbeitsdb(connection, name_of_stick):
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        raise RuntimeError("es waren keine 8 Datensätze")
    datensaetze.to_sql(
        "knopfdaten", connection, if_exists="replace", index=False
    )
    with closing(connection.cursor) as cursor:
        cursor.execute("insert or ignore into numbers(ID) select ID from knopfdaten")
    connection.commit()
    return "Daten erfolgreich übertragen"

def auswerfen(name_of_stick):
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

def copy_files(connection, name_of_stick):
    try:
        ergebnis = config_arbeitsdb(db_engine, name_of_stick)
        messagebox.showinfo(message=ergebnis, icon='info')
        anzahl_drucke_dokumentieren(
            db_engine, name_of_stick, CSV_DATEI_TO_STICK
        )
    except Exception as error:
        messagebox.showinfo(message=str(error), icon='error')
    try:
        auswerfen(name_of_stick)
    except subprocess.CalledProcessError as error:
        error_text = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
        messagebox.showinfo(message=str(error_text), icon='error')


# und bitte bessere Namen für die Spalten!
KnopfDaten = namedtuple("KnopfDaten", "id, e_st, zeile3, zeile5, zeile6, zeile7")

def lade_daten(connection):
    with closing(connection.cursor()) as cursor:
        cursor.execute(f"select {','.join(KnopfDaten._fields)} from knopfdaten")
        return [KnopfDaten(*row) for row in cursor]

def prepare_rows_for_print(count, row, first_line):
    return "\n".join([
        "^XA",
        "^FXUnicode:",
        "^CI28",
        "^FXBox oben:",
        "^FO690,20^GB0,1150,8,^FS",
        "^FXgroßeBox:",
        "^FO15,20^GB790,1150,8^FS",
        "^FXZwischenlinie",
        "^FO495,20^GB0,1150,8,^FS",
        "^FXZwischenlinie unten",
        "^FO125,20^GB0,1150,8,^FS",
        "^FO720,200^A0R,50,50^FB800,1,0,C^FD",
        first_line,
        "^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
        str(row.e_st),
        "^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
        str(row.zeile3),
        "^FS^FO0,180^BY3",
        "^BCR,90,Y,N,N",
        "^FO385,370^BY4^FD",
        str(count),
        "^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
        str(row.zeile5),
        "^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
        str(row.zeile6),
        "^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
        str(row.zeile7),
        "^FS",
        "^XZ",
        ""
    ])

def datei_drucken(text):    
    subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)

def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode


class MainWindow(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        self.connection = sqlite3.connect(f"{DATABASE}")
        button_frame = tk.Frame(self, width=1400, height=600)
        button_frame.grid(row=0, column=0, padx=0, pady=0)
        self.buttons = []
        for index in range(8):
            row_index, column_index = divmod(index, 4)
            button = tk.Button(
                button_frame,
                bg="#f2c618",
                width=22,
                height=11,
                command=partial(self.on_click, index),
                )
            button.grid(row=row_index, column=column_index, padx=0, pady=0)
            self.buttons.append(button)
        self.load_config()
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
        self.queue = Queue()
        threading.Thread(target=usb_stick_loop, args=(queue,), daemon=True)
        self.after(500, self.queue_loop)

    def load_config(self):
        self.config = lade_daten(self.connection)
        for button, entry in zip(self.buttons, self.config):
            button['text'] = f"{entry.e_st}\n{entry.zeile3}\n{entry.zeile5}"

    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit > 5:
            print("schalte jetzt aus")
            self.exit_application()
    
    def exit_application(self):
        result = tk.messagebox.askyesno(
            title="Ausschalten/Neustarten",
            message="Soll der Rechner ausgeschaltet werden?\n Mit »Ja» ausschalten, mit »Nein« zurück zum Programm"
        )
        if result:
            self.destroy()
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
            )
        
    def on_click(self, index):
        entry = self.config[index]
        count = zaehl_ausdrucke(self.connection, entry.id)
        print(count)
        string_zum_druck = prepare_rows_for_print(count, entry, ORT)
        datei_drucken(string_zum_druck)

    def queue_loop(self):
        if not self.queue.empty():
            name_of_stick = self.queue.get()
            copy_files(self.connection, name_of_stick)
            self.load_config()
        self.after(500, self.queue_loop)

def main():
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    root = MainWindow()
    root.mainloop()
        
if __name__ == '__main__':
    main()
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

os.execl zu benutzen ist nicht schön und auch völlig unnötig. Der einfache Weg, einfach die Knöpfe zu aktualisieren, ist wohl zu offensichtlich.
Es tut mir leid, aber ich sehe das einfach nicht.
Das kommt vielleicht auch daher, dass ich den Code der mir die Knöpfe beschriftet vorher schon nicht so recht verstanden hab. Er ist doch eigentlich in einer Liste, oder?

Code: Alles auswählen

enumerate(configdaten):
Enumerate funktioniert doch mit Listen?
Die Schreibweise anzupassen hat mich leider in keiner Weise weitergebracht, wie du siehst, rutscht das immer wieder mit hoch weil das meine letzte funktionierende Version von dem Codeteil ist.
Sorry, wenn ich das dann immer wieder übersehe. Leider hilft mir das mit dem Code jetzt auch nicht so wirklich weiter, denn ich verstehe ihn wieder an vielen Stellen nicht und bekomme zudem die Fehlermeldung die ich schon die ganze Zeit in einer halben Million Abwandlungen sehe:

Code: Alles auswählen

  File "/home/earl/projekt/Zusammen2.05/alles.py", line 222, in __init__
    self.queue = Queue()
NameError: name 'Queue' is not defined

Das ist die erste Meldung beim Starten und seltsamerweise beim Drücken des zweiten Buttons in der zweiten Reihe.

Ich werde mal versuchen, den Code zu verstehen, aber ein paar Kommentare zwischen den Zeilen würden mich weniger frustrieren und mehr helfen.
Danke.
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Sorry, muss ja auch queue.Queue heißen. Du hast dir halt kein einfaches Problem ausgedacht. GUIs sind komplex, Datenbanken sind komplex, und mit Threads kann man eigentlich nur alles falsch machen, wenn man nicht schon viel Erfahrung damit hat.

Welche Stelle mit den Buttons verstehst Du den nicht richtig?
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

GUIs sind komplex, Datenbanken sind komplex, und mit Threads kann man eigentlich nur alles falsch machen, wenn man nicht schon viel Erfahrung damit hat.
Das macht mir Mut...
Das mit den Knöpfen habe ich schon von Anfang an nur so halb verstanden, da stand ja

Code: Alles auswählen

for index, entry in enumerate(configdaten):
            row_index, column_index = divmod(index, 4)
enumerate nummeriert mit die Zeilen ja durch, gibt mir Zeile und Spalte zurück wo der Knopf in dem Gitter hin soll.
Und in meinem Code vorher habe ich das so verstanden, dass er damit auch gleich die Postion des Textes zuordnet. Das ist mir aber nicht so ganz klar gewesen, wie genau der Text zum Knopf kommt bzw. umgekehrt.
Nach dem Index?
Mit range(8) ist mir zumindest jetzt die Erstellung der Knöpfe ein wenig klarer, aber mit dem named_tuple hast du mich mal wieder geschafft. :x
Wenn ich das recht verstehe kann man da die zugehörigen Zahlen mit dem "Begriff" auslesen. Mit diesen Bezeichnungen bin ich selbst auch nicht glücklich, aber die kommen halt von dieser Datenbank so rüber. Bisher hatten die noch keine Gelegenheit sich da drüber Gedanken zu machen, so muss ich mit dem leben. Immerhin kann ich zuordnen was sie damit meinen, (Zeile3 ist die dritte Zeile des Labels das eben irgendwelche Bezeichnungen haben kann. Ich hoffe bloß, dass die nie auf den Gedanken kommen, bis zur ZeileX Einträge zu wollen, sonst muss ich da einen erwürgen.)

queue.Queue() hab ich an folgende Stelle gesetzt:

Code: Alles auswählen

        self.load_config()
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
        self.queue = queue.Queue()
        threading.Thread(target=usb_stick_loop, args=(queue,), daemon=True)
        self.after(500, self.queue_loop)
Das war die einzige, die mir logisch erschien. (heißt nicht viel)
Wieso kommt in args=(queue,) ein Komma und danach nix? würde da ein zweiter Wert erwartet?

In dem Teil jetzt habe ich in Spyder eine Meldung bekommen dass db_engine nicht definiert ist, dann hab ich die Zeile die das definiert (db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")) vom alten Code rüberkopiert festgestell, dass nun create_engine nicht definiert ist, weil das ja vom sqlalchemy-modul kam.
Eigentlich sollte das aber doch auch die connection sein, oder?
Ich konnte es nicht ausprobieren, denn der Code reagiert überhaupt nicht auf die Anwesenheit des Sticks. (so wie bei meinen eigenen Versuchen die beiden Codeteile zusammenzubringen)

Code: Alles auswählen

def copy_files(connection, name_of_stick):
    try:
        db_engine = connection
        ergebnis = config_arbeitsdb(db_engine, name_of_stick)
        messagebox.showinfo(message=ergebnis, icon='info')
        anzahl_drucke_dokumentieren(
            db_engine, name_of_stick, CSV_DATEI_TO_STICK
        )
    except Exception as error:
        messagebox.showinfo(message=str(error), icon='error')
    try:
        auswerfen(name_of_stick)
    except subprocess.CalledProcessError as error:
        error_text = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
        messagebox.showinfo(message=str(error_text), icon='error')
Ach ja, noch eine Frage die du mir vielleicht beantworten kannst
. Obwohl, das was unter dem if steht sollte eigentlich in main stehen.
Das kam daher, dass der Code sonst nicht lief wenn ich ihn gestartet hab. In einem Beispiel zum concurrent future hab ich gesehen dass die das unter das if schreiben, dachte mir das probier ich und dann gings. Das ist mir zwar schon sehr seltsam erschienen, aber mit meiner Logik komme ich da leider meistens nicht sehr weit. (da fehlt meinem Hirn noch ein import durchblick from Python :cry: )
Wieso? Ich dachte, das mit dem if an der Stelle steht deshalb dass wenn man den Code von einem anderen Modul starten kann und aus dem Modul selbst auch. Bei einigen Beispielen in der Dokumentation steht das nicht drunter und es geht trotzdem. (Okay, das heißt nicht viel, denn ich hab festgestellt, dass manche Sachen in der Dokumentation nicht in IDLE funktionieren. Auch nicht gerade aufbauend weil das nirgendwo erwähnt wird)
Wieso also tun das manche? Grade im Zusammenhang mit Threading ist mir das total aufgefallen, machen das viele dass sie die Main() fast leer haben und das alles in das If schreiben?

Danke.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Jetzt hab ich mal einen Versuch gewagt, der "fast" funktioniert.
Hab mir gesagt, der Thread mit dem USB-Stick wird ja gar nicht gestartet. Zumindest tut das die Schreibweise wie du vorgeschlagen hat nicht.
also hab ich zuerst mal probiert aus

Code: Alles auswählen

threading.Thread(target=usb_stick_loop, args=(queue,), daemon=True)

Code: Alles auswählen

threading.Thread(target=usb_stick_loop(queue,),  daemon=True)
zu machen.
Das hat den Thread zwar gestartet aber ich bekam die Meldung beim Einstecken des Sticks, dass

Code: Alles auswählen

    queue.put(name_of_stick)
AttributeError: module 'queue' has no attribute 'put'
Also mache ich daraus das hier:

Code: Alles auswählen

        self.queue = queue.Queue()
        
        usb_looper_start = threading.Thread(target=usb_stick_loop, args = (self.queue, ), daemon=True)
        usb_looper_start.start()
Das funktioniert mit einer kleinen Einschränkung.
Wenn die Daten vom Stick gezogen sind, dann wird die Datei zwar eingelesen, aber es wird keine auf den Stick geschrieben. Immerhin wird er ausgeworfen.
Die Fehlermeldung kommt in einer Infobox, was mich ganz irritiert hat.
'builtin_function_or_method'
object has no attribute 'close'
Könnte sein, dass das daher kommt, dass in dem Code vorher, wo die beiden noch getrennt waren, mit SQLalchemy gearbeitet hat?
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Ich kann halt das Skript bei mir nicht testen, daher die vielen kleinen Fehler. Um da weiter zu helfen, müßte man den kompletten Traceback haben, also die Fehlerbehandlung rausnehmen.
Schön, das solangsam alles tut.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Also, das war jetzt schon ein wenig seltsam.
Hab in dem Code mal bei jeder einzelnen Funktion ein Print reingeschrieben um zu sehen wo der Fehler ausgelöst wurde, dabei kam dann eine Tracebackmeldung.

Code: Alles auswählen

loadconfig
lade daten
bin in der loop vom usb
warte auf usb
remove
remove
add
add
add OLEBIRD
warte auf usb
copyFile <sqlite3.Connection object at 0x7fd692626b90> OLEBIRD
<sqlite3.Connection object at 0x7fd692626b90>
config arbeitsdatei
Exception in Tkinter callback
Traceback (most recent call last):
  File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 117, in config_arbeitsdb
    cursor.execute("insert or ignore into numbers(ID) select ID from knopfdaten")
AttributeError: 'builtin_function_or_method' object has no attribute 'execute'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 135, in copy_files
    ergebnis = config_arbeitsdb(db_engine, name_of_stick)
  File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 117, in config_arbeitsdb
    cursor.execute("insert or ignore into numbers(ID) select ID from knopfdaten")
  File "/usr/lib/python3.6/contextlib.py", line 185, in __exit__
    self.thing.close()
AttributeError: 'builtin_function_or_method' object has no attribute 'close'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/lib/python3.6/tkinter/__init__.py", line 1705, in __call__
    return self.func(*args)
  File "/usr/lib/python3.6/tkinter/__init__.py", line 749, in callit
    func(*args)
  File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 292, in queue_loop
    copy_files(self.connection, name_of_stick)
  File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 142, in copy_files
    print("hier war das der Error beim Kopieren", message)
NameError: name 'message' is not defined
Da konnte ich jetzt ein wenig suchen und glaube ich hab es gefunden.
Es war die Zeile mit closing(connection.cursor)...
Das hatte ich in einem früheren Versuch die USB-Version auch schon mal. Hier geht das nicht mit dem cursor sondern mit der connection.execute.
Das hab ich jetzt mal umgesetzt und ich glaube, es geht.
Die prints muss ich noch rauswerfen und vielleicht noch was einbauen einen evtl. Fehler abzufangen, denn dafür war vermutlich das closing gedacht, oder?
So sieht es zumindest aus als würde alles klappen.
Danke für alles auf jeden Fall!!!

Code: Alles auswählen

 #!/usr/bin/env python3
# -*- coding: utf8 -*-
#import csv
import queue
import threading
import sqlite3
import time
import subprocess
import tkinter as tk
from collections import namedtuple
from tkinter import messagebox
from functools import partial
from contextlib import closing
from pathlib import Path
from datetime import datetime as DateTime
import pandas as pd
#import usbconfig
import pyudev


PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"

MEDIA_PFAD = Path("/media/earl/")
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")

ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
                       AND knd.ID = num.ID"""

SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    print("warte auf usb")
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick
    raise AssertionError("unreachable code")  # das soll auch nie in Aktion sein, wenn es richtig funktioniert

def usb_stick_loop(queue):
    print("bin in der loop vom usb")
    udev_context = pyudev.Context()
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        queue.put(name_of_stick)

def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    print("ducke doku")
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )

def config_arbeitsdb(connection, name_of_stick):
    print("config arbeitsdatei")
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        raise RuntimeError("es waren keine 8 Datensätze")
    datensaetze.to_sql(
        "knopfdaten", connection, if_exists="replace", index=False
    )
    with connection as conn:
        conn.execute("insert or ignore into numbers(ID) select ID from knopfdaten")
    
    return "Daten erfolgreich übertragen"

def auswerfen(name_of_stick):
    print("werfe DS aus")
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

def copy_files(connection, name_of_stick):
    print("copyFile", connection, name_of_stick)
    try:
        db_engine = connection
        print(db_engine)
        ergebnis = config_arbeitsdb(db_engine, name_of_stick)
        print(ergebnis)
        messagebox.showinfo(message=ergebnis, icon='info')
        anzahl_drucke_dokumentieren(
            db_engine, name_of_stick, CSV_DATEI_TO_STICK
        )
    except Exception as error:
        print("hier war das der Error beim Kopieren", message)
        messagebox.showinfo(message=str(error), icon='error')
    try:
        auswerfen(name_of_stick)
    except subprocess.CalledProcessError as error:
        error_text = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
        messagebox.showinfo(message=str(error_text), icon='error')


# und bitte bessere Namen für die Spalten!
KnopfDaten = namedtuple("KnopfDaten", "id, e_st, zeile3, zeile5, zeile6, zeile7")

def lade_daten(connection):
    print("lade daten")
    with closing(connection.cursor()) as cursor:
        cursor.execute(f"select {','.join(KnopfDaten._fields)} from knopfdaten")
        return [KnopfDaten(*row) for row in cursor]

def prepare_rows_for_print(count, row, first_line):
    return "\n".join([
        "^XA",
        "^FXUnicode:",
        "^CI28",
        "^FXBox oben:",
        "^FO690,20^GB0,1150,8,^FS",
        "^FXgroßeBox:",
        "^FO15,20^GB790,1150,8^FS",
        "^FXZwischenlinie",
        "^FO495,20^GB0,1150,8,^FS",
        "^FXZwischenlinie unten",
        "^FO125,20^GB0,1150,8,^FS",
        "^FO720,200^A0R,50,50^FB800,1,0,C^FD",
        first_line,
        "^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
        str(row.e_st),
        "^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
        str(row.zeile3),
        "^FS^FO0,180^BY3",
        "^BCR,90,Y,N,N",
        "^FO385,370^BY4^FD",
        str(count),
        "^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
        str(row.zeile5),
        "^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
        str(row.zeile6),
        "^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
        str(row.zeile7),
        "^FS",
        "^XZ",
        ""
    ])

def datei_drucken(text):    
    subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)

def zaehl_ausdrucke(connection, id):
    print("zahl ausdrucke")
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode


class MainWindow(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        self.connection = sqlite3.connect(f"{DATABASE}")
        button_frame = tk.Frame(self, width=1800, height=800)
        button_frame.grid(row=0, column=0, padx=0, pady=0, sticky='NSWE')
        self.buttons = []
        for index in range(8):
            row_index, column_index = divmod(index, 4)
            button = tk.Button(
                button_frame,
                bg="#f2c618",
                width=33,
                height=18,
                command=partial(self.on_click, index),
                )
            button.grid(row=row_index, column=column_index, padx=0, pady=0, sticky='NSWE')
            self.buttons.append(button)
        self.load_config()
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
        self.queue = queue.Queue()
        
        usb_looper = threading.Thread(target=usb_stick_loop, args = (self.queue, ), daemon=True)
        usb_looper.start()
        self.after(500, self.queue_loop)
        self.wait_visibility()
        self._center()

    def _center(self, *args):

        xpos = (self.winfo_screenwidth() - self.winfo_width()) / 2
        ypos = ((self.winfo_screenheight() - self.winfo_height()) / 2) / 100 * 90
        self.wm_geometry("+%d+%d" % (xpos,ypos))

        

    def load_config(self):
        print("loadconfig")
        self.config = lade_daten(self.connection)
        for button, entry in zip(self.buttons, self.config):
            button['text'] = f"{entry.e_st}\n{entry.zeile3}\n{entry.zeile5}"

    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit > 5:
            print("schalte jetzt aus")
            self.exit_application()
    
    def exit_application(self):
        result = messagebox.askyesno(
            title="Ausschalten/Neustarten",
            message="Soll der Rechner ausgeschaltet werden?\n Mit »Ja» ausschalten, mit »Nein« zurück zum Programm"
        )
        if result:
            self.destroy()
        else:
            messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
            )
        
    def on_click(self, index):
        entry = self.config[index]
        count = zaehl_ausdrucke(self.connection, entry.id)
        print(count)
        string_zum_druck = prepare_rows_for_print(count, entry, ORT)
        datei_drucken(string_zum_druck)

    def queue_loop(self):
        #print("ich bin die queue")
        if not self.queue.empty():
            
            name_of_stick = self.queue.get()
            copy_files(self.connection, name_of_stick)
            self.load_config()
        self.after(500, self.queue_loop)

def main():
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    root = MainWindow()
    root.mainloop()
        
if __name__ == '__main__':
    main()
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Es fehlten nur die Klammern

Code: Alles auswählen

with closing(connection.cursor()) as cursor:
execute von der connection-Instanz sollte man nicht benutzen, da es nicht zum Standard gehört.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Hab ich mal probiert, das geht nicht. Es kommt keine Fehlermeldung, keine Erfolgsmeldung.
Da ich ja in jedem Teil ein print hab, kann ich dir nur sagen, wo er zuletzt was getan hat:
'warte auf usb' war das letzte was mir angezeigt wird, das steht in dem Teil:

Code: Alles auswählen

def warte_auf_usb_stick(udev_context):
    print("warte auf usb")
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick
    raise AssertionError("unreachable code")  # das soll auch nie in Aktion sein, wenn es richtig funktioniert
Der Stick wird auch nicht ausgeworfen. Die Oberfläche läuft weiter, das Drucken geht auch, aber das Nachladen bleibt aus.

Aber, Pandas macht zudem ja auch einen Autocommit, wahrscheinlich ist das dann in dem Fall die Engine, nicht die Connection, auch wenn das so gesehen das gleiche ist - also das gleiche wahrscheinlich nicht, aber es funktioniert genauso mit der engine die sqlalchemy erzeugt und die connection von sqlite3. Das hab ich mal an einem Testmodul ausprobiert. Nur mit Pandas halt ohne Commit.
Vielleicht sollte ich das mal umbenennen in engine, auch wenn das nicht viel an der Funktionalität ändert, kann sein, dass ich den Code selbst mal wieder lesen muss in ein paar Tagen :o)
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Kurze Zwischenmeldung: Es klappt auf dem Raspberry auch.
Hab inzwischen noch rausgefunden wie ich die Knöpfe in den Vollbildmodus bringe und mit dem Ausschalten der GUI auch gleich den Rechner runterfahre.

Mein größtes Problem ist grad, dass ich die Anwendung nicht in den "Autostart" bringe. Tkinter kommt beim Start des service über systemd nicht an das Display ran. Konnte die Fehlermeldung noch nicht kopieren, vielleicht kennt die ja jemand und was man dagegen tun kann...
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Nun habe ich die Fehlermeldung aus dem jourmalctl ausgelesen, das hatte gestern nicht mehr geklappt. Da war der Text in der Datei nur noch halb da.
Hier der Teil in der der Service gestartet wird, und zwar zum Testen von mir manuell:

Code: Alles auswählen

Mai 09 22:15:50 raspberrypi systemd[1]: Started labeldrucker Service.
Mai 09 22:15:50 raspberrypi sudo[1217]: pam_unix(sudo:session): session closed for user root
Mai 09 22:15:51 raspberrypi python3[1224]: Traceback (most recent call last):
Mai 09 22:15:51 raspberrypi python3[1224]:   File "/home/pi/Labeldruck/alles5.02.py", line 294, in <module>
Mai 09 22:15:51 raspberrypi python3[1224]:     main()
Mai 09 22:15:51 raspberrypi python3[1224]:   File "/home/pi/Labeldruck/alles5.02.py", line 290, in main
Mai 09 22:15:51 raspberrypi python3[1224]:     root = MainWindow()
Mai 09 22:15:51 raspberrypi python3[1224]:   File "/home/pi/Labeldruck/alles5.02.py", line 204, in __init__
Mai 09 22:15:51 raspberrypi python3[1224]:     super().__init__()
Mai 09 22:15:51 raspberrypi python3[1224]:   File "/usr/lib/python3.7/tkinter/__init__.py", line 2023, in __init__
Mai 09 22:15:51 raspberrypi python3[1224]:     self.tk = _tkinter.create(screenName, baseName, className, interactive, wantobjects, useTk, sync, use)
Mai 09 22:15:51 raspberrypi python3[1224]: _tkinter.TclError: no display name and no $DISPLAY environment variable
Mai 09 22:15:52 raspberrypi systemd[1]: labeldrucker.service: Main process exited, code=exited, status=1/FAILURE
Mai 09 22:15:52 raspberrypi systemd[1]: labeldrucker.service: Failed with result 'exit-code'.
Mai 09 22:16:00 raspberrypi sudo[1228]:       pi : TTY=pts/0 ; PWD=/home/pi ; USER=root ; COMMAND=/bin/journalctl
Mai 09 22:16:00 raspberrypi sudo[1228]: pam_unix(sudo:session): session opened for user root by (uid=0)
Mai 09 22:16:00 raspberrypi sudo[1228]: pam_unix(sudo:session): session closed for user root
Mai 09 22:17:01 raspberrypi CRON[1273]: pam_unix(cron:session): session opened for user root by (uid=0)
Mai 09 22:17:01 raspberrypi CRON[1277]: (root) CMD (   cd / && run-parts --report /etc/cron.hourly)
Mai 09 22:17:01 raspberrypi CRON[1273]: pam_unix(cron:session): session closed for user root
Dabei habe ich die Software auf 2 Pi's getestet indem ich sie mit IDLE und im Terminal gestartet hab, in dem ich das Skript ausführbar gemacht und doppelt geklickt sowie durch ein Shellskript gestartet.
Jedesmal läuft das alles super, lediglich die Druckgeschwindigkeit ist auf einem der beiden Test-Pi's ein wenig lahmer, aber es tut was es soll ohne Fehlermeldung.
Nur als "service" gibt es diese Meldung im systemctl.
Falls jemand eine Idee hat, woran das liegen könnten, noch meine Datei labeldrucker.service die hier gestartet werden soll:

Code: Alles auswählen

[Unit]
Description=labeldrucker Service
After=multi-user.target

[Service]
User=pi
Group=pi
Type=idle
WorkingDirectory=/home/pi/Labeldruck
ExecStart=python3 /home/pi/Labeldruck/alles5.02.py &

[Install]
WantedBy=multi-user.target
Das Einzige was mir dazu einfällt ist, dass die Software vielleicht al root gestartet wird der keinen eigenen Bildschirm hat. Das ist aber nur gestochert...

Danke für Tipps, die Software zu starten mit Klick auf ein Symbol ginge auch, aber die sind auf dem Touchscreen so winzig.
Antworten