Labeldruck und was draus folgt

Du hast eine Idee für ein Projekt?
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

With a little help...
Hab ich jetzt herausgefunden, wie ich den Rechner runterfahren wenn ich einen Knopf länger drücke und auch wie ich drucke mit subprocess()
Was ich jetzt nicht hab, ist die Anbindung dieses Teils an sqlalchemy. Wobei ich das bisher "nur" für das USB-Teil brauche.
Was mir vermutlich hier noch abgeht ist die Fehlerbehandlung. Aber so richtig weiß ich nicht, was das Progg eigentlich tun soll, wenn der Printer nicht druckt. Das schiebt den Befehl ja bloß durch ans System.
Zudem habe ich mir überlegt, ob ich nicht gleich einfach den rohen Text an lp übergeben kann. Der Labeldrucker macht aus dem Kauderwelsch schöne Label wenns von der Datei kommt, warum nicht auch vom Text.
Nur da hab ich vermutlich einfach zu wenig Ahnung...
Hier mal der Code.
Vielleicht ist ja was dabei was halbwegs passt.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import sqlite3
from sqlite3 import Error
from pathlib import Path
import csv
import tkinter as tk
from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
from datetime import datetime as DateTime
import subprocess

PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
	                   AND knd.ID = num.ID"""



def save_row_as_file(count, row, first_line):
    
    text = "\n".join(
        [
            "^XA",
            "^FO15,90^GB780,0,8,^FS",
            "^FO15,250^GB780,0,8,^FS",
            "^FO15,700^GB780,0,8,^FS",
            "^FO0,0^GB600,200,2",
            "^FO15,20^GB780,785,4^FS",
            "^FO0,40^A0,50,50^FB800,1,0,C^FD",
            first_line,
            "^FS^FO0,110^A0,60,60^FB800,1,0,C^FD",
            str(row[1]),
            "^FS^FO0,190^A0,70,70^FB800,1,0,C^FD",
            str(row[2]),
            "^FS^FO0,80^BY3",
            "^BCN,170,Y,N,N",
            "^FO165,270^BY4^FD",
            str(count),
            "^FS^FO0,500^A0,60,50^FB800,,0,C^FD",
            str(row[3]),
            "^FS^FO0,580^A0,60,50^FB800,,0,C^FD",
            str(row[4]),
            "^FS^FO0,730^A0,60,60^FB800,1,0,C^FD",
            str(row[5]),
            "^FS",
            "^XZ",
            "",
        ]
    )
    PFAD.mkdir(exist_ok=True)
    datei_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"   
    (datei_zum_druck).write_text(text, "utf-8")
    #hier bin ich mir nicht sicher, ob es nicht schlauer wäre
    #auf das speichern zu verzichten und die Datei gleich mit
    #subporcess an den Drucker zu schicken. Wenn das geht...
    return datei_zum_druck
    

def lade_daten(conn):
    curs2 = conn.cursor()
    config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
    curs2.execute(config_sql)
    configdaten = curs2.fetchall()
    
    return  configdaten

 
def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode

def datei_drucken(druckdatei):
    print(f"{druckdatei}")
    subprocess.run(['lp' , druckdatei])     
    



class MainWindow(tk.Tk):
    def __init__(self, conn, configdaten):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        button_frame = tk.Frame(self, width=1200, height=400)
        button_frame.grid(row=0, column=0, padx=10, pady=3)
        
        for index, entry in enumerate(configdaten):
            row_index, column_index = divmod(index, 4)
            tk.Button(
                button_frame,
                text="{}\n{}".format(entry[1], entry[2]),
                bg="#f2c618",
                width=15,
                height=10,
                command=partial(self.on_click, entry, conn),
                ).grid(row=row_index, column=column_index, padx=0, pady=0)
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
            print("schalte jetzt aus")
            self.ExitApplication()
    
    def ExitApplication(self):
        MsgBox = tk.messagebox.askokcancel(
        title="Ausschalten",
        message="Soll der Rechner ausgeschaltet werden?")
        print(MsgBox)
        if MsgBox == True:
           self.destroy()
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
                )
        


    def on_click(self, row, conn):
        count = zaehl_ausdrucke(conn, row[0])
        print(count)
        datei_zum_druck = save_row_as_file(count, row, ORT)
        datei_drucken(datei_zum_druck)


def main():
    
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print(f"{BARCODE_DB_FILENAME}/config8.db")
           
    try:
        conn = sqlite3.connect(f"{DATABASE}")        
    except Error as e:
        print(e)
    configdaten = lade_daten(conn)  #lädt die Daten aus der csv nach
    print(configdaten)
    
    root = MainWindow(conn, configdaten) 
    root.mainloop()

        
if __name__ == '__main__':

    main()
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Warum sollte man nicht direkt drucken können?

Code: Alles auswählen

subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Warum sollte man nicht direkt drucken können?
Das geht aber so was von!
Danke!!!
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Jetzt bin ich vor dem nächsten Problem. Dooferweise habe ich ja die beiden Programmteile unabhängig von einander gebaut. Jetzt will ich die zusammenbauen, aber wie stell ich das an, ohne dass ich dadurch wieder Fehler ins Programm dazu hole.
Prinzipiell könnte ich hergehen und den einen Teil mit import in den anderen holen - das soll man dann ja nicht, habe ich gelesen. Gut, ich habs natürlich trotzdem versucht und bin gescheitert. (erster Anlauf, hab ich nicht anders erwartet.)
Dann könnte ich die beiden zusammenkopieren.
Aber...
Ich sehe, dass ich da noch Vorbereitung brauche. Hat jemand ein Stichwort für mich, wonach ich suchen muss?
Danke.
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Warum willst Du jetzt beide Teile in einem haben? Die beiden haben doch im Moment nichts miteinander zu tun, außer, dass sie die selbe Datenbank benutzen.

Ansonsten ist der richtige Weg, beide Programmteile als Modul zu implementieren, diese in einem Hauptprogramm zu importieren und dort richtig aufrufen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Äh.
An den Weg hatte ich jetzt gar nicht gedacht.
Wie schon erwähnt, hatte ich, sogar hier drin, gelesen, dass das in Python so üblich sei.
Das mit einem dritten Modul zu lösen hätte ich mich jetzt schon gar nicht zu denken gewagt.
Wird später gleich ausprobiert.
Ach ja, das eine Teil hat vielleicht schon was mit dem anderen zu tun. Wenn der Stick dann erfolgreich ausgelesen wurde, dann sollte das Knöpfemodul neu gestartet werden sonst werden da ja noch die alten Daten angezeigt.
Aber das kann man sicher dann in dem Hauptmodul machen.
Mal sehen ob ich das hinkrieg.
Danke für den Tipp.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Also, das hat geklappt. Hab die Dateien so umbenannt, dass die Versionsnummern hinten weg sind und einfach mit dem Code hier aufgerufen.

Code: Alles auswählen

import configusbdb as cfg
import knopfdruck as knd
from tkinter import messagebox

def main():
    cfg.main()
    knd.main()

if __name__ == '__main__':

    main()
Jetzt habe ich noch eine Infobox in den Code vom USB-Teil mit eingebaut und wenn das abgearbeitet ist, dann sollte mir das dann idealerweise einen Wert zurückgeben der dann den Neustart der Knopfanwendung auslöst.
Da hab ich noch zu knobeln.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Jetzt bin ich doch ein wenig arg verwirrt.
Habe ich bei dem USB-Modul die Fehlermeldungen die ich schon habe in die Infobox gepackt. Das funktioniert auch. Aber, jetzt kommt beim Aufruf dieses Teils was merkwürdiges.
Wenn ich das so aufrufe wie es vorher funktioniert hat:

Code: Alles auswählen

import configusbdb as cfg
import knopfdruck as knd

def main():
    cfg.main()
    knd.main()    

if __name__ == '__main__':
    main()
Dann startet entweder der eine Teil, hier der USB-Import/Konfiguration, der andere schweigt.
Wenn ich die Befehle umdrehe, startet die Knopfoberfläche und das USB-Teil schweigt.
Das dürfte doch wohl kaum an der Infobox liegen?
Und die beiden Module allein funktionieren ja auch.

Code: Alles auswählen

#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import tkinter as tk
from tkinter import messagebox


PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")

SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick

    raise AssertionError("unreachable code")  #auch noch nicht 100% klar, trotz der Erklärung. habe ich noch nie in "Aktion" gesehen


def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )


def config_arbeitsdb(connection, name_of_stick):
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        print("es waren keine 8 Datensätze")
        antwort = "es waren keine 8 Datensätze"
    else:
        datensaetze.to_sql(
            "knopfdaten", connection, if_exists="replace", index=False
        )
        sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
        connection.execute(sql_ident)
        antwort = "Daten erfolgreich übertragen, \n Programm startet neu"     #das funktionierte in der infobox

    return antwort


def auswerfen(name_of_stick):
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

def erfolgsmeldung(ergebnis, icon):      #den code habe ich so wie er ist gefunden. Hier bin ich an dem Fenster das übrigblieb von der Infobox fast verzweifelt
    window = tk.Tk()                                    #ich konnte keine so rechte Erklärung finden, warum hier noch zu der Box ein Fenster aufgeht, so verschwindet das wenigstens.
    window.eval('tk::PlaceWindow %s center' % window.winfo_toplevel())
    window.withdraw
    messagebox.showinfo(message=ergebnis, icon=icon)
    window.deiconify()
    window.destroy()
    window.quit() 
    
##def neustart():  #das war ein Versuch, das Modul nach erfolgter Aktualisierung neu zu laden.
##    knd.main()   #das hat auch "eigentlich" funktioniert, aber nicht so wie gedacht.


    
def main():
    udev_context = pyudev.Context()
    db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        try:
            ergebnis = config_arbeitsdb(db_engine, name_of_stick)
            anzahl_drucke_dokumentieren(
                db_engine, name_of_stick, CSV_DATEI_TO_STICK
            )
            ####
            print(ergebnis)
            erfolgsmeldung(ergebnis, 'info')
            #neustart()
        except SQLAlchemyError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except ValueError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except OSError as error:
            print("Fehler beim kopieren:", error)
            erfolgsmeldung(error, 'error')

        try:
            auswerfen(name_of_stick)
            
        except subprocess.CalledProcessError as error:
            print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
            error_lang = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
            erfolgsmeldung(error_lang, 'error')


if __name__ == "__main__":   #das Teil hier unten ist mir - grade wegen des Beispiels in der Dokumentation im Zusammenhang mit dem Aufruf von Modulen ein
    main()
# großes Rätsel geworden.
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Das sollte Dich eigentlich nicht verwirren, den ein beiden `main`-Funktionen hast Du Endlosschleifen.

Das warten_auf_usb-Stick mußt Du in einen Thread packen, und regelmäßig in der GUI abfragen, ob eine USB-Stick eingesteckt worden ist, und dann kannst Du dort auch das Kopieren erledigen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Gute Idee. Funktioniert schon mal nach einer Seite hin.
Hast du noch so einen guten Tipp, wie ich dann die neue Konfiguration in die Oberfläche schaufle?
Das Laden der Daten auch in einen Thread der das von der anderen Seite überwacht?
Oder kann ich von dem einen Modul in dem anderen Modul das Mainwindow zerstören?
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Eigentlich hast Du ja nur eine Oberfläche und eine Datenbank, und verschiedene Dinge, die im Hintergrund laufen. Erstens, laden von Daten und zweitens Drucken.
Ob man das jetzt auf verschiedene Module (vier Stück) aufteilt, oder alles in einer Datei stehen hat, ist erst einmal nebensächlich.
Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
Ja, das habe ich jetzt eine ganze Weile probiert.
Mein Problem ist, dass ich das Laden der Daten nicht in die Klasse mit reingebracht hab. Die Funkion dafür ist außerhalb und benötigt die Connection.
Wenn ich jetzt versuche das MainWindow mit update() von einem anderen Modul aufzurufen geht ihm entweder "self" ab oder die Connection.
Blick da jetzt nicht mehr so wirklich durch, denn die Connection kriegt es ja in Main zugewiesen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Nachtrag.
Es ist verhext. Jetzt kann ich von dem einen Modul wenn der Stick eingesteckt wird die Ladefunktion des anderen Moduls starten ohne die Connection übergeben zu müssen, aber das MainWindow aktualisieren? Das will mir nicht gelingen.
Heut geh ich erst mal ins Bett. Wird gescheiter sein.
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Dann zeig doch nochmal komplett, was Du jetzt hast.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Hab jetzt noch mal versucht - ich finde einfach keine Möglichkeit das update() so unterzubringen dass es auch ausgeführt wird.
Also, ich hab jetzt die beiden Dateien und eine kleine Testdatei dazu gebaut, dass ich nicht immer den USB-Stick einstecken muss.
Das hat einen timer der nach ein paar Sekunden den Befehl in das andere Modul rüberschickt. Währenddessen aktualisiere ich, dass ich was seh, die DB manuell. Ist umständlich aber einfacher als die Sache mit dem Stick.
Wie auch immer, das muss ja so gesehen beides funktionieren.
Tut aber nichts. Und ich suche nebenbei immer wieder mal, finde zig Seiten, aber ich kann das nicht auf meine Knöpfe anwenden.
Bin für jeden Tipp dankbar.

Also, ich rufe mit dem Modul das Knopfdruckmodul auf: (könnte ich mir vermutlich sparen, weil ja der Thread im Knopfdruckmodul gestartet wird.)

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import knopfdruck as knd

def main():

        knd.main()
        

if __name__ == '__main__':
    main()
Dann kommt das Test-Modul:

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import knopfdruck as knd
import time

def teste_aufruf():
        time.sleep(10)
        print("10 sekunden geschlafen")
        
        knd.MainWindow.antwort_von_usb("komme vom Schläferfenster", "info")
        

        
def main():
        teste_aufruf()
        

if __name__ == '__main__':
    main()
Hier ist das Modul das dann letztlich das Update vom MainWindow bräuchte.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import sqlite3
from sqlite3 import Error
from pathlib import Path
#import csv  #bleibt erst mal drin weil ggf. doch die Dateien gebraucht werden
import tkinter as tk
#from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
#from datetime import datetime as DateTime  #ggf. noch gebraucht
import subprocess
import threading
import configusbdb as uconf
import teste as test

PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
	                   AND knd.ID = num.ID"""



