Labeldruck und was draus folgt

Du hast eine Idee für ein Projekt?
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

Du bestätigst damit nur das Problem, dass ich von Anfang an gesehen habe, zwei unabhängige Datenbanken synchron zu halten:
Von access wird ein Dings mit einer ispt-Nr und der Anzahl 5 auf den Raspi geladen und dort drei mal ein Barcode gedruckt (xxx6, xxx7 und xxx8). Der neue USB-Stick kommt, die Anzahl 8 wird geschrieben, aber da Access noch nicht weiss, dass gedruckt wurde, wird wieder 5 in Deine interne Datenbank geladen und munter nochmal xxx6 und xxx7 gedruckt.

Die Berechnung des Barcodes ist falsch. In deinem SQL-Ausdruck ist die +1 nicht geklammert. Außerdem wächst die Stellenanzahl des Barcodes mit der von Anzahl.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Du bestätigst damit nur das Problem, dass ich von Anfang an gesehen habe, zwei unabhängige Datenbanken synchron zu halten:
Das war von Anfang an berechtigt zu fragen.
Wahrscheinlich wird sich dann der Barcode als Zierde herausstellen, weil ihn so keiner richtig auswerten kann.
Dazu kommt, dass Labels auch von Access selbst gedruckt werden, wenn viele auf einmal gebraucht werden. Das wird sicher spaßig, das auszuwerten. Aber nicht mein Problem.
Ich mach das Ding und der Rest ist deren Problem.
Die Berechnung des Barcodes ist falsch. In deinem SQL-Ausdruck ist die +1 nicht geklammert. Außerdem wächst die Stellenanzahl des Barcodes mit der von Anzahl.
Danke. Hatte die Klammer schon mal in einer vorigen Version drin, warum ich dir rausgemacht hab?
Weiß es nicht.
Kann mit deiner Version ja nicht passieren.
Muss den string halt dann noch einfach hochschreiben auf die DB.
Danke dafür.
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

theoS hat geschrieben: Freitag 10. April 2020, 11:00 Aber nicht mein Problem.
Ich mach das Ding und der Rest ist deren Problem.
Dann bin ich hier raus.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Naja, dass denen dann einfällt, dass das schön wäre wenn das synchronisiert läuft, dann bring ich denen auch die Netzlösung, die aber aus genannten Gründen nicht geht.
Wenn sie das dann auch über den Stick austauschen wollen, müssen sie erst mal in ihrer Datenbank aufräumen.
Solange das nicht ist, mache ich das so. Vielleicht setzte ich noch eine Konstante in die Mitte vom Barcode dass man sehen kann, welches System den erzeugt hat. Aber die letzte Nummer fängt dann bei einer Änderung von 1 an.
Im Hinterkopf kann ich ja das was auch schon mal vorgeschlagen wurde, die Daten nicht komplett auszutauschen sondern fortzuschreiben behalten. Aber mein Urlaub ist am Montag wieder rum, dann hab ich wieder weniger Zeit, also schau ich, dass das läuft was laufen muss.
Dann sollen die testen und entscheiden was sie noch haben wollen.
Danke für die bisherige Unterstützung.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Also, jetzt habe ich zwei Sachen gemacht.
Die eine, ich habe in meiner DB ein zweite Tabelle drin, in der das Hochzählen stattfindet und deren Wert dann (sofern ich jetzt nicht was übersehen hab) in der CSV gespeichert wird und deren ID's von der Konfigdatei übernommen werden also so, dass sie in Ruhe gelassen werden wenn sie schon da sind. Hier beginnt dann der Startwert beim ersten Mal bei 0, wenn dieser Datensatz dann noch mal kommt, wird dort weitergezählt.
Dann habe ich zumindest das Problem mal soweit von der Seite des Raspis gebannt, dass mir da die Zahlen überschrieben werden.
Dass man erkennt, wer gedruckt hat, habe ich im Barcode eine Nummer zwischenrein gemacht.

Code: Alles auswählen

#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path

import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError

PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")
#SQL_OUT = "SELECT ID, E_St, ISPT_Nr, Aktuell_Nr, Barcode FROM knopfdaten"
SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick

    raise AssertionError("unreachable code")


def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )


def config_arbeitsdb(connection, name_of_stick):
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    #
    # TODO Auf die Spalten einschränken die tatsächlich benötigt werden.
    #
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        print("es waren keine 8 Datensätze")
    else:
        datensaetze.to_sql(
            "knopfdaten", connection, if_exists="replace", index=False
        )
        sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
        connection.execute(sql_ident)



def auswerfen(name_of_stick):
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )


def main():
    udev_context = pyudev.Context()
    db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        try:
            config_arbeitsdb(db_engine, name_of_stick)
            anzahl_drucke_dokumentieren(
                db_engine, name_of_stick, CSV_DATEI_TO_STICK
            )
        except SQLAlchemyError as error:
            print(error)
        except ValueError as error:
            print(error)
        except OSError as error:
            print("Fehler beim kopieren:", error)

        try:
            auswerfen(name_of_stick)
        except subprocess.CalledProcessError as error:
            print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")


if __name__ == "__main__":
    main()
Und im Teil mit den Knöpfen dann so:

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import sqlite3
from sqlite3 import Error
from pathlib import Path
import csv
import tkinter as tk
from functools import partial
from contextlib import closing


PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
	                   AND knd.ID = num.ID"""



def save_row_as_file(count, row, first_line):
    
    text = "\n".join(
        [
            "^XA",
            "^FO15,90^GB780,0,8,^FS",
            "^FO15,250^GB780,0,8,^FS",
            "^FO15,700^GB780,0,8,^FS",
            "^FO0,0^GB600,200,2",
            "^FO15,20^GB780,785,4^FS",
            "^FO0,40^A0,50,50^FB800,1,0,C^FD",
            first_line,
            "^FS^FO0,110^A0,60,60^FB800,1,0,C^FD",
            str(row[1]),
            "^FS^FO0,190^A0,70,70^FB800,1,0,C^FD",
            str(row[2]),
            "^FS^FO0,80^BY3",
            "^BCN,170,Y,N,N",
            "^FO165,270^BY4^FD",
            str(count),
            "^FS^FO0,500^A0,60,50^FB800,,0,C^FD",
            str(row[3]),
            "^FS^FO0,580^A0,60,50^FB800,,0,C^FD",
            str(row[4]),
            "^FS^FO0,730^A0,60,60^FB800,1,0,C^FD",
            str(row[5]),
            "^FS",
            "^XZ",
            "",
        ]
    )
    PFAD.mkdir(exist_ok=True)
    (PFAD / f"testT_{row[3]}_{row[1]}.zpl").write_text(text, "utf-8")


def lade_daten(conn):
    curs2 = conn.cursor()
    config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
    curs2.execute(config_sql)
    configdaten = curs2.fetchall()
    
    return  configdaten

 
def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode

def on_click(row, conn):
    count = zaehl_ausdrucke(conn, row[0])
    print(count)
    save_row_as_file(count, row, ORT)    

def main():
    
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
   
            
    try:
        conn = sqlite3.connect(f"{DATABASE}")        
    except Error as e:
        print(e)
    configdaten = lade_daten(conn)  #lädt die Daten aus der csv nach

    root = tk.Tk()
    root.title("Auswahl der Label")
    root.config(background="#f2c618")
    button_frame = tk.Frame(root, width=1200, height=400)
    button_frame.grid(row=0, column=0, padx=10, pady=3)
    
    for index, entry in enumerate(configdaten):
        row_index, column_index = divmod(index, 4)
        tk.Button(
            button_frame,
            text="{}\n{}".format(entry[1], entry[2]),
            bg="#f2c618",
            width=15,
            height=10,
            command=partial(on_click, entry, conn),
            ).grid(row=row_index, column=column_index, padx=0, pady=0)

    root.mainloop()

        
if __name__ == '__main__':

    main()

ToDo ist das Ganze zusammenzubringen und vor allem ein Fehlermeldefenster und eine Möglichkeit den Raspi auszuschalten ohne einfach den Stecker zu ziehen.
Der hat ja nur einen Touchscreen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

With a little help...
Hab ich jetzt herausgefunden, wie ich den Rechner runterfahren wenn ich einen Knopf länger drücke und auch wie ich drucke mit subprocess()
Was ich jetzt nicht hab, ist die Anbindung dieses Teils an sqlalchemy. Wobei ich das bisher "nur" für das USB-Teil brauche.
Was mir vermutlich hier noch abgeht ist die Fehlerbehandlung. Aber so richtig weiß ich nicht, was das Progg eigentlich tun soll, wenn der Printer nicht druckt. Das schiebt den Befehl ja bloß durch ans System.
Zudem habe ich mir überlegt, ob ich nicht gleich einfach den rohen Text an lp übergeben kann. Der Labeldrucker macht aus dem Kauderwelsch schöne Label wenns von der Datei kommt, warum nicht auch vom Text.
Nur da hab ich vermutlich einfach zu wenig Ahnung...
Hier mal der Code.
Vielleicht ist ja was dabei was halbwegs passt.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import sqlite3
from sqlite3 import Error
from pathlib import Path
import csv
import tkinter as tk
from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
from datetime import datetime as DateTime
import subprocess

PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
	                   AND knd.ID = num.ID"""



