Also, jetzt habe ich zwei Sachen gemacht.
Die eine, ich habe in meiner DB ein zweite Tabelle drin, in der das Hochzählen stattfindet und deren Wert dann (sofern ich jetzt nicht was übersehen hab) in der CSV gespeichert wird und deren ID's von der Konfigdatei übernommen werden also so, dass sie in Ruhe gelassen werden wenn sie schon da sind. Hier beginnt dann der Startwert beim ersten Mal bei 0, wenn dieser Datensatz dann noch mal kommt, wird dort weitergezählt.
Dann habe ich zumindest das Problem mal soweit von der Seite des Raspis gebannt, dass mir da die Zahlen überschrieben werden.
Dass man erkennt, wer gedruckt hat, habe ich im Barcode eine Nummer zwischenrein gemacht.
Code: Alles auswählen
#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")
#SQL_OUT = "SELECT ID, E_St, ISPT_Nr, Aktuell_Nr, Barcode FROM knopfdaten"
SQL_OUT = """select distinct
knopfdaten.ID,
knopfdaten.E_St,
knopfdaten.ISPT_Nr,
numbers.Letzte_Nr
from knopfdaten,
numbers
where
numbers.ID = knopfdaten.ID"""
USED_COLS = [
'ID',
'E_St',
'Zeile3',
'Zeile5',
'Zeile6',
'Zeile7',
'Zeilex',
'ISPT_Nr',
'HallenPos',
'Aktuell_Nr',
'Barcode'
]
def warte_auf_usb_stick(udev_context):
monitor = pyudev.Monitor.from_netlink(udev_context)
monitor.filter_by("block")
for device in iter(monitor.poll, None):
print(device.action)
if "ID_FS_TYPE" in device and device.action == "add":
name_of_stick = Path(device.get("ID_FS_LABEL"))
print(device.action, name_of_stick)
time.sleep(2)
return name_of_stick
raise AssertionError("unreachable code")
def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
dateiname_ziel = (
MEDIA_PFAD
/ name_of_stick
/ dateiname_roh.with_name(
f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
)
)
pd.read_sql(SQL_OUT, connection).to_csv(
dateiname_ziel, sep=";", decimal=",", index=False
)
def config_arbeitsdb(connection, name_of_stick):
config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
#
# TODO Auf die Spalten einschränken die tatsächlich benötigt werden.
#
datensaetze = pd.read_csv(
config_datei,
usecols=USED_COLS,
na_values="x",
quotechar='"',
sep=";",
encoding="utf-8",
decimal=",",
dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
)
if len(datensaetze) != 8:
print("es waren keine 8 Datensätze")
else:
datensaetze.to_sql(
"knopfdaten", connection, if_exists="replace", index=False
)
sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
connection.execute(sql_ident)
def auswerfen(name_of_stick):
subprocess.run(
["umount", MEDIA_PFAD / name_of_stick],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
def main():
udev_context = pyudev.Context()
db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
while True:
name_of_stick = warte_auf_usb_stick(udev_context)
try:
config_arbeitsdb(db_engine, name_of_stick)
anzahl_drucke_dokumentieren(
db_engine, name_of_stick, CSV_DATEI_TO_STICK
)
except SQLAlchemyError as error:
print(error)
except ValueError as error:
print(error)
except OSError as error:
print("Fehler beim kopieren:", error)
try:
auswerfen(name_of_stick)
except subprocess.CalledProcessError as error:
print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
if __name__ == "__main__":
main()
Und im Teil mit den Knöpfen dann so:
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import sqlite3
from sqlite3 import Error
from pathlib import Path
import csv
import tkinter as tk
from functools import partial
from contextlib import closing
PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
SET letzte_nr = (letzte_nr + 1) % 10000
WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
num.Letzte_Nr
FROM knopfdaten as knd,
numbers as num
WHERE knd.ID = ?
AND knd.ID = num.ID"""
def save_row_as_file(count, row, first_line):
text = "\n".join(
[
"^XA",
"^FO15,90^GB780,0,8,^FS",
"^FO15,250^GB780,0,8,^FS",
"^FO15,700^GB780,0,8,^FS",
"^FO0,0^GB600,200,2",
"^FO15,20^GB780,785,4^FS",
"^FO0,40^A0,50,50^FB800,1,0,C^FD",
first_line,
"^FS^FO0,110^A0,60,60^FB800,1,0,C^FD",
str(row[1]),
"^FS^FO0,190^A0,70,70^FB800,1,0,C^FD",
str(row[2]),
"^FS^FO0,80^BY3",
"^BCN,170,Y,N,N",
"^FO165,270^BY4^FD",
str(count),
"^FS^FO0,500^A0,60,50^FB800,,0,C^FD",
str(row[3]),
"^FS^FO0,580^A0,60,50^FB800,,0,C^FD",
str(row[4]),
"^FS^FO0,730^A0,60,60^FB800,1,0,C^FD",
str(row[5]),
"^FS",
"^XZ",
"",
]
)
PFAD.mkdir(exist_ok=True)
(PFAD / f"testT_{row[3]}_{row[1]}.zpl").write_text(text, "utf-8")
def lade_daten(conn):
curs2 = conn.cursor()
config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
curs2.execute(config_sql)
configdaten = curs2.fetchall()
return configdaten
def zaehl_ausdrucke(connection, id):
with closing(connection.cursor()) as cursor:
cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
ispt_nr, letzte_nr = cursor.fetchone()
barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
cursor.execute("update knopfdaten SET Barcode = ? where id = ?", (barcode, id))
print(barcode, letzte_nr)
connection.commit()
return barcode
def on_click(row, conn):
count = zaehl_ausdrucke(conn, row[0])
print(count)
save_row_as_file(count, row, ORT)
def main():
BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
try:
conn = sqlite3.connect(f"{DATABASE}")
except Error as e:
print(e)
configdaten = lade_daten(conn) #lädt die Daten aus der csv nach
root = tk.Tk()
root.title("Auswahl der Label")
root.config(background="#f2c618")
button_frame = tk.Frame(root, width=1200, height=400)
button_frame.grid(row=0, column=0, padx=10, pady=3)
for index, entry in enumerate(configdaten):
row_index, column_index = divmod(index, 4)
tk.Button(
button_frame,
text="{}\n{}".format(entry[1], entry[2]),
bg="#f2c618",
width=15,
height=10,
command=partial(on_click, entry, conn),
).grid(row=row_index, column=column_index, padx=0, pady=0)
root.mainloop()
if __name__ == '__main__':
main()
ToDo ist das Ganze zusammenzubringen und vor allem ein Fehlermeldefenster und eine Möglichkeit den Raspi auszuschalten ohne einfach den Stecker zu ziehen.
Der hat ja nur einen Touchscreen.