Du bestätigst damit nur das Problem, dass ich von Anfang an gesehen habe, zwei unabhängige Datenbanken synchron zu halten:
Von access wird ein Dings mit einer ispt-Nr und der Anzahl 5 auf den Raspi geladen und dort drei mal ein Barcode gedruckt (xxx6, xxx7 und xxx8). Der neue USB-Stick kommt, die Anzahl 8 wird geschrieben, aber da Access noch nicht weiss, dass gedruckt wurde, wird wieder 5 in Deine interne Datenbank geladen und munter nochmal xxx6 und xxx7 gedruckt.
Die Berechnung des Barcodes ist falsch. In deinem SQL-Ausdruck ist die +1 nicht geklammert. Außerdem wächst die Stellenanzahl des Barcodes mit der von Anzahl.
Labeldruck und was draus folgt
Das war von Anfang an berechtigt zu fragen.Du bestätigst damit nur das Problem, dass ich von Anfang an gesehen habe, zwei unabhängige Datenbanken synchron zu halten:
Wahrscheinlich wird sich dann der Barcode als Zierde herausstellen, weil ihn so keiner richtig auswerten kann.
Dazu kommt, dass Labels auch von Access selbst gedruckt werden, wenn viele auf einmal gebraucht werden. Das wird sicher spaßig, das auszuwerten. Aber nicht mein Problem.
Ich mach das Ding und der Rest ist deren Problem.
Danke. Hatte die Klammer schon mal in einer vorigen Version drin, warum ich dir rausgemacht hab?Die Berechnung des Barcodes ist falsch. In deinem SQL-Ausdruck ist die +1 nicht geklammert. Außerdem wächst die Stellenanzahl des Barcodes mit der von Anzahl.
Weiß es nicht.
Kann mit deiner Version ja nicht passieren.
Muss den string halt dann noch einfach hochschreiben auf die DB.
Danke dafür.
Naja, dass denen dann einfällt, dass das schön wäre wenn das synchronisiert läuft, dann bring ich denen auch die Netzlösung, die aber aus genannten Gründen nicht geht.
Wenn sie das dann auch über den Stick austauschen wollen, müssen sie erst mal in ihrer Datenbank aufräumen.
Solange das nicht ist, mache ich das so. Vielleicht setzte ich noch eine Konstante in die Mitte vom Barcode dass man sehen kann, welches System den erzeugt hat. Aber die letzte Nummer fängt dann bei einer Änderung von 1 an.
Im Hinterkopf kann ich ja das was auch schon mal vorgeschlagen wurde, die Daten nicht komplett auszutauschen sondern fortzuschreiben behalten. Aber mein Urlaub ist am Montag wieder rum, dann hab ich wieder weniger Zeit, also schau ich, dass das läuft was laufen muss.
Dann sollen die testen und entscheiden was sie noch haben wollen.
Danke für die bisherige Unterstützung.
Wenn sie das dann auch über den Stick austauschen wollen, müssen sie erst mal in ihrer Datenbank aufräumen.
Solange das nicht ist, mache ich das so. Vielleicht setzte ich noch eine Konstante in die Mitte vom Barcode dass man sehen kann, welches System den erzeugt hat. Aber die letzte Nummer fängt dann bei einer Änderung von 1 an.
Im Hinterkopf kann ich ja das was auch schon mal vorgeschlagen wurde, die Daten nicht komplett auszutauschen sondern fortzuschreiben behalten. Aber mein Urlaub ist am Montag wieder rum, dann hab ich wieder weniger Zeit, also schau ich, dass das läuft was laufen muss.
Dann sollen die testen und entscheiden was sie noch haben wollen.
Danke für die bisherige Unterstützung.
Also, jetzt habe ich zwei Sachen gemacht.
Die eine, ich habe in meiner DB ein zweite Tabelle drin, in der das Hochzählen stattfindet und deren Wert dann (sofern ich jetzt nicht was übersehen hab) in der CSV gespeichert wird und deren ID's von der Konfigdatei übernommen werden also so, dass sie in Ruhe gelassen werden wenn sie schon da sind. Hier beginnt dann der Startwert beim ersten Mal bei 0, wenn dieser Datensatz dann noch mal kommt, wird dort weitergezählt.
Dann habe ich zumindest das Problem mal soweit von der Seite des Raspis gebannt, dass mir da die Zahlen überschrieben werden.
Dass man erkennt, wer gedruckt hat, habe ich im Barcode eine Nummer zwischenrein gemacht.
Und im Teil mit den Knöpfen dann so:
ToDo ist das Ganze zusammenzubringen und vor allem ein Fehlermeldefenster und eine Möglichkeit den Raspi auszuschalten ohne einfach den Stecker zu ziehen.
Der hat ja nur einen Touchscreen.
Die eine, ich habe in meiner DB ein zweite Tabelle drin, in der das Hochzählen stattfindet und deren Wert dann (sofern ich jetzt nicht was übersehen hab) in der CSV gespeichert wird und deren ID's von der Konfigdatei übernommen werden also so, dass sie in Ruhe gelassen werden wenn sie schon da sind. Hier beginnt dann der Startwert beim ersten Mal bei 0, wenn dieser Datensatz dann noch mal kommt, wird dort weitergezählt.
Dann habe ich zumindest das Problem mal soweit von der Seite des Raspis gebannt, dass mir da die Zahlen überschrieben werden.
Dass man erkennt, wer gedruckt hat, habe ich im Barcode eine Nummer zwischenrein gemacht.
Code: Alles auswählen
#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")
#SQL_OUT = "SELECT ID, E_St, ISPT_Nr, Aktuell_Nr, Barcode FROM knopfdaten"
SQL_OUT = """select distinct
knopfdaten.ID,
knopfdaten.E_St,
knopfdaten.ISPT_Nr,
numbers.Letzte_Nr
from knopfdaten,
numbers
where
numbers.ID = knopfdaten.ID"""
USED_COLS = [
'ID',
'E_St',
'Zeile3',
'Zeile5',
'Zeile6',
'Zeile7',
'Zeilex',
'ISPT_Nr',
'HallenPos',
'Aktuell_Nr',
'Barcode'
]
def warte_auf_usb_stick(udev_context):
monitor = pyudev.Monitor.from_netlink(udev_context)
monitor.filter_by("block")
for device in iter(monitor.poll, None):
print(device.action)
if "ID_FS_TYPE" in device and device.action == "add":
name_of_stick = Path(device.get("ID_FS_LABEL"))
print(device.action, name_of_stick)
time.sleep(2)
return name_of_stick
raise AssertionError("unreachable code")
def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
dateiname_ziel = (
MEDIA_PFAD
/ name_of_stick
/ dateiname_roh.with_name(
f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
)
)
pd.read_sql(SQL_OUT, connection).to_csv(
dateiname_ziel, sep=";", decimal=",", index=False
)
def config_arbeitsdb(connection, name_of_stick):
config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
#
# TODO Auf die Spalten einschränken die tatsächlich benötigt werden.
#
datensaetze = pd.read_csv(
config_datei,
usecols=USED_COLS,
na_values="x",
quotechar='"',
sep=";",
encoding="utf-8",
decimal=",",
dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
)
if len(datensaetze) != 8:
print("es waren keine 8 Datensätze")
else:
datensaetze.to_sql(
"knopfdaten", connection, if_exists="replace", index=False
)
sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
connection.execute(sql_ident)
def auswerfen(name_of_stick):
subprocess.run(
["umount", MEDIA_PFAD / name_of_stick],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
def main():
udev_context = pyudev.Context()
db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
while True:
name_of_stick = warte_auf_usb_stick(udev_context)
try:
config_arbeitsdb(db_engine, name_of_stick)
anzahl_drucke_dokumentieren(
db_engine, name_of_stick, CSV_DATEI_TO_STICK
)
except SQLAlchemyError as error:
print(error)
except ValueError as error:
print(error)
except OSError as error:
print("Fehler beim kopieren:", error)
try:
auswerfen(name_of_stick)
except subprocess.CalledProcessError as error:
print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
if __name__ == "__main__":
main()
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import sqlite3
from sqlite3 import Error
from pathlib import Path
import csv
import tkinter as tk
from functools import partial
from contextlib import closing
PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
SET letzte_nr = (letzte_nr + 1) % 10000
WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
num.Letzte_Nr
FROM knopfdaten as knd,
numbers as num
WHERE knd.ID = ?
AND knd.ID = num.ID"""
def save_row_as_file(count, row, first_line):
text = "\n".join(
[
"^XA",
"^FO15,90^GB780,0,8,^FS",
"^FO15,250^GB780,0,8,^FS",
"^FO15,700^GB780,0,8,^FS",
"^FO0,0^GB600,200,2",
"^FO15,20^GB780,785,4^FS",
"^FO0,40^A0,50,50^FB800,1,0,C^FD",
first_line,
"^FS^FO0,110^A0,60,60^FB800,1,0,C^FD",
str(row[1]),
"^FS^FO0,190^A0,70,70^FB800,1,0,C^FD",
str(row[2]),
"^FS^FO0,80^BY3",
"^BCN,170,Y,N,N",
"^FO165,270^BY4^FD",
str(count),
"^FS^FO0,500^A0,60,50^FB800,,0,C^FD",
str(row[3]),
"^FS^FO0,580^A0,60,50^FB800,,0,C^FD",
str(row[4]),
"^FS^FO0,730^A0,60,60^FB800,1,0,C^FD",
str(row[5]),
"^FS",
"^XZ",
"",
]
)
PFAD.mkdir(exist_ok=True)
(PFAD / f"testT_{row[3]}_{row[1]}.zpl").write_text(text, "utf-8")
def lade_daten(conn):
curs2 = conn.cursor()
config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
curs2.execute(config_sql)
configdaten = curs2.fetchall()
return configdaten
def zaehl_ausdrucke(connection, id):
with closing(connection.cursor()) as cursor:
cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
ispt_nr, letzte_nr = cursor.fetchone()
barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
cursor.execute("update knopfdaten SET Barcode = ? where id = ?", (barcode, id))
print(barcode, letzte_nr)
connection.commit()
return barcode
def on_click(row, conn):
count = zaehl_ausdrucke(conn, row[0])
print(count)
save_row_as_file(count, row, ORT)
def main():
BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
try:
conn = sqlite3.connect(f"{DATABASE}")
except Error as e:
print(e)
configdaten = lade_daten(conn) #lädt die Daten aus der csv nach
root = tk.Tk()
root.title("Auswahl der Label")
root.config(background="#f2c618")
button_frame = tk.Frame(root, width=1200, height=400)
button_frame.grid(row=0, column=0, padx=10, pady=3)
for index, entry in enumerate(configdaten):
row_index, column_index = divmod(index, 4)
tk.Button(
button_frame,
text="{}\n{}".format(entry[1], entry[2]),
bg="#f2c618",
width=15,
height=10,
command=partial(on_click, entry, conn),
).grid(row=row_index, column=column_index, padx=0, pady=0)
root.mainloop()
if __name__ == '__main__':
main()
ToDo ist das Ganze zusammenzubringen und vor allem ein Fehlermeldefenster und eine Möglichkeit den Raspi auszuschalten ohne einfach den Stecker zu ziehen.
Der hat ja nur einen Touchscreen.
With a little help...
Hab ich jetzt herausgefunden, wie ich den Rechner runterfahren wenn ich einen Knopf länger drücke und auch wie ich drucke mit subprocess()
Was ich jetzt nicht hab, ist die Anbindung dieses Teils an sqlalchemy. Wobei ich das bisher "nur" für das USB-Teil brauche.
Was mir vermutlich hier noch abgeht ist die Fehlerbehandlung. Aber so richtig weiß ich nicht, was das Progg eigentlich tun soll, wenn der Printer nicht druckt. Das schiebt den Befehl ja bloß durch ans System.
Zudem habe ich mir überlegt, ob ich nicht gleich einfach den rohen Text an lp übergeben kann. Der Labeldrucker macht aus dem Kauderwelsch schöne Label wenns von der Datei kommt, warum nicht auch vom Text.
Nur da hab ich vermutlich einfach zu wenig Ahnung...
Hier mal der Code.
Vielleicht ist ja was dabei was halbwegs passt.
Hab ich jetzt herausgefunden, wie ich den Rechner runterfahren wenn ich einen Knopf länger drücke und auch wie ich drucke mit subprocess()
Was ich jetzt nicht hab, ist die Anbindung dieses Teils an sqlalchemy. Wobei ich das bisher "nur" für das USB-Teil brauche.
Was mir vermutlich hier noch abgeht ist die Fehlerbehandlung. Aber so richtig weiß ich nicht, was das Progg eigentlich tun soll, wenn der Printer nicht druckt. Das schiebt den Befehl ja bloß durch ans System.
Zudem habe ich mir überlegt, ob ich nicht gleich einfach den rohen Text an lp übergeben kann. Der Labeldrucker macht aus dem Kauderwelsch schöne Label wenns von der Datei kommt, warum nicht auch vom Text.
Nur da hab ich vermutlich einfach zu wenig Ahnung...
Hier mal der Code.
Vielleicht ist ja was dabei was halbwegs passt.
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import sqlite3
from sqlite3 import Error
from pathlib import Path
import csv
import tkinter as tk
from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
from datetime import datetime as DateTime
import subprocess
PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
SET letzte_nr = (letzte_nr + 1) % 10000
WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
num.Letzte_Nr
FROM knopfdaten as knd,
numbers as num
WHERE knd.ID = ?
AND knd.ID = num.ID"""
def save_row_as_file(count, row, first_line):
text = "\n".join(
[
"^XA",
"^FO15,90^GB780,0,8,^FS",
"^FO15,250^GB780,0,8,^FS",
"^FO15,700^GB780,0,8,^FS",
"^FO0,0^GB600,200,2",
"^FO15,20^GB780,785,4^FS",
"^FO0,40^A0,50,50^FB800,1,0,C^FD",
first_line,
"^FS^FO0,110^A0,60,60^FB800,1,0,C^FD",
str(row[1]),
"^FS^FO0,190^A0,70,70^FB800,1,0,C^FD",
str(row[2]),
"^FS^FO0,80^BY3",
"^BCN,170,Y,N,N",
"^FO165,270^BY4^FD",
str(count),
"^FS^FO0,500^A0,60,50^FB800,,0,C^FD",
str(row[3]),
"^FS^FO0,580^A0,60,50^FB800,,0,C^FD",
str(row[4]),
"^FS^FO0,730^A0,60,60^FB800,1,0,C^FD",
str(row[5]),
"^FS",
"^XZ",
"",
]
)
PFAD.mkdir(exist_ok=True)
datei_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"
(datei_zum_druck).write_text(text, "utf-8")
#hier bin ich mir nicht sicher, ob es nicht schlauer wäre
#auf das speichern zu verzichten und die Datei gleich mit
#subporcess an den Drucker zu schicken. Wenn das geht...
return datei_zum_druck
def lade_daten(conn):
curs2 = conn.cursor()
config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
curs2.execute(config_sql)
configdaten = curs2.fetchall()
return configdaten
def zaehl_ausdrucke(connection, id):
with closing(connection.cursor()) as cursor:
cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
ispt_nr, letzte_nr = cursor.fetchone()
barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
cursor.execute("update knopfdaten SET Barcode = ? where id = ?", (barcode, id))
print(barcode, letzte_nr)
connection.commit()
return barcode
def datei_drucken(druckdatei):
print(f"{druckdatei}")
subprocess.run(['lp' , druckdatei])
class MainWindow(tk.Tk):
def __init__(self, conn, configdaten):
super().__init__()
self.title("Auswahl der Label")
self["background"] = "#f2c618"
button_frame = tk.Frame(self, width=1200, height=400)
button_frame.grid(row=0, column=0, padx=10, pady=3)
for index, entry in enumerate(configdaten):
row_index, column_index = divmod(index, 4)
tk.Button(
button_frame,
text="{}\n{}".format(entry[1], entry[2]),
bg="#f2c618",
width=15,
height=10,
command=partial(self.on_click, entry, conn),
).grid(row=row_index, column=column_index, padx=0, pady=0)
self.bind('<ButtonPress-1>', self.start_druck)
self.bind('<ButtonRelease-1>', self.stopp_druck)
self.knopf_drueck_zeit = None
def start_druck(self, event):
self.knopf_drueck_zeit = time.monotonic()
print("start_druck", self.knopf_drueck_zeit)
def stopp_druck(self, event):
knopf_loslass_zeit = time.monotonic()
print("stopp_druck", knopf_loslass_zeit)
if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
print("schalte jetzt aus")
self.ExitApplication()
def ExitApplication(self):
MsgBox = tk.messagebox.askokcancel(
title="Ausschalten",
message="Soll der Rechner ausgeschaltet werden?")
print(MsgBox)
if MsgBox == True:
self.destroy()
else:
tk.messagebox.showinfo(
'Zurück',
'Das Hauptfenster wird wieder gezeigt'
)
def on_click(self, row, conn):
count = zaehl_ausdrucke(conn, row[0])
print(count)
datei_zum_druck = save_row_as_file(count, row, ORT)
datei_drucken(datei_zum_druck)
def main():
BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
print(f"{BARCODE_DB_FILENAME}/config8.db")
try:
conn = sqlite3.connect(f"{DATABASE}")
except Error as e:
print(e)
configdaten = lade_daten(conn) #lädt die Daten aus der csv nach
print(configdaten)
root = MainWindow(conn, configdaten)
root.mainloop()
if __name__ == '__main__':
main()
Warum sollte man nicht direkt drucken können?
Code: Alles auswählen
subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
Jetzt bin ich vor dem nächsten Problem. Dooferweise habe ich ja die beiden Programmteile unabhängig von einander gebaut. Jetzt will ich die zusammenbauen, aber wie stell ich das an, ohne dass ich dadurch wieder Fehler ins Programm dazu hole.
Prinzipiell könnte ich hergehen und den einen Teil mit import in den anderen holen - das soll man dann ja nicht, habe ich gelesen. Gut, ich habs natürlich trotzdem versucht und bin gescheitert. (erster Anlauf, hab ich nicht anders erwartet.)
Dann könnte ich die beiden zusammenkopieren.
Aber...
Ich sehe, dass ich da noch Vorbereitung brauche. Hat jemand ein Stichwort für mich, wonach ich suchen muss?
Danke.
Prinzipiell könnte ich hergehen und den einen Teil mit import in den anderen holen - das soll man dann ja nicht, habe ich gelesen. Gut, ich habs natürlich trotzdem versucht und bin gescheitert. (erster Anlauf, hab ich nicht anders erwartet.)
Dann könnte ich die beiden zusammenkopieren.
Aber...
Ich sehe, dass ich da noch Vorbereitung brauche. Hat jemand ein Stichwort für mich, wonach ich suchen muss?
Danke.
Warum willst Du jetzt beide Teile in einem haben? Die beiden haben doch im Moment nichts miteinander zu tun, außer, dass sie die selbe Datenbank benutzen.
Ansonsten ist der richtige Weg, beide Programmteile als Modul zu implementieren, diese in einem Hauptprogramm zu importieren und dort richtig aufrufen.
Ansonsten ist der richtige Weg, beide Programmteile als Modul zu implementieren, diese in einem Hauptprogramm zu importieren und dort richtig aufrufen.
Äh.
An den Weg hatte ich jetzt gar nicht gedacht.
Wie schon erwähnt, hatte ich, sogar hier drin, gelesen, dass das in Python so üblich sei.
Das mit einem dritten Modul zu lösen hätte ich mich jetzt schon gar nicht zu denken gewagt.
Wird später gleich ausprobiert.
Ach ja, das eine Teil hat vielleicht schon was mit dem anderen zu tun. Wenn der Stick dann erfolgreich ausgelesen wurde, dann sollte das Knöpfemodul neu gestartet werden sonst werden da ja noch die alten Daten angezeigt.
Aber das kann man sicher dann in dem Hauptmodul machen.
Mal sehen ob ich das hinkrieg.
Danke für den Tipp.
An den Weg hatte ich jetzt gar nicht gedacht.
Wie schon erwähnt, hatte ich, sogar hier drin, gelesen, dass das in Python so üblich sei.
Das mit einem dritten Modul zu lösen hätte ich mich jetzt schon gar nicht zu denken gewagt.
Wird später gleich ausprobiert.
Ach ja, das eine Teil hat vielleicht schon was mit dem anderen zu tun. Wenn der Stick dann erfolgreich ausgelesen wurde, dann sollte das Knöpfemodul neu gestartet werden sonst werden da ja noch die alten Daten angezeigt.
Aber das kann man sicher dann in dem Hauptmodul machen.
Mal sehen ob ich das hinkrieg.
Danke für den Tipp.
Also, das hat geklappt. Hab die Dateien so umbenannt, dass die Versionsnummern hinten weg sind und einfach mit dem Code hier aufgerufen.
Jetzt habe ich noch eine Infobox in den Code vom USB-Teil mit eingebaut und wenn das abgearbeitet ist, dann sollte mir das dann idealerweise einen Wert zurückgeben der dann den Neustart der Knopfanwendung auslöst.
Da hab ich noch zu knobeln.
Code: Alles auswählen
import configusbdb as cfg
import knopfdruck as knd
from tkinter import messagebox
def main():
cfg.main()
knd.main()
if __name__ == '__main__':
main()
Da hab ich noch zu knobeln.
Jetzt bin ich doch ein wenig arg verwirrt.
Habe ich bei dem USB-Modul die Fehlermeldungen die ich schon habe in die Infobox gepackt. Das funktioniert auch. Aber, jetzt kommt beim Aufruf dieses Teils was merkwürdiges.
Wenn ich das so aufrufe wie es vorher funktioniert hat:
Dann startet entweder der eine Teil, hier der USB-Import/Konfiguration, der andere schweigt.
Wenn ich die Befehle umdrehe, startet die Knopfoberfläche und das USB-Teil schweigt.
Das dürfte doch wohl kaum an der Infobox liegen?
Und die beiden Module allein funktionieren ja auch.
# großes Rätsel geworden.
Habe ich bei dem USB-Modul die Fehlermeldungen die ich schon habe in die Infobox gepackt. Das funktioniert auch. Aber, jetzt kommt beim Aufruf dieses Teils was merkwürdiges.
Wenn ich das so aufrufe wie es vorher funktioniert hat:
Code: Alles auswählen
import configusbdb as cfg
import knopfdruck as knd
def main():
cfg.main()
knd.main()
if __name__ == '__main__':
main()
Wenn ich die Befehle umdrehe, startet die Knopfoberfläche und das USB-Teil schweigt.
Das dürfte doch wohl kaum an der Infobox liegen?
Und die beiden Module allein funktionieren ja auch.
Code: Alles auswählen
#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import tkinter as tk
from tkinter import messagebox
PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")
SQL_OUT = """select distinct
knopfdaten.ID,
knopfdaten.E_St,
knopfdaten.ISPT_Nr,
numbers.Letzte_Nr
from knopfdaten,
numbers
where
numbers.ID = knopfdaten.ID"""
USED_COLS = [
'ID',
'E_St',
'Zeile3',
'Zeile5',
'Zeile6',
'Zeile7',
'Zeilex',
'ISPT_Nr',
'HallenPos',
'Aktuell_Nr',
'Barcode'
]
def warte_auf_usb_stick(udev_context):
monitor = pyudev.Monitor.from_netlink(udev_context)
monitor.filter_by("block")
for device in iter(monitor.poll, None):
print(device.action)
if "ID_FS_TYPE" in device and device.action == "add":
name_of_stick = Path(device.get("ID_FS_LABEL"))
print(device.action, name_of_stick)
time.sleep(2)
return name_of_stick
raise AssertionError("unreachable code") #auch noch nicht 100% klar, trotz der Erklärung. habe ich noch nie in "Aktion" gesehen
def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
dateiname_ziel = (
MEDIA_PFAD
/ name_of_stick
/ dateiname_roh.with_name(
f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
)
)
pd.read_sql(SQL_OUT, connection).to_csv(
dateiname_ziel, sep=";", decimal=",", index=False
)
def config_arbeitsdb(connection, name_of_stick):
config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
datensaetze = pd.read_csv(
config_datei,
usecols=USED_COLS,
na_values="x",
quotechar='"',
sep=";",
encoding="utf-8",
decimal=",",
dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
)
if len(datensaetze) != 8:
print("es waren keine 8 Datensätze")
antwort = "es waren keine 8 Datensätze"
else:
datensaetze.to_sql(
"knopfdaten", connection, if_exists="replace", index=False
)
sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
connection.execute(sql_ident)
antwort = "Daten erfolgreich übertragen, \n Programm startet neu" #das funktionierte in der infobox
return antwort
def auswerfen(name_of_stick):
subprocess.run(
["umount", MEDIA_PFAD / name_of_stick],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
def erfolgsmeldung(ergebnis, icon): #den code habe ich so wie er ist gefunden. Hier bin ich an dem Fenster das übrigblieb von der Infobox fast verzweifelt
window = tk.Tk() #ich konnte keine so rechte Erklärung finden, warum hier noch zu der Box ein Fenster aufgeht, so verschwindet das wenigstens.
window.eval('tk::PlaceWindow %s center' % window.winfo_toplevel())
window.withdraw
messagebox.showinfo(message=ergebnis, icon=icon)
window.deiconify()
window.destroy()
window.quit()
##def neustart(): #das war ein Versuch, das Modul nach erfolgter Aktualisierung neu zu laden.
## knd.main() #das hat auch "eigentlich" funktioniert, aber nicht so wie gedacht.
def main():
udev_context = pyudev.Context()
db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
while True:
name_of_stick = warte_auf_usb_stick(udev_context)
try:
ergebnis = config_arbeitsdb(db_engine, name_of_stick)
anzahl_drucke_dokumentieren(
db_engine, name_of_stick, CSV_DATEI_TO_STICK
)
####
print(ergebnis)
erfolgsmeldung(ergebnis, 'info')
#neustart()
except SQLAlchemyError as error:
print(error)
erfolgsmeldung(error, 'error')
except ValueError as error:
print(error)
erfolgsmeldung(error, 'error')
except OSError as error:
print("Fehler beim kopieren:", error)
erfolgsmeldung(error, 'error')
try:
auswerfen(name_of_stick)
except subprocess.CalledProcessError as error:
print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
error_lang = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
erfolgsmeldung(error_lang, 'error')
if __name__ == "__main__": #das Teil hier unten ist mir - grade wegen des Beispiels in der Dokumentation im Zusammenhang mit dem Aufruf von Modulen ein
main()
Das sollte Dich eigentlich nicht verwirren, den ein beiden `main`-Funktionen hast Du Endlosschleifen.
Das warten_auf_usb-Stick mußt Du in einen Thread packen, und regelmäßig in der GUI abfragen, ob eine USB-Stick eingesteckt worden ist, und dann kannst Du dort auch das Kopieren erledigen.
Das warten_auf_usb-Stick mußt Du in einen Thread packen, und regelmäßig in der GUI abfragen, ob eine USB-Stick eingesteckt worden ist, und dann kannst Du dort auch das Kopieren erledigen.
Gute Idee. Funktioniert schon mal nach einer Seite hin.
Hast du noch so einen guten Tipp, wie ich dann die neue Konfiguration in die Oberfläche schaufle?
Das Laden der Daten auch in einen Thread der das von der anderen Seite überwacht?
Oder kann ich von dem einen Modul in dem anderen Modul das Mainwindow zerstören?
Hast du noch so einen guten Tipp, wie ich dann die neue Konfiguration in die Oberfläche schaufle?
Das Laden der Daten auch in einen Thread der das von der anderen Seite überwacht?
Oder kann ich von dem einen Modul in dem anderen Modul das Mainwindow zerstören?
Eigentlich hast Du ja nur eine Oberfläche und eine Datenbank, und verschiedene Dinge, die im Hintergrund laufen. Erstens, laden von Daten und zweitens Drucken.
Ob man das jetzt auf verschiedene Module (vier Stück) aufteilt, oder alles in einer Datei stehen hat, ist erst einmal nebensächlich.
Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
Ob man das jetzt auf verschiedene Module (vier Stück) aufteilt, oder alles in einer Datei stehen hat, ist erst einmal nebensächlich.
Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
Ja, das habe ich jetzt eine ganze Weile probiert.Mainwindow zerstören ist auch gar nicht nötig, sondern nur Aktualisieren.
Mein Problem ist, dass ich das Laden der Daten nicht in die Klasse mit reingebracht hab. Die Funkion dafür ist außerhalb und benötigt die Connection.
Wenn ich jetzt versuche das MainWindow mit update() von einem anderen Modul aufzurufen geht ihm entweder "self" ab oder die Connection.
Blick da jetzt nicht mehr so wirklich durch, denn die Connection kriegt es ja in Main zugewiesen.
Nachtrag.
Es ist verhext. Jetzt kann ich von dem einen Modul wenn der Stick eingesteckt wird die Ladefunktion des anderen Moduls starten ohne die Connection übergeben zu müssen, aber das MainWindow aktualisieren? Das will mir nicht gelingen.
Heut geh ich erst mal ins Bett. Wird gescheiter sein.
Es ist verhext. Jetzt kann ich von dem einen Modul wenn der Stick eingesteckt wird die Ladefunktion des anderen Moduls starten ohne die Connection übergeben zu müssen, aber das MainWindow aktualisieren? Das will mir nicht gelingen.
Heut geh ich erst mal ins Bett. Wird gescheiter sein.
Hab jetzt noch mal versucht - ich finde einfach keine Möglichkeit das update() so unterzubringen dass es auch ausgeführt wird.
Also, ich hab jetzt die beiden Dateien und eine kleine Testdatei dazu gebaut, dass ich nicht immer den USB-Stick einstecken muss.
Das hat einen timer der nach ein paar Sekunden den Befehl in das andere Modul rüberschickt. Währenddessen aktualisiere ich, dass ich was seh, die DB manuell. Ist umständlich aber einfacher als die Sache mit dem Stick.
Wie auch immer, das muss ja so gesehen beides funktionieren.
Tut aber nichts. Und ich suche nebenbei immer wieder mal, finde zig Seiten, aber ich kann das nicht auf meine Knöpfe anwenden.
Bin für jeden Tipp dankbar.
Also, ich rufe mit dem Modul das Knopfdruckmodul auf: (könnte ich mir vermutlich sparen, weil ja der Thread im Knopfdruckmodul gestartet wird.)
Dann kommt das Test-Modul:
Hier ist das Modul das dann letztlich das Update vom MainWindow bräuchte.
Das USB-Teil:
Also, ich hab jetzt die beiden Dateien und eine kleine Testdatei dazu gebaut, dass ich nicht immer den USB-Stick einstecken muss.
Das hat einen timer der nach ein paar Sekunden den Befehl in das andere Modul rüberschickt. Währenddessen aktualisiere ich, dass ich was seh, die DB manuell. Ist umständlich aber einfacher als die Sache mit dem Stick.
Wie auch immer, das muss ja so gesehen beides funktionieren.
Tut aber nichts. Und ich suche nebenbei immer wieder mal, finde zig Seiten, aber ich kann das nicht auf meine Knöpfe anwenden.
Bin für jeden Tipp dankbar.
Also, ich rufe mit dem Modul das Knopfdruckmodul auf: (könnte ich mir vermutlich sparen, weil ja der Thread im Knopfdruckmodul gestartet wird.)
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import knopfdruck as knd
def main():
knd.main()
if __name__ == '__main__':
main()
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import knopfdruck as knd
import time
def teste_aufruf():
time.sleep(10)
print("10 sekunden geschlafen")
knd.MainWindow.antwort_von_usb("komme vom Schläferfenster", "info")
def main():
teste_aufruf()
if __name__ == '__main__':
main()
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
import sqlite3
from sqlite3 import Error
from pathlib import Path
#import csv #bleibt erst mal drin weil ggf. doch die Dateien gebraucht werden
import tkinter as tk
#from tkinter import messagebox
from functools import partial
from contextlib import closing
import time
#from datetime import datetime as DateTime #ggf. noch gebraucht
import subprocess
import threading
import configusbdb as uconf
import teste as test
PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
SET letzte_nr = (letzte_nr + 1) % 10000
WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
num.Letzte_Nr
FROM knopfdaten as knd,
numbers as num
WHERE knd.ID = ?
AND knd.ID = num.ID"""
def prepare_rows_for_print(count, row, first_line):
text = "\n".join(
[
"^XA",
"^FXUnicode:",
"^CI28",
"^FXBox oben:",
"^FO690,20^GB0,1150,8,^FS",
"^FXgroßeBox:",
"^FO15,20^GB790,1150,8^FS",
"^FXZwischenlinie",
"^FO495,20^GB0,1150,8,^FS",
"^FXZwischenlinie unten",
"^FO125,20^GB0,1150,8,^FS",
"^FO720,200^A0R,50,50^FB800,1,0,C^FD",
first_line,
"^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
str(row[1]),
"^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
str(row[2]),
"^FS^FO0,180^BY3",
"^BCR,90,Y,N,N",
"^FO385,370^BY4^FD",
str(count),
"^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
str(row[3]),
"^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
str(row[4]),
"^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
str(row[5]),
"^FS",
"^XZ",
""
]
)
#PFAD.mkdir(exist_ok=True) #ggf. noch gebraucht
#string_zum_druck = PFAD / f"testT_{row[3]}_{row[1]}.zpl"
#(string_zum_druck).write_text(text, "utf-8")
#hier bin ich mir nicht sicher, ob es nicht schlauer wäre
#auf das speichern zu verzichten und die Datei gleich mit
#subporcess an den Drucker zu schicken. Wenn das geht...
return text
def lade_daten():
try:
conn = sqlite3.connect(f"{DATABASE}")
except Error as e:
print(e)
curs2 = conn.cursor()
config_sql = "select ID, E_St, Zeile3, Zeile5, Zeile6, Zeile7 from knopfdaten;"
curs2.execute(config_sql)
configdaten = curs2.fetchall()
print("daten geladen")
return configdaten, conn
def zaehl_ausdrucke(connection, id):
with closing(connection.cursor()) as cursor:
cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
ispt_nr, letzte_nr = cursor.fetchone()
barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
cursor.execute("update knopfdaten SET Barcode = ? where id = ?", (barcode, id))
print(barcode, letzte_nr)
connection.commit()
return barcode
def datei_drucken(text):
subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
class MainWindow(tk.Tk):
def __init__(self, conn, configdaten):
super().__init__()
self.title("Auswahl der Label")
self["background"] = "#f2c618"
button_frame = tk.Frame(self, width=1400, height=600)
button_frame.grid(row=0, column=0, padx=0, pady=0)
for index, entry in enumerate(configdaten):
row_index, column_index = divmod(index, 4)
tk.Button(
button_frame,
text="{}\n{}\n{}".format(entry[1], entry[2], entry[3]),
bg="#f2c618",
width=22,
height=11,
command=partial(self.on_click, entry, conn),
).grid(row=row_index, column=column_index, padx=0, pady=0)
self.bind('<ButtonPress-1>', self.start_druck)
self.bind('<ButtonRelease-1>', self.stopp_druck)
self.knopf_drueck_zeit = None
def start_druck(self, event):
self.knopf_drueck_zeit = time.monotonic()
print("start_druck", self.knopf_drueck_zeit)
def stopp_druck(self, event):
knopf_loslass_zeit = time.monotonic()
print("stopp_druck", knopf_loslass_zeit)
if knopf_loslass_zeit - self.knopf_drueck_zeit >5:
print("schalte jetzt aus")
self.ExitApplication()
def ExitApplication(self):
MsgBox = tk.messagebox.askokcancel(
title="Ausschalten",
message="Soll der Rechner ausgeschaltet werden?")
print(MsgBox)
if MsgBox == True:
self.destroy()
else:
tk.messagebox.showinfo(
'Zurück',
'Das Hauptfenster wird wieder gezeigt'
)
def fensterfrisch():
print("hier war ich im fensterfrisch")
lade_daten()
#MainWindow.update()
def antwort_von_usb(antwort, icon):
print("vom anderen Teil")
tk.messagebox.showinfo(
message=antwort,
icon=icon)
MainWindow.fensterfrisch()
def on_click(self, row, conn):
count = zaehl_ausdrucke(conn, row[0])
print(count)
string_zum_druck = prepare_rows_for_print(count, row, ORT)
datei_drucken(string_zum_druck)
def main():
BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
print("starte main")
configdaten, conn = lade_daten() #lädt die Daten aus der csv nach
print(configdaten)
root = MainWindow(conn, configdaten)
faden = threading.Thread(target=uconf.main)
faden.start()
faden2 = threading.Thread(target=test.main)
faden2.start()
root.mainloop()
if __name__ == '__main__':
configdaten, conn = lade_daten() #lädt die Daten aus der csv nach
main()
Code: Alles auswählen
#!/usr/bin/env python3
import subprocess
import time
from datetime import datetime as DateTime
from pathlib import Path
import pandas as pd
import pyudev
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
import tkinter as tk
from tkinter import messagebox
import knopfdruck as knd
import threading
PFAD = Path.home() / ".DruckData"
MEDIA_PFAD = Path("/media/earl/")
PFAD = Path.home() / ".DruckData"
DATABASE = PFAD / "sqlite/db/config8.db"
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")
SQL_OUT = """select distinct
knopfdaten.ID,
knopfdaten.E_St,
knopfdaten.ISPT_Nr,
numbers.Letzte_Nr
from knopfdaten,
numbers
where
numbers.ID = knopfdaten.ID"""
USED_COLS = [
'ID',
'E_St',
'Zeile3',
'Zeile5',
'Zeile6',
'Zeile7',
'Zeilex',
'ISPT_Nr',
'HallenPos',
'Aktuell_Nr',
'Barcode'
]
def warte_auf_usb_stick(udev_context):
monitor = pyudev.Monitor.from_netlink(udev_context)
monitor.filter_by("block")
for device in iter(monitor.poll, None):
print(device.action)
if "ID_FS_TYPE" in device and device.action == "add":
name_of_stick = Path(device.get("ID_FS_LABEL"))
print(device.action, name_of_stick)
time.sleep(2)
return name_of_stick
raise AssertionError("unreachable code")
def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
dateiname_ziel = (
MEDIA_PFAD
/ name_of_stick
/ dateiname_roh.with_name(
f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
)
)
pd.read_sql(SQL_OUT, connection).to_csv(
dateiname_ziel, sep=";", decimal=",", index=False
)
def config_arbeitsdb(connection, name_of_stick):
config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
#
# TODO Auf die Spalten einschränken die tatsächlich benötigt werden.
#
datensaetze = pd.read_csv(
config_datei,
usecols=USED_COLS,
na_values="x",
quotechar='"',
sep=";",
encoding="utf-8",
decimal=",",
dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
)
if len(datensaetze) != 8:
print("es waren keine 8 Datensätze")
antwort = "es waren keine 8 Datensätze"
else:
datensaetze.to_sql(
"knopfdaten", connection, if_exists="replace", index=False
)
sql_ident = "insert or ignore into numbers(ID) select ID from knopfdaten;"
connection.execute(sql_ident)
antwort = "Daten erfolgreich übertragen, \n Programm startet neu"
return antwort
def auswerfen(name_of_stick):
subprocess.run(
["umount", MEDIA_PFAD / name_of_stick],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
def erfolgsmeldung(ergebnis, icon):
knd.MainWindow.antwort_von_usb(ergebnis, icon)
## window = tk.Tk()
## window.eval('tk::PlaceWindow %s center' % window.winfo_toplevel())
## window.withdraw
## messagebox.showinfo(message=ergebnis, icon=icon)
## window.deiconify()
## window.destroy()
## window.quit()
def neuladen_config():
print("schicke zu knd")
knd.MainWindow.fensterfrisch()
def main():
udev_context = pyudev.Context()
db_engine = create_engine(f"sqlite:///{DATABASE}", encoding="utf-8")
while True:
name_of_stick = warte_auf_usb_stick(udev_context)
try:
ergebnis = config_arbeitsdb(db_engine, name_of_stick)
anzahl_drucke_dokumentieren(
db_engine, name_of_stick, CSV_DATEI_TO_STICK
)
####
print(ergebnis)
erfolgsmeldung(ergebnis, 'info')
neuladen_config()
except SQLAlchemyError as error:
print(error)
erfolgsmeldung(error, 'error')
except ValueError as error:
print(error)
erfolgsmeldung(error, 'error')
except OSError as error:
print("Fehler beim kopieren:", error)
erfolgsmeldung(error, 'error')
try:
auswerfen(name_of_stick)
except subprocess.CalledProcessError as error:
print(f"Fehler {error.returncode} beim Auswerfen: {error.stdout}")
error_lang = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
erfolgsmeldung(error_lang, 'error')
if __name__ == "__main__":
main()