def save_row_as_file(count, row, first_line):
    
    text = "\n".join(
        [
            "^XA",
            "^FO15,90^GB780,0,8,^FS",
            "^FO15,250^GB780,0,8,^FS",
            "^FO15,700^GB780,0,8,^FS",
            "^FO0,0^GB600,200,2",
            "^FO15,20^GB780,785,4^FS",
            "^FO0,40^A0,50,50^FB800,1,0,C^FD",
            first_line,
            "^FS^FO0,110^A0,60,60^FB800,1,0,C^FD",
            str(row[1]),
            "^FS^FO0,190^A0,70,70^FB800,1,0,C^FD",
            str(row[2]),
            "^FS^FO0,80^BY3",
            "^BCN,170,Y,N,N",
            "^FO165,270^BY4^FD",
            str(count),
            "^FS^FO0,500^A0,60,50^FB800,,0,C^FD",
            str(row[3]),
            "^FS^FO0,580^A0,60,50^FB800,,0,C^FD",
            str(row[4]),
            "^FS^FO0,730^A0,60,60^FB800,1,0,C^FD",
            str(row[5]),
            "^FS",
            "^XZ",
            "",
        ]
    )
    PFAD.mkdir(exist_ok=True)
    datei_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"   
    (datei_zum_druck).write_text(text, "utf-8")
    #hier bin ich mir nicht sicher, ob es nicht schlauer wäre
    #auf das speichern zu verzichten und die Datei gleich mit
    #subporcess an den Drucker zu schicken. Wenn das geht...
    return datei_zum_druck
    

def lade_daten(conn):
    curs2 = conn.cursor()
    config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
    curs2.execute(config_sql)
    configdaten = curs2.fetchall()
    
    return  configdaten

 
def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode

def datei_drucken(druckdatei):
    print(f"{druckdatei}")
    subprocess.run(['lp' , druckdatei])     
    



class MainWindow(tk.Tk):
    def __init__(self, conn, configdaten):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        button_frame = tk.Frame(self, width=1200, height=400)
        button_frame.grid(row=0, column=0, padx=10, pady=3)
        
        for index, entry in enumerate(configdaten):
            row_index, column_index = divmod(index, 4)
            tk.Button(
                button_frame,
                text="{}\n{}".format(entry[1], entry[2]),
                bg="#f2c618",
                width=15,
                height=10,
                command=partial(self.on_click, entry, conn),
                ).grid(row=row_index, column=column_index, padx=0, pady=0)
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
            print("schalte jetzt aus")
            self.ExitApplication()
    
    def ExitApplication(self):
        MsgBox = tk.messagebox.askokcancel(
        title="Ausschalten",
        message="Soll der Rechner ausgeschaltet werden?")
        print(MsgBox)
        if MsgBox == True:
           self.destroy()
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
                )
        


    def on_click(self, row, conn):
        count = zaehl_ausdrucke(conn, row[0])
        print(count)
        datei_zum_druck = save_row_as_file(count, row, ORT)
        datei_drucken(datei_zum_druck)