def prepare_rows_for_print(count, row, first_line):
    
    text = "\n".join(
        [
            "^XA",
            "^FXUnicode:",
            "^CI28",
            "^FXBox oben:",
            "^FO690,20^GB0,1150,8,^FS",
            "^FXgroßeBox:",
            "^FO15,20^GB790,1150,8^FS",
            "^FXZwischenlinie",
            "^FO495,20^GB0,1150,8,^FS",
            "^FXZwischenlinie unten",
            "^FO125,20^GB0,1150,8,^FS",
            "^FO720,200^A0R,50,50^FB800,1,0,C^FD",
            first_line,
            "^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
            str(row[1]),
            "^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
            str(row[2]),
            "^FS^FO0,180^BY3",
            "^BCR,90,Y,N,N",
            "^FO385,370^BY4^FD",
            str(count),
            "^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
            str(row[3]),
            "^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
            str(row[4]),
            "^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
            str(row[5]),
            "^FS",
            "^XZ",
            ""
        ]
    )
    #PFAD.mkdir(exist_ok=True)   #ggf. noch gebraucht
    #string_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"   
    #(string_zum_druck).write_text(text, "utf-8")
    #hier bin ich mir nicht sicher, ob es nicht schlauer wäre
    #auf das speichern zu verzichten und die Datei gleich mit
    #subporcess an den Drucker zu schicken. Wenn das geht...
    return text
    
    

