Hab jetzt noch mal versucht - ich finde einfach keine Möglichkeit das update() so unterzubringen dass es auch ausgeführt wird.
Also, ich hab jetzt die beiden Dateien und eine kleine Testdatei dazu gebaut, dass ich nicht immer den USB-Stick einstecken muss.
Das hat einen timer der nach ein paar Sekunden den Befehl in das andere Modul rüberschickt. Währenddessen aktualisiere ich, dass ich was seh, die DB manuell. Ist umständlich aber einfacher als die Sache mit dem Stick.
Wie auch immer, das muss ja so gesehen beides funktionieren.
Tut aber nichts. Und ich suche nebenbei immer wieder mal, finde zig Seiten, aber ich kann das nicht auf meine Knöpfe anwenden.
Bin für jeden Tipp dankbar.
Also, ich rufe mit dem Modul das Knopfdruckmodul auf: (könnte ich mir vermutlich sparen, weil ja der Thread im Knopfdruckmodul gestartet wird.)
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import knopfdruck as knd
def main():
knd.main()
if __name__ == '__main__':
main()
Dann kommt das Test-Modul:
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import knopfdruck as knd
import time
def teste_aufruf():
time.sleep(10)
print("10 sekunden geschlafen")
knd.MainWindow.antwort_von_usb("komme vom Schläferfenster", "info")
def main():
teste_aufruf()
if __name__ == '__main__':
main()
Hier ist das Modul das dann letztlich das Update vom MainWindow bräuchte.
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import sqlite3
from sqlite3 import Error
from pathlib import Path
#import csv #bleibt erst mal drin weil ggf. doch die Dateien gebraucht werden
import tkinter as tk
#from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
#from datetime import datetime as DateTime #ggf. noch gebraucht
import subprocess
import threading
import configusbdb as uconf
import teste as test
PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
SET letzte_nr = (letzte_nr + 1) % 10000
WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
num.Letzte_Nr
FROM knopfdaten as knd,
numbers as num
WHERE knd.ID = ?
AND knd.ID = num.ID"""
def prepare_rows_for_print(count, row, first_line):
text = "\n".join(
[
"^XA",
"^FXUnicode:",
"^CI28",
"^FXBox oben:",
"^FO690,20^GB0,1150,8,^FS",
"^FXgroßeBox:",
"^FO15,20^GB790,1150,8^FS",
"^FXZwischenlinie",
"^FO495,20^GB0,1150,8,^FS",
"^FXZwischenlinie unten",
"^FO125,20^GB0,1150,8,^FS",
"^FO720,200^A0R,50,50^FB800,1,0,C^FD",
first_line,
"^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
str(row[1]),
"^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
str(row[2]),
"^FS^FO0,180^BY3",
"^BCR,90,Y,N,N",
"^FO385,370^BY4^FD",
str(count),
"^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
str(row[3]),
"^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
str(row[4]),
"^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
str(row[5]),
"^FS",
"^XZ",
""
]
)
#PFAD.mkdir(exist_ok=True) #ggf. noch gebraucht
#string_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"
#(string_zum_druck).write_text(text, "utf-8")
#hier bin ich mir nicht sicher, ob es nicht schlauer wäre
#auf das speichern zu verzichten und die Datei gleich mit
#subporcess an den Drucker zu schicken. Wenn das geht...
return text
def lade_daten():
try:
conn = sqlite3.connect(f"{DATABASE}")
except Error as e:
print(e)
curs2 = conn.cursor()
config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
curs2.execute(config_sql)
configdaten = curs2.fetchall()
print("daten geladen")
return configdaten, conn
def zaehl_ausdrucke(connection, id):
with closing(connection.cursor()) as cursor:
cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
ispt_nr, letzte_nr = cursor.fetchone()
barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
cursor.execute("update knopfdaten SET Barcode = ? where id = ?", (barcode, id))
print(barcode, letzte_nr)
connection.commit()
return barcode
def datei_drucken(text):
subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
class MainWindow(tk.Tk):
def __init__(self, conn, configdaten):
super().__init__()
self.title("Auswahl der Label")
self["background"] = "#f2c618"
button_frame = tk.Frame(self, width=1400, height=600)
button_frame.grid(row=0, column=0, padx=0, pady=0)
for index, entry in enumerate(configdaten):
row_index, column_index = divmod(index, 4)
tk.Button(
button_frame,
text="{}\n{}\n{}".format(entry[1], entry[2], entry[3]),
bg="#f2c618",
width=22,
height=11,
command=partial(self.on_click, entry, conn),
).grid(row=row_index, column=column_index, padx=0, pady=0)
self.bind('<ButtonPress-1>', self.start_druck)
self.bind('<ButtonRelease-1>', self.stopp_druck)
self.knopf_drueck_zeit = None
def start_druck(self, event):
self.knopf_drueck_zeit = time.monotonic()
print("start_druck", self.knopf_drueck_zeit)
def stopp_druck(self, event):
knopf_loslass_zeit = time.monotonic()
print("stopp_druck", knopf_loslass_zeit)
if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
print("schalte jetzt aus")
self.ExitApplication()
def ExitApplication(self):
MsgBox = tk.messagebox.askokcancel(
title="Ausschalten",
message="Soll der Rechner ausgeschaltet werden?")
print(MsgBox)
if MsgBox == True:
self.destroy()
else:
tk.messagebox.showinfo(
'Zurück',
'Das Hauptfenster wird wieder gezeigt'
)
def fensterfrisch():
print("hier war ich im fensterfrisch")
lade_daten()
#MainWindow.update()
def antwort_von_usb(antwort, icon):
print("vom anderen Teil")
tk.messagebox.showinfo(
message=antwort,
icon=icon)
MainWindow.fensterfrisch()
def on_click(self, row, conn):
count = zaehl_ausdrucke(conn, row[0])
print(count)
string_zum_druck = prepare_rows_for_print(count, row, ORT)
datei_drucken(string_zum_druck)
def main():
BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
print("starte main")
configdaten, conn = lade_daten() #lädt die Daten aus der csv nach
print(configdaten)
root = MainWindow(conn, configdaten)
faden = threading.Thread(target=uconf.main)
faden.start()
faden2 = threading.Thread(target=test.main)
faden2.start()
root.mainloop()
if __name__ == '__main__':
configdaten, conn = lade_daten() #lädt die Daten aus der csv nach
main()
Das USB-Teil:
Code: Alles auswählen
#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import tkinter as tk
from tkinter import messagebox
import knopfdruck as knd
import threading
PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")
SQL_OUT = """select distinct
knopfdaten.ID,
knopfdaten.E_St,
knopfdaten.ISPT_Nr,
numbers.Letzte_Nr
from knopfdaten,
numbers
where
numbers.ID = knopfdaten.ID"""
USED_COLS = [
'ID',
'E_St',
'Zeile3',
'Zeile5',
'Zeile6',
'Zeile7',
'Zeilex',
'ISPT_Nr',
'HallenPos',
'Aktuell_Nr',
'Barcode'
]
def warte_auf_usb_stick(udev_context):
monitor = pyudev.Monitor.from_netlink(udev_context)
monitor.filter_by("block")
for device in iter(monitor.poll, None):
print(device.action)
if "ID_FS_TYPE" in device and device.action == "add":
name_of_stick = Path(device.get("ID_FS_LABEL"))
print(device.action, name_of_stick)
time.sleep(2)
return name_of_stick
raise AssertionError("unreachable code")
def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
dateiname_ziel = (
MEDIA_PFAD
/ name_of_stick
/ dateiname_roh.with_name(
f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
)
)
pd.read_sql(SQL_OUT, connection).to_csv(
dateiname_ziel, sep=";", decimal=",", index=False
)
def config_arbeitsdb(connection, name_of_stick):
config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
#
# TODO Auf die Spalten einschränken die tatsächlich benötigt werden.
#
datensaetze = pd.read_csv(
config_datei,
usecols=USED_COLS,
na_values="x",
quotechar='"',
sep=";",
encoding="utf-8",
decimal=",",
dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
)
if len(datensaetze) != 8:
print("es waren keine 8 Datensätze")
antwort = "es waren keine 8 Datensätze"
else:
datensaetze.to_sql(
"knopfdaten", connection, if_exists="replace", index=False
)
sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
connection.execute(sql_ident)
antwort = "Daten erfolgreich übertragen, \n Programm startet neu"
return antwort
def auswerfen(name_of_stick):
subprocess.run(
["umount", MEDIA_PFAD / name_of_stick],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
def erfolgsmeldung(ergebnis, icon):
knd.MainWindow.antwort_von_usb(ergebnis, icon)
## window = tk.Tk()
## window.eval('tk::PlaceWindow %s center' % window.winfo_toplevel())
## window.withdraw
## messagebox.showinfo(message=ergebnis, icon=icon)
## window.deiconify()
## window.destroy()
## window.quit()
def neuladen_config():
print("schicke zu knd")
knd.MainWindow.fensterfrisch()
def main():
udev_context = pyudev.Context()
db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
while True:
name_of_stick = warte_auf_usb_stick(udev_context)
try:
ergebnis = config_arbeitsdb(db_engine, name_of_stick)
anzahl_drucke_dokumentieren(
db_engine, name_of_stick, CSV_DATEI_TO_STICK
)
####
print(ergebnis)
erfolgsmeldung(ergebnis, 'info')
neuladen_config()
except SQLAlchemyError as error:
print(error)
erfolgsmeldung(error, 'error')
except ValueError as error:
print(error)
erfolgsmeldung(error, 'error')
except OSError as error:
print("Fehler beim kopieren:", error)
erfolgsmeldung(error, 'error')
try:
auswerfen(name_of_stick)
except subprocess.CalledProcessError as error:
print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
error_lang = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
erfolgsmeldung(error_lang, 'error')
if __name__ == "__main__":
main()