def main():
    
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print(f"{BARCODE_DB_FILENAME}/config8.db")
           
    try:
        conn = sqlite3.connect(f"{DATABASE}")        
    except Error as e:
        print(e)
    configdaten = lade_daten(conn)  #lädt die Daten aus der csv nach
    print(configdaten)
    
    root = MainWindow(conn, configdaten) 
    root.mainloop()

        
if __name__ == '__main__':

    main()
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

Warum sollte man nicht direkt drucken können?

Code: Alles auswählen

subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Warum sollte man nicht direkt drucken können?
Das geht aber so was von!
Danke!!!
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Jetzt bin ich vor dem nächsten Problem. Dooferweise habe ich ja die beiden Programmteile unabhängig von einander gebaut. Jetzt will ich die zusammenbauen, aber wie stell ich das an, ohne dass ich dadurch wieder Fehler ins Programm dazu hole.
Prinzipiell könnte ich hergehen und den einen Teil mit import in den anderen holen - das soll man dann ja nicht, habe ich gelesen. Gut, ich habs natürlich trotzdem versucht und bin gescheitert. (erster Anlauf, hab ich nicht anders erwartet.)
Dann könnte ich die beiden zusammenkopieren.
Aber...
Ich sehe, dass ich da noch Vorbereitung brauche. Hat jemand ein Stichwort für mich, wonach ich suchen muss?
Danke.
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

Warum willst Du jetzt beide Teile in einem haben? Die beiden haben doch im Moment nichts miteinander zu tun, außer, dass sie die selbe Datenbank benutzen.

Ansonsten ist der richtige Weg, beide Programmteile als Modul zu implementieren, diese in einem Hauptprogramm zu importieren und dort richtig aufrufen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Äh.
An den Weg hatte ich jetzt gar nicht gedacht.
Wie schon erwähnt, hatte ich, sogar hier drin, gelesen, dass das in Python so üblich sei.
Das mit einem dritten Modul zu lösen hätte ich mich jetzt schon gar nicht zu denken gewagt.
Wird später gleich ausprobiert.
Ach ja, das eine Teil hat vielleicht schon was mit dem anderen zu tun. Wenn der Stick dann erfolgreich ausgelesen wurde, dann sollte das Knöpfemodul neu gestartet werden sonst werden da ja noch die alten Daten angezeigt.
Aber das kann man sicher dann in dem Hauptmodul machen.
Mal sehen ob ich das hinkrieg.
Danke für den Tipp.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Also, das hat geklappt. Hab die Dateien so umbenannt, dass die Versionsnummern hinten weg sind und einfach mit dem Code hier aufgerufen.

Code: Alles auswählen

import configusbdb as cfg
import knopfdruck as knd
from tkinter import messagebox

def main():
    cfg.main()
    knd.main()

if __name__ == '__main__':

    main()
Jetzt habe ich noch eine Infobox in den Code vom USB-Teil mit eingebaut und wenn das abgearbeitet ist, dann sollte mir das dann idealerweise einen Wert zurückgeben der dann den Neustart der Knopfanwendung auslöst.
Da hab ich noch zu knobeln.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Jetzt bin ich doch ein wenig arg verwirrt.
Habe ich bei dem USB-Modul die Fehlermeldungen die ich schon habe in die Infobox gepackt. Das funktioniert auch. Aber, jetzt kommt beim Aufruf dieses Teils was merkwürdiges.
Wenn ich das so aufrufe wie es vorher funktioniert hat:

Code: Alles auswählen