def lade_daten():
               
    try:
        conn = sqlite3.connect(f"{DATABASE}")        
    except Error as e:
        print(e)
        
    curs2 = conn.cursor()
    config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
    curs2.execute(config_sql)
    configdaten = curs2.fetchall()
    print("daten geladen")    
    return  configdaten, conn

 
def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode

def datei_drucken(text):   
    
    subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
    



class MainWindow(tk.Tk):
    def __init__(self, conn, configdaten):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        button_frame = tk.Frame(self, width=1400, height=600)
        button_frame.grid(row=0, column=0, padx=0, pady=0)
        
        for index, entry in enumerate(configdaten):
            row_index, column_index = divmod(index, 4)
            tk.Button(
                button_frame,
                text="{}\n{}\n{}".format(entry[1], entry[2], entry[3]),
                bg="#f2c618",
                width=22,
                height=11,
                command=partial(self.on_click, entry, conn),
                ).grid(row=row_index, column=column_index, padx=0, pady=0)
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
            print("schalte jetzt aus")
            self.ExitApplication()
    
    def ExitApplication(self):
        MsgBox = tk.messagebox.askokcancel(
        title="Ausschalten",
        message="Soll der Rechner ausgeschaltet werden?")
        print(MsgBox)
        if MsgBox == True:
           self.destroy()
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
                )
        
    def fensterfrisch():
        print("hier war ich im fensterfrisch")
        lade_daten()
        #MainWindow.update()
        
    def antwort_von_usb(antwort, icon):
        print("vom anderen Teil")          
        tk.messagebox.showinfo(
                message=antwort,
                icon=icon)
        MainWindow.fensterfrisch()
        
        
    def on_click(self, row, conn):
        count = zaehl_ausdrucke(conn, row[0])
        print(count)
        string_zum_druck = prepare_rows_for_print(count, row, ORT)
        datei_drucken(string_zum_druck)


