Hab in dem Code mal bei jeder einzelnen Funktion ein Print reingeschrieben um zu sehen wo der Fehler ausgelöst wurde, dabei kam dann eine Tracebackmeldung.
Code: Alles auswählen
loadconfig
lade daten
bin in der loop vom usb
warte auf usb
remove
remove
add
add
add OLEBIRD
warte auf usb
copyFile <sqlite3.Connection object at 0x7fd692626b90> OLEBIRD
<sqlite3.Connection object at 0x7fd692626b90>
config arbeitsdatei
Exception in Tkinter callback
Traceback (most recent call last):
File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 117, in config_arbeitsdb
cursor.execute("insert or ignore into numbers(ID) select ID from knopfdaten")
AttributeError: 'builtin_function_or_method' object has no attribute 'execute'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 135, in copy_files
ergebnis = config_arbeitsdb(db_engine, name_of_stick)
File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 117, in config_arbeitsdb
cursor.execute("insert or ignore into numbers(ID) select ID from knopfdaten")
File "/usr/lib/python3.6/contextlib.py", line 185, in __exit__
self.thing.close()
AttributeError: 'builtin_function_or_method' object has no attribute 'close'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/lib/python3.6/tkinter/__init__.py", line 1705, in __call__
return self.func(*args)
File "/usr/lib/python3.6/tkinter/__init__.py", line 749, in callit
func(*args)
File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 292, in queue_loop
copy_files(self.connection, name_of_stick)
File "/home/earl/projekt/Zusammen2.05/alles5.01.py", line 142, in copy_files
print("hier war das der Error beim Kopieren", message)
NameError: name 'message' is not defined
Es war die Zeile mit closing(connection.cursor)...
Das hatte ich in einem früheren Versuch die USB-Version auch schon mal. Hier geht das nicht mit dem cursor sondern mit der connection.execute.
Das hab ich jetzt mal umgesetzt und ich glaube, es geht.
Die prints muss ich noch rauswerfen und vielleicht noch was einbauen einen evtl. Fehler abzufangen, denn dafür war vermutlich das closing gedacht, oder?
So sieht es zumindest aus als würde alles klappen.
Danke für alles auf jeden Fall!!!
Code: Alles auswählen
#!/usr/bin/env python3
# -*- coding: utf8 -*-
#import csv
import queue
import threading
import sqlite3
import time
import subprocess
import tkinter as tk
from collections import namedtuple
from tkinter import messagebox
from functools import partial
from contextlib import closing
from pathlib import Path
from datetime import datetime as DateTime
import pandas as pd
#import usbconfig
import pyudev
PFAD = Path.home() / ".DruckData"
BARCODE_DB_FILENAME = PFAD / "sqlite/db"
DATABASE = BARCODE_DB_FILENAME / "config8.db"
MEDIA_PFAD = Path("/media/earl/")
CONFIG_ON_STICK = Path("TB_Ausgabe_8iii.txt")
CSV_DATEI_TO_STICK = Path("gedruckte_nummern.csv")
ORT = "PZ Dingsbums 14"
CHECKPOINT = "42"
SQL_UPDATE_AKTUELLE_NR = """UPDATE numbers
SET letzte_nr = (letzte_nr + 1) % 10000
WHERE id = ?"""
SQL_SELECT_KNOPFDATEN = """SELECT knd.ISPT_Nr,
num.Letzte_Nr
FROM knopfdaten as knd,
numbers as num
WHERE knd.ID = ?
AND knd.ID = num.ID"""
SQL_OUT = """select distinct
knopfdaten.ID,
knopfdaten.E_St,
knopfdaten.ISPT_Nr,
numbers.Letzte_Nr
from knopfdaten,
numbers
where
numbers.ID = knopfdaten.ID"""
USED_COLS = [
'ID',
'E_St',
'Zeile3',
'Zeile5',
'Zeile6',
'Zeile7',
'Zeilex',
'ISPT_Nr',
'HallenPos',
'Aktuell_Nr',
'Barcode'
]
def warte_auf_usb_stick(udev_context):
print("warte auf usb")
monitor = pyudev.Monitor.from_netlink(udev_context)
monitor.filter_by("block")
for device in iter(monitor.poll, None):
print(device.action)
if "ID_FS_TYPE" in device and device.action == "add":
name_of_stick = Path(device.get("ID_FS_LABEL"))
print(device.action, name_of_stick)
time.sleep(2)
return name_of_stick
raise AssertionError("unreachable code") # das soll auch nie in Aktion sein, wenn es richtig funktioniert
def usb_stick_loop(queue):
print("bin in der loop vom usb")
udev_context = pyudev.Context()
while True:
name_of_stick = warte_auf_usb_stick(udev_context)
queue.put(name_of_stick)
def anzahl_drucke_dokumentieren(connection, name_of_stick, dateiname_roh):
print("ducke doku")
dateiname_ziel = (
MEDIA_PFAD
/ name_of_stick
/ dateiname_roh.with_name(
f"{dateiname_roh.stem}_{DateTime.now():%Y-%m-%d_%H_%M}.csv"
)
)
pd.read_sql(SQL_OUT, connection).to_csv(
dateiname_ziel, sep=";", decimal=",", index=False
)
def config_arbeitsdb(connection, name_of_stick):
print("config arbeitsdatei")
config_datei = MEDIA_PFAD / name_of_stick / CONFIG_ON_STICK
datensaetze = pd.read_csv(
config_datei,
usecols=USED_COLS,
na_values="x",
quotechar='"',
sep=";",
encoding="utf-8",
decimal=",",
dtype={"ID": int, "E_St": int, "ISPT_Nr": str, "Barcode": str},
)
if len(datensaetze) != 8:
raise RuntimeError("es waren keine 8 Datensätze")
datensaetze.to_sql(
"knopfdaten", connection, if_exists="replace", index=False
)
with connection as conn:
conn.execute("insert or ignore into numbers(ID) select ID from knopfdaten")
return "Daten erfolgreich übertragen"
def auswerfen(name_of_stick):
print("werfe DS aus")
subprocess.run(
["umount", MEDIA_PFAD / name_of_stick],
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
def copy_files(connection, name_of_stick):
print("copyFile", connection, name_of_stick)
try:
db_engine = connection
print(db_engine)
ergebnis = config_arbeitsdb(db_engine, name_of_stick)
print(ergebnis)
messagebox.showinfo(message=ergebnis, icon='info')
anzahl_drucke_dokumentieren(
db_engine, name_of_stick, CSV_DATEI_TO_STICK
)
except Exception as error:
print("hier war das der Error beim Kopieren", message)
messagebox.showinfo(message=str(error), icon='error')
try:
auswerfen(name_of_stick)
except subprocess.CalledProcessError as error:
error_text = f"Fehler {error.returncode} beim Auswerfen: {error.stdout}"
messagebox.showinfo(message=str(error_text), icon='error')
# und bitte bessere Namen für die Spalten!
KnopfDaten = namedtuple("KnopfDaten", "id, e_st, zeile3, zeile5, zeile6, zeile7")
def lade_daten(connection):
print("lade daten")
with closing(connection.cursor()) as cursor:
cursor.execute(f"select {','.join(KnopfDaten._fields)} from knopfdaten")
return [KnopfDaten(*row) for row in cursor]
def prepare_rows_for_print(count, row, first_line):
return "\n".join([
"^XA",
"^FXUnicode:",
"^CI28",
"^FXBox oben:",
"^FO690,20^GB0,1150,8,^FS",
"^FXgroßeBox:",
"^FO15,20^GB790,1150,8^FS",
"^FXZwischenlinie",
"^FO495,20^GB0,1150,8,^FS",
"^FXZwischenlinie unten",
"^FO125,20^GB0,1150,8,^FS",
"^FO720,200^A0R,50,50^FB800,1,0,C^FD",
first_line,
"^FS^FO620,200^A0R,60,60^FB800,1,0,C^FD",
str(row.e_st),
"^FS^FO500,200^A0R,95,95^FB800,1,0,C^FD",
str(row.zeile3),
"^FS^FO0,180^BY3",
"^BCR,90,Y,N,N",
"^FO385,370^BY4^FD",
str(count),
"^FS^FO260,200^A0R,70,70^FB800,,0,C^FD",
str(row.zeile5),
"^FS^FO120,200^A0R,60,60^FB800,2,0,C^FD",
str(row.zeile6),
"^FS^FO40,200^A0R,70,70^FB800,1,0,C^FD",
str(row.zeile7),
"^FS",
"^XZ",
""
])
def datei_drucken(text):
subprocess.run(["lp", "-"], input=text.encode('utf-8'), check=True)
def zaehl_ausdrucke(connection, id):
print("zahl ausdrucke")
with closing(connection.cursor()) as cursor:
cursor.execute(SQL_UPDATE_AKTUELLE_NR, [id])
cursor.execute(SQL_SELECT_KNOPFDATEN, [id])
ispt_nr, letzte_nr = cursor.fetchone()
barcode = f"{ispt_nr}{CHECKPOINT}{letzte_nr}"
cursor.execute("update knopfdaten SET Barcode = ? where id = ?", (barcode, id))
print(barcode, letzte_nr)
connection.commit()
return barcode
class MainWindow(tk.Tk):
def __init__(self):
super().__init__()
self.title("Auswahl der Label")
self["background"] = "#f2c618"
self.connection = sqlite3.connect(f"{DATABASE}")
button_frame = tk.Frame(self, width=1800, height=800)
button_frame.grid(row=0, column=0, padx=0, pady=0, sticky='NSWE')
self.buttons = []
for index in range(8):
row_index, column_index = divmod(index, 4)
button = tk.Button(
button_frame,
bg="#f2c618",
width=33,
height=18,
command=partial(self.on_click, index),
)
button.grid(row=row_index, column=column_index, padx=0, pady=0, sticky='NSWE')
self.buttons.append(button)
self.load_config()
self.bind('<ButtonPress-1>', self.start_druck)
self.bind('<ButtonRelease-1>', self.stopp_druck)
self.knopf_drueck_zeit = None
self.queue = queue.Queue()
usb_looper = threading.Thread(target=usb_stick_loop, args = (self.queue, ), daemon=True)
usb_looper.start()
self.after(500, self.queue_loop)
self.wait_visibility()
self._center()
def _center(self, *args):
xpos = (self.winfo_screenwidth() - self.winfo_width()) / 2
ypos = ((self.winfo_screenheight() - self.winfo_height()) / 2) / 100 * 90
self.wm_geometry("+%d+%d" % (xpos,ypos))
def load_config(self):
print("loadconfig")
self.config = lade_daten(self.connection)
for button, entry in zip(self.buttons, self.config):
button['text'] = f"{entry.e_st}\n{entry.zeile3}\n{entry.zeile5}"
def start_druck(self, event):
self.knopf_drueck_zeit = time.monotonic()
print("start_druck", self.knopf_drueck_zeit)
def stopp_druck(self, event):
knopf_loslass_zeit = time.monotonic()
print("stopp_druck", knopf_loslass_zeit)
if knopf_loslass_zeit - self.knopf_drueck_zeit > 5:
print("schalte jetzt aus")
self.exit_application()
def exit_application(self):
result = messagebox.askyesno(
title="Ausschalten/Neustarten",
message="Soll der Rechner ausgeschaltet werden?\n Mit »Ja» ausschalten, mit »Nein« zurück zum Programm"
)
if result:
self.destroy()
else:
messagebox.showinfo(
'Zurück',
'Das Hauptfenster wird wieder gezeigt'
)
def on_click(self, index):
entry = self.config[index]
count = zaehl_ausdrucke(self.connection, entry.id)
print(count)
string_zum_druck = prepare_rows_for_print(count, entry, ORT)
datei_drucken(string_zum_druck)
def queue_loop(self):
#print("ich bin die queue")
if not self.queue.empty():
name_of_stick = self.queue.get()
copy_files(self.connection, name_of_stick)
self.load_config()
self.after(500, self.queue_loop)
def main():
BARCODE_DB_FILENAME.mkdir(parents=True, exist_ok=True)
root = MainWindow()
root.mainloop()
if __name__ == '__main__':
main()