import configusbdb as cfg
import knopfdruck as knd

def main():
    cfg.main()
    knd.main()    

if __name__ == '__main__':
    main()
Dann startet entweder der eine Teil, hier der USB-Import/Konfiguration, der andere schweigt.
Wenn ich die Befehle umdrehe, startet die Knopfoberfläche und das USB-Teil schweigt.
Das dürfte doch wohl kaum an der Infobox liegen?
Und die beiden Module allein funktionieren ja auch.

Code: Alles auswählen

#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import tkinter as tk
from tkinter import messagebox


PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")

SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick

    raise AssertionError("unreachable code")  #auch noch nicht 100% klar, trotz der Erklärung. habe ich noch nie in "Aktion" gesehen


def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )


def config_arbeitsdb(connection, name_of_stick):
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        print("es waren keine 8 Datensätze")
        antwort = "es waren keine 8 Datensätze"
    else:
        datensaetze.to_sql(
            "knopfdaten", connection, if_exists="replace", index=False
        )
        sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
        connection.execute(sql_ident)
        antwort = "Daten erfolgreich übertragen, \n Programm startet neu"     #das funktionierte in der infobox

    return antwort


def auswerfen(name_of_stick):
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

def erfolgsmeldung(ergebnis, icon):      #den code habe ich so wie er ist gefunden. Hier bin ich an dem Fenster das übrigblieb von der Infobox fast verzweifelt
    window = tk.Tk()                                    #ich konnte keine so rechte Erklärung finden, warum hier noch zu der Box ein Fenster aufgeht, so verschwindet das wenigstens.
    window.eval('tk::PlaceWindow %s center' % window.winfo_toplevel())
    window.withdraw
    messagebox.showinfo(message=ergebnis, icon=icon)
    window.deiconify()
    window.destroy()
    window.quit() 
    
##def neustart():  #das war ein Versuch, das Modul nach erfolgter Aktualisierung neu zu laden.
##    knd.main()   #das hat auch "eigentlich" funktioniert, aber nicht so wie gedacht.


    
def main():
    udev_context = pyudev.Context()
    db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        try:
            ergebnis = config_arbeitsdb(db_engine, name_of_stick)
            anzahl_drucke_dokumentieren(
                db_engine, name_of_stick, CSV_DATEI_TO_STICK
            )
            ####
            print(ergebnis)
            erfolgsmeldung(ergebnis, 'info')
            #neustart()
        except SQLAlchemyError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except ValueError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except OSError as error:
            print("Fehler beim kopieren:", error)
            erfolgsmeldung(error, 'error')

        try:
            auswerfen(name_of_stick)
            
        except subprocess.CalledProcessError as error:
            print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
            error_lang = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
            erfolgsmeldung(error_lang, 'error')


if __name__ == "__main__":   #das Teil hier unten ist mir - grade wegen des Beispiels in der Dokumentation im Zusammenhang mit dem Aufruf von Modulen ein
    main()
# großes Rätsel geworden.
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

Das sollte Dich eigentlich nicht verwirren, den ein beiden `main`-Funktionen hast Du Endlosschleifen.

Das warten_auf_usb-Stick mußt Du in einen Thread packen, und regelmäßig in der GUI abfragen, ob eine USB-Stick eingesteckt worden ist, und dann kannst Du dort auch das Kopieren erledigen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Gute Idee. Funktioniert schon mal nach einer Seite hin.
Hast du noch so einen guten Tipp, wie ich dann die neue Konfiguration in die Oberfläche schaufle?
Das Laden der Daten auch in einen Thread der das von der anderen Seite überwacht?
Oder kann ich von dem einen Modul in dem anderen Modul das Mainwindow zerstören?
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