def main():
    
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print("starte main")

    configdaten, conn = lade_daten()  #lädt die Daten aus der csv nach
    print(configdaten)
    
    root = MainWindow(conn, configdaten)
    faden = threading.Thread(target=uconf.main)
    faden.start()
    faden2 = threading.Thread(target=test.main)
    faden2.start()

    root.mainloop()

        
if __name__ == '__main__':
    configdaten, conn = lade_daten()  #lädt die Daten aus der csv nach
    main()
Das USB-Teil:

Code: Alles auswählen

#!/usr/bin/env python3


import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import tkinter as tk
from tkinter import messagebox
import knopfdruck as knd
import threading


PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")

SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick

    raise AssertionError("unreachable code")


def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )


def config_arbeitsdb(connection, name_of_stick):
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    #
    # TODO Auf die Spalten einschränken die tatsächlich benötigt werden.
    #
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        print("es waren keine 8 Datensätze")
        antwort = "es waren keine 8 Datensätze"
    else:
        datensaetze.to_sql(
            "knopfdaten", connection, if_exists="replace", index=False
        )
        sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
        connection.execute(sql_ident)
        antwort = "Daten erfolgreich übertragen, \n Programm startet neu"

    return antwort


def auswerfen(name_of_stick):
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

def erfolgsmeldung(ergebnis, icon):
    knd.MainWindow.antwort_von_usb(ergebnis, icon)
    
