@kaytec: Das hat Sirius3 nicht gemeint. Die Ausgabe ist hier die `update_cap_display()`-Methode. Davor sollte die Ausgabe natürlich formatiert werden. Das ersetzt ja sozusagen ein `print()` wenn es keine GUI wäre. Wenn Du das in dieser Methode formatierst hast Du ja genau dieses Problem was Du da gerade so unschön löst, was man nicht hätte wenn man es davor machen würde. Was Sirius3 meinte das man die Ausgabe nicht schon dauernd in der ``while``-Schleife formatieren sollte, sondern erst dann wenn das Ergebnis das formatiert werden soll auch tatsächlich fest steht.
Es ist an der Stelle ja nicht nur so, dass da ein kleines bisschen mit Ressourcen in Form von Speicher und Rechenzeit jongliert wird, die Rechenzeit die das braucht, wirkt sich auch auf die Messgenauigkeit aus. Spätestens wenn man die Messung in eine eigene Funktion auslagert, fällt auf dass die Formatierung an der falschen Stelle ist, denn von einer Funktion möchte man ja eine Zahl bekommen bei der man dann entscheiden kann was man damit macht, und nicht eine fertige Zeichenkette für eine Ausgabe.
pyserial und with
- __blackjack__
- User
- Beiträge: 13114
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Danke BlackJack,
ich verstehe dich gerade nicht. Wo soll es jetzt genau formatiert werden ? Nicht beim Einlesen und auch nicht in der update_cap_display()`-Methode ?
Wie das Script abläuft ist mir schon klar und auch das Auslagern in eine eigene Methode (keine Wiederholung von gleichem Code). Die nötige Unterscheidung von Text/Zahl entsteht doch durch die Zusammenführung ?
Gruß Frank
ich verstehe dich gerade nicht. Wo soll es jetzt genau formatiert werden ? Nicht beim Einlesen und auch nicht in der update_cap_display()`-Methode ?
Wie das Script abläuft ist mir schon klar und auch das Auslagern in eine eigene Methode (keine Wiederholung von gleichem Code). Die nötige Unterscheidung von Text/Zahl entsteht doch durch die Zusammenführung ?
Gruß Frank
Der betreffende Teil sollte besser so geschrieben werden:
Allein schon, weil es sonst beim Vergleich mittels >= knallt, denn Vergleiche zwischen Zeichenketten und Zahlen machen halt wenig Sinn.
Code: Alles auswählen
while not ser.dsr:
end = time.time() - start
capacity = end * 1000 - 80 * end
# Oder direkt:
# capacity = end * 920
ser.dtr = True
if end and capacity >= 0.5:
self.update_cap_display("{0:2.2f} uf".format(capacity))
# u.s.w.
Hallo,
habe es mit "queue und threads" versucht und es funktioniert, doch so richtig bin ich nicht zufrieden und evt. ist es ja auch Müll.
https://www.oreilly.com/library/view/py ... 09s07.html
- Kann ich immer einen neuen "Thread" starten, denn macht evt, Sinn und so könnten auch andere Schnitstellen gewählt werden ?
- Kann ich die "queue" auch leeren, da beim Start eines neuen "threads" ja keine Daten mehr darin sein sollten.
- Drücke Ich den "button" sehr schnell, dann gibt es Fehlmessungen - steuere ich zeitlich den "button" aus dem "ThreadedClient" ?
Gruß Frank
habe es mit "queue und threads" versucht und es funktioniert, doch so richtig bin ich nicht zufrieden und evt. ist es ja auch Müll.
https://www.oreilly.com/library/view/py ... 09s07.html
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
from functools import partial
import time
import threading
import sys
import Queue
WIDTH = 200
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
def __init__(self, parent, width, height, queue, start_stop_measure, text_color = "blue",
error_font = "system 5 bold", display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.ser = None
self.after_id = None
self.parent = parent
self.width = width
self.height = height
self.queue = queue
self.start_stop_measure = start_stop_measure
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.start_stop_button = tk.Button(button_frame, text = "START",
command = self.start_stop)
self.start_stop_button.pack()
def start_stop(self):
if self.start_stop_button["text"] == "START":
self.start_stop_button.config(text = "STOP")
self.start_stop_measure(DEVICE)
else:
self.update_cap_display("00.00 uf", False)
self.start_stop_button.config(text = "START")
self.start_stop_button.config(state = "disabled")
self.after(3000, self.start_stop_button.config(state = "active"))
self.start_stop_measure()
def update_cap_display(self, text, is_error):
self.display.delete("cap", "error")
self.display.create_text(
self.width / 2,
self.height / 2, text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = "error" if is_error else "cap")
def measure(self):
"""Handle all messages currently in the queue, if any."""
while self.queue.qsize():
try:
text, state = self.queue.get(0)
self.update_cap_display(text, state)
# Check contents of message and do whatever is needed. As a
# simple test, print it (in real life, you would
# suitably update the GUI's display in a richer fashion).
except Queue.Empty:
# just on general principles, although we don't
# expect this branch to be taken in this case
pass
class ThreadedClient(object):
"""
Launch the main part of the GUI and the worker thread. periodicCall and
endApplication could reside in the GUI part, but putting them here
means that you have all the thread controls in a single place.
"""
def __init__(self, parent):
"""
Start the GUI and the asynchronous threads. We are in the main
(original) thread of the application, which will later be used by
the GUI as well. We spawn a new thread for the worker (I/O).
"""
self.device = None
self.parent = parent
self.after_id = None
# Create the queue
self.queue = Queue.Queue()
# Set up the GUI part
self.gui = CapacitanceUI(parent, WIDTH, HEIGHT, self.queue, self.start_stop)
self.gui.pack(expand=tk.YES, padx=5, pady=5)
# Set up the thread to do asynchronous I/O
# More threads can also be created and used, if necessary
self.running = False
#self.thread = threading.Thread(target=self.workerThread)
#self.thread.start()
# Start the periodic call in the GUI to check if the queue contains
# anything
def start_stop(self, device = None):
self.device = device
if self.running:
self.parent.after_cancel(self.after_id)
self.after_id = None
self.running = None
else:
self.running = True
self.thread = threading.Thread(target=self.workerThread)
self.thread.start()
self.call_periodic()
def call_periodic(self):
"""
Check every 100 ms if there is something new in the queue.
"""
self.gui.measure()
self.after_id = self.parent.after(100, self.call_periodic)
def workerThread(self):
"""
This is where we handle the asynchronous I/O. For example, it may be
a 'select( )'. One important thing to remember is that the thread has
to yield control pretty regularly, by select or otherwise.
"""
while self.running:
try:
with serial.Serial(self.device) as ser:
end = None
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
capacity = end * 1000 - 60 * end
ser.dtr = True
if end and capacity >= 1:
self.queue.put(("{0:2.2f} uf".format(capacity), False))
else:
self.queue.put(("capacity < 1 µf or not connected", True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
if end:
time.sleep(end * 3)
else:
time.sleep(1)
def release(self):
self.running = None
self.parent.destroy()
sys.exit(1)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
client = ThreadedClient(root)
root.protocol("WM_DELETE_WINDOW",client.release)
root.mainloop()
if __name__ == '__main__':
main()
- Kann ich die "queue" auch leeren, da beim Start eines neuen "threads" ja keine Daten mehr darin sein sollten.
- Drücke Ich den "button" sehr schnell, dann gibt es Fehlmessungen - steuere ich zeitlich den "button" aus dem "ThreadedClient" ?
Gruß Frank
- __blackjack__
- User
- Beiträge: 13114
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
@kaytec: Ich finde die Aufteilung komisch. Nicht zuletzt weil Programmlogik und GUI nicht sauber getrennt ist.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Hallo BlackJack,
ist bestimmt komisch, doch ich hatte auf einen brauchbaren Beitrag von dem Verlag gehofft. Ich messe evtl. nur einige Durchläufe und bilde einen Mittelwert. Eine ständige Interaktion von „Gui/Thread /Adapter“ gibt die Schaltung evtl. nicht her.
Werde eine sinnvolle Trennung versuchen – ist der Fehler schon in dem Beispiel von dem Verlag enthalten ?
Gruß Frank
ist bestimmt komisch, doch ich hatte auf einen brauchbaren Beitrag von dem Verlag gehofft. Ich messe evtl. nur einige Durchläufe und bilde einen Mittelwert. Eine ständige Interaktion von „Gui/Thread /Adapter“ gibt die Schaltung evtl. nicht her.
Werde eine sinnvolle Trennung versuchen – ist der Fehler schon in dem Beispiel von dem Verlag enthalten ?
Gruß Frank
Hallo,
habe mal was umgeschriebe und hoffe die Trennung ist ok. Kann die "queue" geleert werden ?
Gruß Frank
habe mal was umgeschriebe und hoffe die Trennung ist ok. Kann die "queue" geleert werden ?
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import threading
import sys
from functools import partial
import Queue
WIDTH = 200
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent,
width,
height,
device,
update_interval = 1000,
text_color = "blue",
error_font = "system 5 bold",
display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start_stop, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start_stop(self):
if not self.after_id:
for button in self.buttons:
button.config(state = "disabled")
self.serial_thread_client.start_stop()
self.measure()
else:
for button in self.buttons:
button.config(state = "active")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.serial_thread_client.start_stop()
self.measure_counter = self.MEASURE_COUNTER
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if not self.after_id:
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
def update_cap_display(self, text, is_error, tag = "cap", width = None,
height = None):
self.display.delete(tag)
display_width = self.width / 2 if not width else width
display_height = self.height / 2 if not height else height
self.display.create_text(
display_width,
display_height,
text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = tag)
def measure(self):
if self.measure_counter > 0:
try:
measure_time, state = self.queue.get(0)
if not state:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display("{0:2.2f} uf".format(capacity), False,
"cap")
else:
self.update_cap_display(measure_time, state, "error")
self.display.delete("cap")
except Queue.Empty:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!",
False)
self.measure_counter -= 1
self.after_id = self.after(self.update_interval, self.measure)
else:
if len(self.measuring_data) > 0:
if len(self.measuring_data) > 1:
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display("{0:2.2f} uf".format(total_data),
False)
self.after(int(self.measuring_data[0] * 1.5),
self.measure_start_stop)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
self.serial_thread_client.release()
class SerialThreadedClient(object):
def __init__(self, device, queue):
self.device = device
self.queue = queue
self.running = None
def start_stop(self):
if self.running:
self.running = None
else:
self.running = True
self.thread = threading.Thread(target=self.workerThread)
self.thread.start()
def workerThread(self):
end = None
while self.running:
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 >= 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
if end:
time.sleep(end * 3)
def release(self):
self.running = None
sys.exit(1)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Ohne jetzt den ganzen GUI-Code angeschaut zu haben, sieht die Trennung von Messung und Anzeige jetzt gut aus.
SerialThreadedClient.release sollte kein sys.exit haben, das ist sehr überraschend, dort das komplette Programm zu beenden. Das sollte auch automatisch passieren, in dem sich mainloop beendet.
Auch ist komisch, dass running die Werte None/True annehmen kann, statt False/True.
Idealerweise wäre running auch ein Event, um korrekt mit dem Thread zu kommunizieren.
SerialThreadedClient.release sollte kein sys.exit haben, das ist sehr überraschend, dort das komplette Programm zu beenden. Das sollte auch automatisch passieren, in dem sich mainloop beendet.
Auch ist komisch, dass running die Werte None/True annehmen kann, statt False/True.
Idealerweise wäre running auch ein Event, um korrekt mit dem Thread zu kommunizieren.
Das end = None am Anfang von workerThread() ist gefährlich. Wenn man damit weiterrechnet und keine Zahl eingesetzt wurde, dann kracht es. Hier besser 0 oder 0.0 als Startwert festlegen. Noch eleganter ist es, gänzlich auf solche Startwerte zu verzichten und die Programmlogik ggf anzupassen.
Hallo,
danke Snafu und hier mit Leerung der "queue", die bestimmt nicht schön gelöst ist.
Gruß Frank
danke Snafu und hier mit Leerung der "queue", die bestimmt nicht schön gelöst ist.
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import threading
from functools import partial
import Queue
WIDTH = 200
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent,
width,
height,
device,
update_interval = 1000,
text_color = "blue",
error_font = "system 5 bold",
display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start_stop, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start_stop(self):
if not self.after_id:
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.serial_thread_client.start_stop()
self.measure()
else:
for button in self.buttons:
button.config(state = "active")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.serial_thread_client.start_stop()
self.measure_counter = self.MEASURE_COUNTER
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if not self.after_id:
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
def update_cap_display(self, text, is_error, tag = "cap", width = None,
height = None):
self.display.delete(tag)
display_width = self.width / 2 if not width else width
display_height = self.height / 2 if not height else height
self.display.create_text(
display_width,
display_height,
text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = tag)
def measure(self):
if self.measure_counter > 0:
try:
measure_time, state = self.queue.get(-1)
if not state:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display("{0:2.2f} uf".format(capacity), False,
"cap")
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
else:
self.update_cap_display(measure_time, state, "error",
self.width / 2, 10)
self.display.delete("cap")
except Queue.Empty:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!",
False)
self.measure_counter -= 1
self.after_id = self.after(self.update_interval, self.measure)
else:
if len(self.measuring_data) > 0:
if len(self.measuring_data) > 1:
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display("{0:2.2f} uf".format(total_data),
False)
for i in xrange(self.queue.qsize()):
self.queue.get(0)
self.after(int(self.measuring_data[0] * 1.5),
self.measure_start_stop)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
self.serial_thread_client.release()
class SerialThreadedClient(object):
def __init__(self, device, queue):
self.device = device
self.queue = queue
self.running = False
def clear_queue(self):
for item in xrange(self.queue.qsize()):
try:
self.queue.get(0)
except Queue.Empty:
break
def start_stop(self):
if self.running:
self.running = False
else:
self.running = True
self.thread = threading.Thread(target=self.workerThread)
self.thread.start()
def workerThread(self):
end = 0.0
while self.running:
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
if end:
self.clear_queue()
time.sleep(end * 3)
def release(self):
self.running = False
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Hallo,
"while self.running" ist raus und durch "after" ersetzt.
"while self.running" ist raus und durch "after" ersetzt.
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import threading
from functools import partial
import Queue
WIDTH = 270
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent,
width,
height,
device,
update_interval = 1000,
text_color = "blue",
error_font = "system 5 bold",
display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, parent, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start_stop, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start_stop(self):
if not self.after_id:
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.serial_thread_client.clear_queue(0)
self.serial_thread_client.start_stop()
self.measure()
else:
for button in self.buttons:
button.config(state = "active")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.serial_thread_client.start_stop()
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if not self.after_id:
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
def update_cap_display(self, text, is_error, tag = "cap", width = None,
height = None):
self.display.delete(tag)
display_width = self.width / 2 if not width else width
display_height = self.height / 2 if not height else height
self.display.create_text(
display_width,
display_height,
text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = tag)
def measure(self):
if self.measure_counter > 0:
try:
measure_time, state = self.queue.get(-1)
if not state:
if measure_time:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display(
"{0:2.2f} uf".format(
capacity),
False,
"cap")
self.update_cap_display(
"{0}: {1}".format(
"COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
else:
self.update_cap_display(
measure_time,
True,
"error",
self.width / 2,
10)
self.measure_counter = 0
except Queue.Empty:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!", False)
self.measure_counter -= 1
self.after_id = self.after(self.update_interval, self.measure)
else:
if len(self.measuring_data) > 0:
if len(self.measuring_data) > 1:
self.measuring_data.pop(-1)
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display("{0:2.2f} uf".format(total_data),
False)
for i in xrange(self.queue.qsize()):
self.queue.get(0)
self.after(int(self.measuring_data[0] * 1.5),
self.measure_start_stop)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
self.serial_thread_client.release()
class SerialThreadedClient(object):
def __init__(self, device, parent, queue):
self.device = device
self.parent = parent
self.queue = queue
self.run = False
def clear_queue(self, min = 1):
while self.queue.qsize() > min:
self.queue.get(0)
def start_stop(self):
if self.run:
self.parent.after_cancel(self.thread_id)
self.thread_id = None
self.run = False
else:
self.run = True
self.thread = threading.Thread(target=self.worker_thread)
self.thread.start()
def worker_thread(self):
end = 0
if self.run:
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
self.run = False
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
self.run = False
self.clear_queue()
self.thread_id = self.parent.after(int(end * 10000),
self.worker_thread)
def release(self):
self.running = False
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Fehler beseitgt
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import threading
from functools import partial
import Queue
WIDTH = 270
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent,
width,
height,
device,
update_interval = 3000,
text_color = "blue",
error_font = "system 5 bold",
display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, parent, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start_stop, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start_stop(self):
if not self.after_id:
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.serial_thread_client.clear_queue(0)
self.serial_thread_client.start_stop()
self.measure()
else:
for button in self.buttons:
button.config(state = "active")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.serial_thread_client.start_stop()
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if not self.after_id:
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
def update_cap_display(self, text, is_error, tag = "cap", width = None,
height = None):
self.display.delete(tag)
display_width = self.width / 2 if not width else width
display_height = self.height / 2 if not height else height
self.display.create_text(
display_width,
display_height,
text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = tag)
def measure(self):
measure_time = 1
if self.measure_counter > 0:
try:
measure_time, state = self.queue.get(0)
if not state:
if measure_time:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display(
"{0:2.2f} uf".format(
capacity),
False,
"cap")
else:
self.update_cap_display(
measure_time,
True,
"error",
self.width / 2,
10)
self.measure_counter = 0
measure_time = 1
except Queue.Empty:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!", False)
self.update_cap_display(
"{0}: {1}".format(
"COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.measure_counter -= 1
self.after_id = self.after(int(self.update_interval
* measure_time), self.measure)
else:
if len(self.measuring_data) > 0:
if len(self.measuring_data) > 1:
self.measuring_data.pop(-1)
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display("{0:2.2f} uf".format(total_data),
False)
for i in xrange(self.queue.qsize()):
self.queue.get(0)
self.after(int(self.measuring_data[0] * 1.5),
self.measure_start_stop)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
self.serial_thread_client.release()
class SerialThreadedClient(object):
def __init__(self, device, parent, queue):
self.device = device
self.parent = parent
self.queue = queue
self.run = False
def clear_queue(self, min = 1):
while self.queue.qsize() > min:
self.queue.get(0)
def start_stop(self):
if self.run:
self.parent.after_cancel(self.thread_id)
self.thread_id = None
self.run = False
else:
self.run = True
self.thread = threading.Thread(target=self.worker_thread)
self.thread.start()
def worker_thread(self):
end = 0
if self.run:
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
self.run = False
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
self.run = False
self.clear_queue()
self.thread_id = self.parent.after(int(end * 3000),
self.worker_thread)
def release(self):
self.running = False
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Übrigens: Wenn keine Fragen zum Code bestehen, man aber trotzdem regelmäßig Änderungen protokollieren möchte, dann empfiehlt sich sowas wie https://pastebin.com/ oder https://gist.github.com/ weil man damit nicht durch ellenlange Threads scrollen muss und mehr Übersicht hat...
@kaytec: Du hast Deinen Code jetzt verschlimmbessert. Statt dass die Messung unabhängig von der GUI läuft, rufst Du beim ersten Durchgang aus einem anderen Thread eine GUI-Methode auf (verboten!) und danach läuft die Messung im selben Thread wie die GUI, läßt sie also wieder einfrieren.
Also alles zurück, in SerialThreadedClient darf es keinen Verweis auf `parent` oder irgend ein anderes GUI-Element geben.
Also alles zurück, in SerialThreadedClient darf es keinen Verweis auf `parent` oder irgend ein anderes GUI-Element geben.
Hallo,
snafu und Sirius3. Ich habe immer Fragen und kann nie so richtig sagen, ob ich mit meinen Änderungen richtig liege. Hier bekommt man ja eine sehr gute Hilfe und es haben evtl. auch andere Leser gleiche Probleme. Ich poste eigentlich erst, wenn das Script bei mir fehlerfrei läuft und ich vermute es könnte so stimmen.
Wie mache ich es jetzt richtig - mir gehen die Ideen aus ?
Gruß Frank
snafu und Sirius3. Ich habe immer Fragen und kann nie so richtig sagen, ob ich mit meinen Änderungen richtig liege. Hier bekommt man ja eine sehr gute Hilfe und es haben evtl. auch andere Leser gleiche Probleme. Ich poste eigentlich erst, wenn das Script bei mir fehlerfrei läuft und ich vermute es könnte so stimmen.
Wie mache ich es jetzt richtig - mir gehen die Ideen aus ?
Gruß Frank
Hallo,
und meine Suche ergab noch dies --> https://noisefloor-net.blogspot.com/201 ... reads.html
Müsste ich so hinbekommen - oder noch mehr verschlimmern !
Gruß Frank
und meine Suche ergab noch dies --> https://noisefloor-net.blogspot.com/201 ... reads.html
Müsste ich so hinbekommen - oder noch mehr verschlimmern !
Gruß Frank
Deine Variante vom Montag 4. März 2019, 13:32 Uhr, war doch schon ganz ok. Bis auf das queue-Leeren, was völlig überflüssig und sogar schädlich ist, weil Du die Queue sofort wieder leerst, sobald Du etwas reingeschrieben hast.
Hallo Sirius3,
würde ich ich die "Queue nicht leeren, dann sind noch viele Einträge aus der letzten Messung drin. Beider Messung werden über die gesamte Messdauer Daten geschrieben und eigentlich ist letzte Wert der einzig benötigte. Falls ein Fehler auslöst, dann ist der letzte Eintrag die Fehlermeldung, die für die letzte Anzeige auf dem Display benötigt wird. Würde ich nun die "Queue" nicht komplett leeren, dann ist bei einer neuen Messung im letzte Eintrag noch eine Fehlermeldung mit dem Status=False und es würde der vorhergehende Fehler wieder ausgelöst. Wenn ich heute Abend wieder Zeit habe, dann mache ich ein Minimalbeispiel. So kann es auch ausprobiert werden und die "Hilfesteller" müssen nicht den gesamten "Datenmüll" lesen.
Gruß Frank
würde ich ich die "Queue nicht leeren, dann sind noch viele Einträge aus der letzten Messung drin. Beider Messung werden über die gesamte Messdauer Daten geschrieben und eigentlich ist letzte Wert der einzig benötigte. Falls ein Fehler auslöst, dann ist der letzte Eintrag die Fehlermeldung, die für die letzte Anzeige auf dem Display benötigt wird. Würde ich nun die "Queue" nicht komplett leeren, dann ist bei einer neuen Messung im letzte Eintrag noch eine Fehlermeldung mit dem Status=False und es würde der vorhergehende Fehler wieder ausgelöst. Wenn ich heute Abend wieder Zeit habe, dann mache ich ein Minimalbeispiel. So kann es auch ausprobiert werden und die "Hilfesteller" müssen nicht den gesamten "Datenmüll" lesen.
Gruß Frank