Eigentlich hast Du ja nur eine Oberfläche und eine Datenbank, und verschiedene Dinge, die im Hintergrund laufen. Erstens, laden von Daten und zweitens Drucken.
Ob man das jetzt auf verschiedene Module (vier Stück) aufteilt, oder alles in einer Datei stehen hat, ist erst einmal nebensächlich.
Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
Ja, das habe ich jetzt eine ganze Weile probiert.
Mein Problem ist, dass ich das Laden der Daten nicht in die Klasse mit reingebracht hab. Die Funkion dafür ist außerhalb und benötigt die Connection.
Wenn ich jetzt versuche das MainWindow mit update() von einem anderen Modul aufzurufen geht ihm entweder "self" ab oder die Connection.
Blick da jetzt nicht mehr so wirklich durch, denn die Connection kriegt es ja in Main zugewiesen.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Nachtrag.
Es ist verhext. Jetzt kann ich von dem einen Modul wenn der Stick eingesteckt wird die Ladefunktion des anderen Moduls starten ohne die Connection übergeben zu müssen, aber das MainWindow aktualisieren? Das will mir nicht gelingen.
Heut geh ich erst mal ins Bett. Wird gescheiter sein.
Sirius3
User
Beiträge: 17710
Registriert: Sonntag 21. Oktober 2012, 17:20

Dann zeig doch nochmal komplett, was Du jetzt hast.
theoS
User
Beiträge: 108
Registriert: Dienstag 5. November 2019, 21:44

Hab jetzt noch mal versucht - ich finde einfach keine Möglichkeit das update() so unterzubringen dass es auch ausgeführt wird.
Also, ich hab jetzt die beiden Dateien und eine kleine Testdatei dazu gebaut, dass ich nicht immer den USB-Stick einstecken muss.
Das hat einen timer der nach ein paar Sekunden den Befehl in das andere Modul rüberschickt. Währenddessen aktualisiere ich, dass ich was seh, die DB manuell. Ist umständlich aber einfacher als die Sache mit dem Stick.
Wie auch immer, das muss ja so gesehen beides funktionieren.
Tut aber nichts. Und ich suche nebenbei immer wieder mal, finde zig Seiten, aber ich kann das nicht auf meine Knöpfe anwenden.
Bin für jeden Tipp dankbar.

Also, ich rufe mit dem Modul das Knopfdruckmodul auf: (könnte ich mir vermutlich sparen, weil ja der Thread im Knopfdruckmodul gestartet wird.)

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import knopfdruck as knd

def main():

        knd.main()
        

if __name__ == '__main__':
    main()
Dann kommt das Test-Modul:

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import knopfdruck as knd
import time

def teste_aufruf():
        time.sleep(10)
        print("10 sekunden geschlafen")
        
        knd.MainWindow.antwort_von_usb("komme vom Schläferfenster", "info")
        

        
def main():
        teste_aufruf()
        

if __name__ == '__main__':
    main()
Hier ist das Modul das dann letztlich das Update vom MainWindow bräuchte.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf8 -*-

import sqlite3
from sqlite3 import Error
from pathlib import Path
#import csv  #bleibt erst mal drin weil ggf. doch die Dateien gebraucht werden
import tkinter as tk
#from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
#from datetime import datetime as DateTime  #ggf. noch gebraucht
import subprocess
import threading
import configusbdb as uconf
import teste as test

PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
    SET letzte_nr = (letzte_nr + 1) % 10000 
    WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
                           num.Letzte_Nr
                           FROM knopfdaten as knd,
                           numbers as num
                           WHERE knd.ID = ? 
	                   AND knd.ID = num.ID"""



def prepare_rows_for_print(count, row, first_line):
    
    text = "\n".join(
        [
            "^XA",
            "^FXUnicode:",
            "^CI28",
            "^FXBox oben:",
            "^FO690,20^GB0,1150,8,^FS",
            "^FXgroßeBox:",
            "^FO15,20^GB790,1150,8^FS",
            "^FXZwischenlinie",
            "^FO495,20^GB0,1150,8,^FS",
            "^FXZwischenlinie unten",
            "^FO125,20^GB0,1150,8,^FS",
            "^FO720,200^A0R,50,50^FB800,1,0,C^FD",
            first_line,
            "^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
            str(row[1]),
            "^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
            str(row[2]),
            "^FS^FO0,180^BY3",
            "^BCR,90,Y,N,N",
            "^FO385,370^BY4^FD",
            str(count),
            "^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
            str(row[3]),
            "^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
            str(row[4]),
            "^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
            str(row[5]),
            "^FS",
            "^XZ",
            ""
        ]
    )
    #PFAD.mkdir(exist_ok=True)   #ggf. noch gebraucht
    #string_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"   
    #(string_zum_druck).write_text(text, "utf-8")
    #hier bin ich mir nicht sicher, ob es nicht schlauer wäre
    #auf das speichern zu verzichten und die Datei gleich mit
    #subporcess an den Drucker zu schicken. Wenn das geht...
    return text
    
    

def lade_daten():
               
    try:
        conn = sqlite3.connect(f"{DATABASE}")        
    except Error as e:
        print(e)
        
    curs2 = conn.cursor()
    config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
    curs2.execute(config_sql)
    configdaten = curs2.fetchall()
    print("daten geladen")    
    return  configdaten, conn

 
def zaehl_ausdrucke(connection, id):   
    with closing(connection.cursor()) as cursor:
        cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
        cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
        ispt_nr, letzte_nr = cursor.fetchone()
        barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
        cursor.execute("update knopfdaten SET Barcode = ? where id = ?",  (barcode, id))
        print(barcode, letzte_nr)
        connection.commit()
    return barcode

def datei_drucken(text):   
    
    subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
    



class MainWindow(tk.Tk):
    def __init__(self, conn, configdaten):
        super().__init__()
        self.title("Auswahl der Label")
        self["background"] = "#f2c618"
        button_frame = tk.Frame(self, width=1400, height=600)
        button_frame.grid(row=0, column=0, padx=0, pady=0)
        
        for index, entry in enumerate(configdaten):
            row_index, column_index = divmod(index, 4)
            tk.Button(
                button_frame,
                text="{}\n{}\n{}".format(entry[1], entry[2], entry[3]),
                bg="#f2c618",
                width=22,
                height=11,
                command=partial(self.on_click, entry, conn),
                ).grid(row=row_index, column=column_index, padx=0, pady=0)
        self.bind('<ButtonPress-1>', self.start_druck)
        self.bind('<ButtonRelease-1>', self.stopp_druck)
        self.knopf_drueck_zeit = None
    def start_druck(self, event):
        self.knopf_drueck_zeit = time.monotonic()
        print("start_druck", self.knopf_drueck_zeit)

    def stopp_druck(self, event):
        knopf_loslass_zeit = time.monotonic()
        print("stopp_druck", knopf_loslass_zeit)
        if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
            print("schalte jetzt aus")
            self.ExitApplication()
    
    def ExitApplication(self):
        MsgBox = tk.messagebox.askokcancel(
        title="Ausschalten",
        message="Soll der Rechner ausgeschaltet werden?")
        print(MsgBox)
        if MsgBox == True:
           self.destroy()
        else:
            tk.messagebox.showinfo(
                'Zurück',
                'Das Hauptfenster wird wieder gezeigt'
                )
        
    def fensterfrisch():
        print("hier war ich im fensterfrisch")
        lade_daten()
        #MainWindow.update()
        
    def antwort_von_usb(antwort, icon):
        print("vom anderen Teil")          
        tk.messagebox.showinfo(
                message=antwort,
                icon=icon)
        MainWindow.fensterfrisch()
        
        
    def on_click(self, row, conn):
        count = zaehl_ausdrucke(conn, row[0])
        print(count)
        string_zum_druck = prepare_rows_for_print(count, row, ORT)
        datei_drucken(string_zum_druck)


def main():
    
    BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
    print("starte main")

    configdaten, conn = lade_daten()  #lädt die Daten aus der csv nach
    print(configdaten)
    
    root = MainWindow(conn, configdaten)
    faden = threading.Thread(target=uconf.main)
    faden.start()
    faden2 = threading.Thread(target=test.main)
    faden2.start()

    root.mainloop()

        
if __name__ == '__main__':
    configdaten, conn = lade_daten()  #lädt die Daten aus der csv nach
    main()
Das USB-Teil:

Code: Alles auswählen

#!/usr/bin/env python3


import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import tkinter as tk
from tkinter import messagebox
import knopfdruck as knd
import threading


PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")

SQL_OUT = """select distinct
             knopfdaten.ID,
             knopfdaten.E_St,
             knopfdaten.ISPT_Nr,
             numbers.Letzte_Nr
             from knopfdaten,
             numbers
             where
             numbers.ID = knopfdaten.ID"""

USED_COLS = [
        'ID',
        'E_St',
        'Zeile3',
        'Zeile5',
        'Zeile6',
        'Zeile7',
        'Zeilex',
        'ISPT_Nr',
        'HallenPos',
        'Aktuell_Nr',
        'Barcode'
        ]

def warte_auf_usb_stick(udev_context):
    monitor = pyudev.Monitor.from_netlink(udev_context)
    monitor.filter_by("block")
    for device in iter(monitor.poll, None):
        print(device.action)
        if "ID_FS_TYPE" in device and device.action == "add":
            name_of_stick = Path(device.get("ID_FS_LABEL"))
            print(device.action, name_of_stick)
            time.sleep(2)
            return name_of_stick

    raise AssertionError("unreachable code")


def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
    dateiname_ziel = (
        MEDIA_PFAD
        / name_of_stick
        / dateiname_roh.with_name(
            f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
        )
    )
    pd.read_sql(SQL_OUT, connection).to_csv(
        dateiname_ziel, sep=";", decimal=",", index=False
    )


def config_arbeitsdb(connection, name_of_stick):
    config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
    #
    # TODO Auf die Spalten einschränken die tatsächlich benötigt werden.
    #
    datensaetze = pd.read_csv(        
        config_datei,
        usecols=USED_COLS,
        na_values="x",
        quotechar='"',
        sep=";",
        encoding="utf-8",
        decimal=",",
        dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
    )
    if len(datensaetze) != 8:
        print("es waren keine 8 Datensätze")
        antwort = "es waren keine 8 Datensätze"
    else:
        datensaetze.to_sql(
            "knopfdaten", connection, if_exists="replace", index=False
        )
        sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
        connection.execute(sql_ident)
        antwort = "Daten erfolgreich übertragen, \n Programm startet neu"

    return antwort


def auswerfen(name_of_stick):
    subprocess.run(
        ["umount", MEDIA_PFAD / name_of_stick],
        check=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
    )

def erfolgsmeldung(ergebnis, icon):
    knd.MainWindow.antwort_von_usb(ergebnis, icon)
    
##    window = tk.Tk()
##    window.eval('tk::PlaceWindow %s center' % window.winfo_toplevel())
##    window.withdraw
##    messagebox.showinfo(message=ergebnis, icon=icon)
##    window.deiconify()
##    window.destroy()
##    window.quit()  

def neuladen_config():        
        print("schicke zu knd")
        knd.MainWindow.fensterfrisch()


    
def main():
    udev_context = pyudev.Context()
    db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
    while True:
        name_of_stick = warte_auf_usb_stick(udev_context)
        try:
            ergebnis = config_arbeitsdb(db_engine, name_of_stick)
            anzahl_drucke_dokumentieren(
                db_engine, name_of_stick, CSV_DATEI_TO_STICK
            )
            ####
            print(ergebnis)
            erfolgsmeldung(ergebnis, 'info')
            neuladen_config()
        except SQLAlchemyError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except ValueError as error:
            print(error)
            erfolgsmeldung(error, 'error')
        except OSError as error:
            print("Fehler beim kopieren:", error)
            erfolgsmeldung(error, 'error')

        try:
            auswerfen(name_of_stick)
            
        except subprocess.CalledProcessError as error:
            print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
            error_lang = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
            erfolgsmeldung(error_lang, 'error')


if __name__ == "__main__":
    main()
Antworten