##    window = tk.Tk()
##    window.eval('tk::PlaceWindow %s center' % window.winfo_toplevel())
##    window.withdraw
##    messagebox.showinfo(message=ergebnis, icon=icon)
##    window.deiconify()
##    window.destroy()
##    window.quit()  

def neuladen_config():        
        print("schicke zu knd")
        knd.MainWindow.fensterfrisch()


    
def main():
    udev_context = pyudev.Context()
    db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        try:
            ergebnis = config_arbeitsdb(db_engine, name_of_stick)
            anzahl_drucke_dokumentieren(
                db_engine, name_of_stick, CSV_DATEI_TO_STICK
            )
            ####
            print(ergebnis)
            erfolgsmeldung(ergebnis, 'info')
            neuladen_config()
        except SQLAlchemyError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except ValueError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except OSError as error:
            print("Fehler beim kopieren:", error)
            erfolgsmeldung(error, 'error')

        try:
            auswerfen(name_of_stick)
            
        except subprocess.CalledProcessError as error:
            print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
            error_lang = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
            erfolgsmeldung(error_lang, 'error')


if __name__ == "__main__":
    main()
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Das erste Skript ist unnötig, da das ja nur das main vom dritten Skript aufruft. Das zweite Skript macht nichts sinnvolles.
In `lade_daten` ist die „Fehlerbehandlung” unsinnig, da im Fehlerfall einfach nur ein weiterer Fehler auftritt. Fehler sollte man nur behandeln, die man auch sinnvoll behandeln kann.

In Mainwindow.__init__ ist configdata falsch. Da Du ja auch während das Programm läuft, die Knopfbeschriftung ändern willst. Also nur Knöpfe erstellen und die Beschriftung in einer extra Methode, in der Du auch gleich die Datenbankabfrage unterbringen kannst.
`ExitApplication` ist falsch geschrieben.
thread sollte man nicht ins deutsche übersetzt. Der zweite Thread macht irgendwie nichts sinnvolles. Und der erste Thread muß ja irgendwie mit dem Fenster kommunizieren können, zum einen für Statusmeldungen, die man ja irgendwie auch im Fenster sehen will, und nicht auf der Konsole, zum anderen, um bei neuen Daten die Ansicht zu aktualisieren. Dafür braucht es zum einen eine Queue in die diese ganzen Daten gefüttert werden, zum anderen im MainWindow eine regelmäßige Abfrage, ob etwas in der Queue steckt.
Die Aufrufe von knd.Mainwindow sind allesamt falsch, weil Du da direkt Funktionen der Klassendefinition aufrufst, zum anderen, weil aus einem Thread nicht direkt mit der GUI gesprochen werden darf, das muß, wie schon geschrieben, über eine Queue erfolgen. Zum Dritten, benutze keine Kryptischen Abkürzungen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Und der erste Thread muß ja irgendwie mit dem Fenster kommunizieren können, zum einen für Statusmeldungen, die man ja irgendwie auch im Fenster sehen will, und nicht auf der Konsole, zum anderen, um bei neuen Daten die Ansicht zu aktualisieren. Dafür braucht es zum einen eine Queue in die diese ganzen Daten gefüttert werden, zum anderen im MainWindow eine regelmäßige Abfrage, ob etwas in der Queue steckt.
Drum funktioniert das also nicht.
Wenn ich das recht verstehe, dann muss ich eine Queue ins MainWindow und in das USB-Modul einbauen.

Würde vermutlich auch nötig sein, wenn ich den Code in dem anderen Modul mit aufnehme?

Das Thema Queue scheint nicht ganz ohne zu sein...
Danke.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

configdata falsch
noch eine Frage die ich jetzt nicht ganz verstehe. configdata als Ausdruck falsch? das ist ja ein Konstrukt, das von lade_daten() kommt und wird dem Mainwindow übergeben. Irgendwie muss ich das doch da reinbringen, wenn das von den Knöpfen gebraucht wird. Geht das dann mit der Queue?
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Es geht nicht weiter, ich versteh das Ganze mit dem Thread nicht so recht.
Es ist aber eigentlich, so denke ich naiv, einfacher, wenn ich die Daten dann mit einem Knopfdruck neu laden lasse.
Dazu habe ich mir einen "Restarter" zusammengegooglet, der dann auf Knopfdruck die Oberfläche neu startet.
Das klappt, dann werden auch die Daten neu gezogen.
Das würde denke ich reichen. Dumm ist nur, wenn ich Threads verwende, wo dann in dem Teil der Thread gestartet wird und ich den Ausschalte, was ja schon funktioniert, müsste ich doch auch den anderen Thread killen.
Das ist jetzt so wie ich es habe nicht sauber, aber momentan stehe ich auf 5 Schläuchen.
Wenn mir jemand runter helfen könnte, wäre ich sehr dankbar.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import sqlite3
from sqlite3 import Error
from pathlib import Path
import csv
import tkinter as tk
from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
from datetime import datetime as DateTime
import subprocess
import sys
import os
import usbconfig
import concurrent.futures
import queue
import threading


PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
	                   AND knd.ID = num.ID"""



def prepare_rows_for_print(count, row, first_line):
    
    text = "\n".join(
        [
            "^XA",
            "^FXUnicode:",
            "^CI28",
            "^FXBox oben:",
            "^FO690,20^GB0,1150,8,^FS",
            "^FXgroßeBox:",
            "^FO15,20^GB790,1150,8^FS",
            "^FXZwischenlinie",
            "^FO495,20^GB0,1150,8,^FS",
            "^FXZwischenlinie unten",
            "^FO125,20^GB0,1150,8,^FS",
            "^FO720,200^A0R,50,50^FB800,1,0,C^FD",
            first_line,
            "^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
            str(row[1]),
            "^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
            str(row[2]),
            "^FS^FO0,180^BY3",
            "^BCR,90,Y,N,N",
            "^FO385,370^BY4^FD",
            str(count),
            "^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
            str(row[3]),
            "^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
            str(row[4]),
            "^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
            str(row[5]),
            "^FS",
            "^XZ",
            ""
        ]
    )
    #PFAD.mkdir(exist_ok=True)
    #string_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"   
    #(string_zum_druck).write_text(text, "utf-8")
    #hier bin ich mir nicht sicher, ob es nicht schlauer wäre
    #auf das speichern zu verzichten und die Datei gleich mit
    #subporcess an den Drucker zu schicken. Wenn das geht...
    return text


def restart_program():   #das ist jetzt nicht schön, vermute ich, aber wirkungsvoll

    python = sys.executable
    os.execl(python, python, * sys.argv)    

    
    

def lade_daten():
               
    try:
        conn = sqlite3.connect(f"{DATABASE}")        
    except Error as e:
        print(e)
        
    curs2 = conn.cursor()
    config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
    curs2.execute(config_sql)
    configdaten = curs2.fetchall()
    
    return  configdaten, conn

 
def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode

def datei_drucken(text):   
    
    subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
    



class MainWindow(tk.Tk):
    def __init__(self, conn, configdaten):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        button_frame = tk.Frame(self, width=1400, height=600)
        button_frame.grid(row=0, column=0, padx=0, pady=0)
        
        for index, entry in enumerate(configdaten):
            row_index, column_index = divmod(index, 4)
            tk.Button(
                button_frame,
                text="{}\n{}\n{}".format(entry[1], entry[2], entry[3]),
                bg="#f2c618",
                width=22,
                height=11,
                command=partial(self.on_click, entry, conn),
                ).grid(row=row_index, column=column_index, padx=0, pady=0)
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
            print("schalte jetzt aus")
            self.ExitApplication()
    
    def ExitApplication(self):
        MsgBox = tk.messagebox.askyesnocancel(
        title="Ausschalten/Neustarten",
        message="Soll der Rechner ausgeschaltet werden?\n Mit »Ja» ausschalten, Mit »Abbruch« Neustarten, mit »Nein« zurück zum Programm")
        print(MsgBox)
        if MsgBox == True:
            
        
            self.destroy()  #das geht
            faden.join()  #das nicht, behauptet das Terminal in dem das läuft
            sys.exit(0)  #das auch
            
        elif MsgBox == None:
            restart_program()  #das klappt ganz gut

           
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
                )
        

    def on_click(self, row, conn):
        count = zaehl_ausdrucke(conn, row[0])
        print(count)
        string_zum_druck = prepare_rows_for_print(count, row, ORT)
        datei_drucken(string_zum_druck)


def main():
    
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print(f"{BARCODE_DB_FILENAME}/config8.db")

    root.mainloop()

        
if __name__ == '__main__':
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print(f"{BARCODE_DB_FILENAME}/config8.db")

    configdaten, conn = lade_daten()  #lädt die Daten aus der csv nach
    print(configdaten)
    
    root = MainWindow(conn, configdaten)
    faden = threading.Thread(target=usbconfig.main)
    faden.start()
 
    
    #holer.usb_tausch
    root.mainloop()
    
    main(pipeline, event)
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

os.execl zu benutzen ist nicht schön und auch völlig unnötig. Der einfache Weg, einfach die Knöpfe zu aktualisieren, ist wohl zu offensichtlich.
Dass `lade_daten` auch eine Connection zur Datenbank erstellt, ist überraschend, weil man das von einer Funktion diesen Namens nicht erwartet. Und wie schon vorher geschrieben, ist die Fehlerbehandlung in dieser Funktion auch fehlerhaft.
Wenn man sich die Knöpfe in einer Liste merken würde, und die configdaten nicht gerade direkt in __init__ übergeben, dann könnte man auch einfach die Beschriftung ändern. Ist aber wohl zu einfach gedacht. messagebox wird als eigentständiges Modul importiert, das über tk.messagebox anzusprechen ist ungewöhnlich.
Ist es so schwer, sich an eine einfache Schreibregel zu halten? `ExitApplication` ist falsch geschrieben. MsgBox auch, aber dazu kommt, dass das ja das Ergebnis der MessageBox ist und nicht die Messagebox selbst.
`main` ist irgendwie überflüssig, weil das selbe unter dem if steht. Obwohl, das was unter dem if steht sollte eigentlich in main stehen. Und zum Schluß wird noch ein main mit falschen Parametern aufgerufen.
In `on_click` wird dann noch auf das globale ´conn` zugegriffen; fällt Dir nicht auf die Füße, weil ja keine richtige main-Funktion benutzt wird. Ist trotzdem unsauber.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-
import csv
import queue
import threading
import sqlite3
import time
import subprocess
import tkinter as tk
from collections import namedtuple
from tkinter import messagebox
from functools import partial
from contextlib import closing
from pathlib import Path
from datetime import datetime as DateTime
import pandas as pd
import usbconfig
import pyudev


PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"

MEDIA_PFAD = Path("/media/earl/")
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")

ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
                       AND knd.ID = num.ID"""

SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick
    raise AssertionError("unreachable code")  # das soll auch nie in Aktion sein, wenn es richtig funktioniert

def usb_stick_loop(queue):
    udev_context = pyudev.Context()
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        queue.put(name_of_stick)

def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )

def config_arbeitsdb(connection, name_of_stick):
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        raise RuntimeError("es waren keine 8 Datensätze")
    datensaetze.to_sql(
        "knopfdaten", connection, if_exists="replace", index=False
    )
    with closing(connection.cursor) as cursor:
        cursor.execute("insert or ignore into numbers(ID) select ID from knopfdaten")
    connection.commit()
    return "Daten erfolgreich übertragen"

def auswerfen(name_of_stick):
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

def copy_files(connection, name_of_stick):
    try:
        ergebnis = config_arbeitsdb(db_engine, name_of_stick)
        messagebox.showinfo(message=ergebnis, icon='info')
        anzahl_drucke_dokumentieren(
            db_engine, name_of_stick, CSV_DATEI_TO_STICK
        )
    except Exception as error:
        messagebox.showinfo(message=str(error), icon='error')
    try:
        auswerfen(name_of_stick)
    except subprocess.CalledProcessError as error:
        error_text = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
        messagebox.showinfo(message=str(error_text), icon='error')


# und bitte bessere Namen für die Spalten!
KnopfDaten = namedtuple("KnopfDaten", "id, e_st, zeile3, zeile5, zeile6, zeile7")

def lade_daten(connection):
    with closing(connection.cursor()) as cursor:
        cursor.execute(f"select {','.join(KnopfDaten._fields)} from knopfdaten")
        return [KnopfDaten(*row) for row in cursor]

def prepare_rows_for_print(count, row, first_line):
    return "\n".join([
        "^XA",
        "^FXUnicode:",
        "^CI28",
        "^FXBox oben:",
        "^FO690,20^GB0,1150,8,^FS",
        "^FXgroßeBox:",
        "^FO15,20^GB790,1150,8^FS",
        "^FXZwischenlinie",
        "^FO495,20^GB0,1150,8,^FS",
        "^FXZwischenlinie unten",
        "^FO125,20^GB0,1150,8,^FS",
        "^FO720,200^A0R,50,50^FB800,1,0,C^FD",
        first_line,
        "^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
        str(row.e_st),
        "^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
        str(row.zeile3),
        "^FS^FO0,180^BY3",
        "^BCR,90,Y,N,N",
        "^FO385,370^BY4^FD",
        str(count),
        "^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
        str(row.zeile5),
        "^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
        str(row.zeile6),
        "^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
        str(row.zeile7),
        "^FS",
        "^XZ",
        ""
    ])

def datei_drucken(text):    
    subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)

def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode


class MainWindow(tk.Tk):
    def __init__(self):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        self.connection = sqlite3.connect(f"{DATABASE}")
        button_frame = tk.Frame(self, width=1400, height=600)
        button_frame.grid(row=0, column=0, padx=0, pady=0)
        self.buttons = []
        for index in range(8):
            row_index, column_index = divmod(index, 4)
            button = tk.Button(
                button_frame,
                bg="#f2c618",
                width=22,
                height=11,
                command=partial(self.on_click, index),
                )
            button.grid(row=row_index, column=column_index, padx=0, pady=0)
            self.buttons.append(button)
        self.load_config()
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
        self.queue = Queue()
        threading.Thread(target=usb_stick_loop, args=(queue,), daemon=True)
        self.after(500, self.queue_loop)

    def load_config(self):
        self.config = lade_daten(self.connection)
        for button, entry in zip(self.buttons, self.config):
            button['text'] = f"{entry.e_st}\n{entry.zeile3}\n{entry.zeile5}"

    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit > 5:
            print("schalte jetzt aus")
            self.exit_application()
    
    def exit_application(self):
        result = tk.messagebox.askyesno(
            title="Ausschalten/Neustarten",
            message="Soll der Rechner ausgeschaltet werden?\n Mit »Ja» ausschalten, mit »Nein« zurück zum Programm"
        )
        if result:
            self.destroy()
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
            )
        
    def on_click(self, index):
        entry = self.config[index]
        count = zaehl_ausdrucke(self.connection, entry.id)
        print(count)
        string_zum_druck = prepare_rows_for_print(count, entry, ORT)
        datei_drucken(string_zum_druck)

    def queue_loop(self):
        if not self.queue.empty():
            name_of_stick = self.queue.get()
            copy_files(self.connection, name_of_stick)
            self.load_config()
        self.after(500, self.queue_loop)

def main():
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    root = MainWindow()
    root.mainloop()
        
if __name__ == '__main__':
    main()
Antworten