@kaytec: Du hast Deinen Code jetzt verschlimmbessert. Statt dass die Messung unabhängig von der GUI läuft, rufst Du beim ersten Durchgang aus einem anderen Thread eine GUI-Methode auf (verboten!) und danach läuft die Messung im selben Thread wie die GUI, läßt sie also wieder einfrieren.
Also alles zurück, in SerialThreadedClient darf es keinen Verweis auf `parent` oder irgend ein anderes GUI-Element geben.
pyserial und with
Hallo,
snafu und Sirius3. Ich habe immer Fragen und kann nie so richtig sagen, ob ich mit meinen Änderungen richtig liege. Hier bekommt man ja eine sehr gute Hilfe und es haben evtl. auch andere Leser gleiche Probleme. Ich poste eigentlich erst, wenn das Script bei mir fehlerfrei läuft und ich vermute es könnte so stimmen.
Wie mache ich es jetzt richtig - mir gehen die Ideen aus ?
Gruß Frank
snafu und Sirius3. Ich habe immer Fragen und kann nie so richtig sagen, ob ich mit meinen Änderungen richtig liege. Hier bekommt man ja eine sehr gute Hilfe und es haben evtl. auch andere Leser gleiche Probleme. Ich poste eigentlich erst, wenn das Script bei mir fehlerfrei läuft und ich vermute es könnte so stimmen.
Wie mache ich es jetzt richtig - mir gehen die Ideen aus ?
Gruß Frank
Hallo,
und meine Suche ergab noch dies --> https://noisefloor-net.blogspot.com/201 ... reads.html
Müsste ich so hinbekommen - oder noch mehr verschlimmern !
Gruß Frank
und meine Suche ergab noch dies --> https://noisefloor-net.blogspot.com/201 ... reads.html
Müsste ich so hinbekommen - oder noch mehr verschlimmern !
Gruß Frank
Deine Variante vom Montag 4. März 2019, 13:32 Uhr, war doch schon ganz ok. Bis auf das queue-Leeren, was völlig überflüssig und sogar schädlich ist, weil Du die Queue sofort wieder leerst, sobald Du etwas reingeschrieben hast.
Hallo Sirius3,
würde ich ich die "Queue nicht leeren, dann sind noch viele Einträge aus der letzten Messung drin. Beider Messung werden über die gesamte Messdauer Daten geschrieben und eigentlich ist letzte Wert der einzig benötigte. Falls ein Fehler auslöst, dann ist der letzte Eintrag die Fehlermeldung, die für die letzte Anzeige auf dem Display benötigt wird. Würde ich nun die "Queue" nicht komplett leeren, dann ist bei einer neuen Messung im letzte Eintrag noch eine Fehlermeldung mit dem Status=False und es würde der vorhergehende Fehler wieder ausgelöst. Wenn ich heute Abend wieder Zeit habe, dann mache ich ein Minimalbeispiel. So kann es auch ausprobiert werden und die "Hilfesteller" müssen nicht den gesamten "Datenmüll" lesen.
Gruß Frank
würde ich ich die "Queue nicht leeren, dann sind noch viele Einträge aus der letzten Messung drin. Beider Messung werden über die gesamte Messdauer Daten geschrieben und eigentlich ist letzte Wert der einzig benötigte. Falls ein Fehler auslöst, dann ist der letzte Eintrag die Fehlermeldung, die für die letzte Anzeige auf dem Display benötigt wird. Würde ich nun die "Queue" nicht komplett leeren, dann ist bei einer neuen Messung im letzte Eintrag noch eine Fehlermeldung mit dem Status=False und es würde der vorhergehende Fehler wieder ausgelöst. Wenn ich heute Abend wieder Zeit habe, dann mache ich ein Minimalbeispiel. So kann es auch ausprobiert werden und die "Hilfesteller" müssen nicht den gesamten "Datenmüll" lesen.
Gruß Frank
Hallo Sirius3,
Du meinst "while self.queue.qsize()" ? Bei einer Messung von hohen Kapazitäten enstehen recht lange Messzeiten. Bei einer Fehlermeldung wäre es natürlich schön, wenn ich diese nicht noch einige Sekunden sehen muss. Ein Abbruch der Messung führt zu einer nicht vollständig geleerten "Queue".
Gruß Frank
Du meinst "while self.queue.qsize()" ? Bei einer Messung von hohen Kapazitäten enstehen recht lange Messzeiten. Bei einer Fehlermeldung wäre es natürlich schön, wenn ich diese nicht noch einige Sekunden sehen muss. Ein Abbruch der Messung führt zu einer nicht vollständig geleerten "Queue".
Gruß Frank
Hallo Sirius,
hier ist eine entschlackte Version. Man sollte es länger testen, damit auch ein "error" ausgelöst wird. Die Größe der "queue" wird mit einem "print" angezeigt.
Gruß Frank
hier ist eine entschlackte Version. Man sollte es länger testen, damit auch ein "error" ausgelöst wird. Die Größe der "queue" wird mit einem "print" angezeigt.
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import random
from threading import Thread, Event
from functools import partial
import Queue
class CapacitanceUI(tk.LabelFrame):
def __init__(self, parent):
tk.LabelFrame.__init__(self, parent)
self.parent = parent
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(self.queue)
self.after_id = None
self.measure_counter = 20
self.display = tk.Label(self, width = 20, height = 3)
self.display.pack()
self.start_button = tk.Button(self, text = "START",
command = self.measure_start_stop)
self.start_button.pack()
def measure_start_stop(self):
if not self.after_id:
self.start_button.config(state = "disabled")
self.serial_thread_client.start_stop()
self.measure()
else:
self.start_button.config(state = "active")
self.measure_counter = 20
self.serial_thread_client.start_stop()
self.after_cancel(self.after_id)
self.after_id = None
def measure(self):
measure_time = 1
if self.measure_counter > 0:
if not self.queue.empty():
measure_time, state = self.queue.get(-1)
if not state:
self.display.config(text = measure_time)
else:
self.display.config(text = "error")
self.measure_counter = 0
self.measure_counter -= 1
self.after_id = self.after(1000, self.measure)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
class SerialThreadedClient(object):
def __init__(self, queue):
self.queue = queue
self.run = False
self.run_event = None
def clear_queue(self):
while self.queue.qsize() > 0:
self.queue.get(0)
def start_stop(self):
if self.run:
self.run = False
self.run_event.set()
else:
print self.queue.qsize()
self.run_event = Event()
#self.clear_queue()
self.run = True
self.thread = Thread(target=self.worker_thread)
self.thread.start()
def worker_thread(self):
while not self.run_event.is_set():
item = random.randint(0, 4)
if item == 0:
self.queue.put((item, True))
else:
self.queue.put((item, False))
self.run_event.wait(0.5)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
@kaytec: und wo ist die Schleife in `measure`, die die ganze Queue abfragt? Das kann auch deutlich öfter als 1mal pro Sekunde passieren, dann ist die Queue auch nicht so lang.
Du solltest getrennte Funktionen für start und stop haben.
measure_counter dann auf 20 setzen, wenn man mit der Messung startet, nicht, wenn man damit fertig ist.
Du solltest getrennte Funktionen für start und stop haben.
measure_counter dann auf 20 setzen, wenn man mit der Messung startet, nicht, wenn man damit fertig ist.
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import random
from threading import Thread, Event
from functools import partial
import Queue
class CapacitanceUI(tk.LabelFrame):
def __init__(self, parent):
tk.LabelFrame.__init__(self, parent)
self.parent = parent
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(self.queue)
self.after_id = None
self.measure_counter = 0
self.display = tk.Label(self, width=20, height=3)
self.display.pack()
self.start_button = tk.Button(self, text="START",
command = self.measure_start)
self.start_button.pack()
def measure_start(self):
self.measure_counter = 200
self.start_button['state'] = "disabled"
self.serial_thread_client.start()
self.measure()
def measure_stop(self):
self.start_button['state'] = "active"
self.serial_thread_client.stop()
if self.after_id is not None:
self.after_cancel(self.after_id)
self.after_id = None
def measure(self):
if not self.queue.empty():
while True:
try:
measure_time, error = self.queue.get_nowait()
except Queue.Empty:
break
if error:
self.display['text'] = "error"
self.measure_stop()
return
self.display['text'] = measure_time
self.measure_counter -= 1
if self.measure_counter > 0:
self.after_id = self.after(100, self.measure)
else:
self.measure_stop()
def release(self):
self.measure_stop()
self.parent.destroy()
class SerialThreadedClient(object):
def __init__(self, queue):
self.queue = queue
self.run_event = None
def stop(self):
self.run_event.set()
def start(self):
print self.queue.qsize()
self.run_event = Event()
self.thread = Thread(target=self.worker_thread)
self.thread.daemon = True
self.thread.start()
def worker_thread(self):
while not self.run_event.is_set():
item = random.randint(0, 4)
if item == 0:
self.queue.put((item, True))
else:
self.queue.put((item, False))
self.run_event.wait(0.5)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Hallo,
danke Sirius3 - hier ist die umgebastelte Version des Testers.
Gruß Frank
danke Sirius3 - hier ist die umgebastelte Version des Testers.
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
from threading import Thread, Event
from functools import partial
import Queue
WIDTH = 270
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent, width, height, device, update_interval = 2000,
text_color = "blue"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.text = ""
self.display_conf = {"cap" : (
self.width / 2,
self.height / 2,
"{0:2.2f} uf",
"system 18 bold",
"blue",
"cap"),
"counter" : (
40,
self.height - 10,
"COUNTS: {0}",
"system 7 bold",
"green",
"counter"),
"error" : (
self.width / 2,
10,
"{0}",
"system 5 bold",
"red",
"error"),
"wait" :(
self.width / 2,
self.height / 2,
"{0}",
"system 18 bold",
"magenta",
"cap")}
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display(0, "cap")
button_frame = tk.Frame(self)
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start(self):
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.serial_thread_client.start()
self.measure()
def measure_stop(self):
for button in self.buttons:
button.config(state = "active")
self.update_cap_display(self.measure_counter, "counter")
self.serial_thread_client.stop()
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(self.measure_counter, "counter")
def update_cap_display(self, text, tag):
width, height, text_conf, font, color, tag = self.display_conf[tag]
self.display.delete(tag)
self.display.create_text(width, height, text = text_conf.format(text),
font = font, fill = color,tag = tag)
def measure(self):
measure_time = 1
if not self.queue.empty():
while True:
try:
measure_time, error = self.queue.get_nowait()
except Queue.Empty:
break
if error:
self.update_cap_display(measure_time, "error")
self.measure_counter = 0
measure_time = 1
else:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display(capacity, "cap")
self.update_cap_display(self.measure_counter, "counter")
self.measure_counter -= 1
else:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!", "wait")
if self.measure_counter > 0:
self.after_id = self.after(int(self.update_interval
* measure_time), self.measure)
else:
if len(self.measuring_data) > 1:
self.measuring_data.pop(-1)
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display(total_data, "cap")
self.after(int(self.measuring_data[0] * 1.5),
self.measure_stop)
else:
self.after(100, self.measure_stop)
def release(self):
if self.after_id:
self.measure_stop()
self.parent.destroy()
class SerialThreadedClient(object):
def __init__(self, device, queue):
self.device = device
self.queue = queue
self.run_event = None
def stop(self):
self.run_event.set()
def start(self):
self.run_event = Event()
self.thread = Thread(target=self.worker_thread)
self.thread.daemon = True
self.thread.start()
def worker_thread(self):
end = 0
while not self.run_event.is_set():
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
self.run_event.wait(end * 3)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Hallo,
noch Fehler beim Abziehen von Adapter/Bauteilen gefunden, noch einiger Quatsch von mir gelüöscht und einiges von Sirius3-Script vergessen.
Gruß Frank
noch Fehler beim Abziehen von Adapter/Bauteilen gefunden, noch einiger Quatsch von mir gelüöscht und einiges von Sirius3-Script vergessen.
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
from threading import Thread, Event
from functools import partial
import Queue
WIDTH = 270
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
UPDATE_INTERVAL = 2000
def __init__(self, parent, width, height, device):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = self.UPDATE_INTERVAL
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.text = ""
self.display_conf = {"cap" : (
self.width / 2,
self.height / 2,
"{0:2.2f} uf",
"system 18 bold",
"blue",
"cap"),
"counter" : (
40,
self.height - 10,
"COUNTS: {0}",
"system 7 bold",
"green",
"counter"),
"error" : (
self.width / 2,
10,
"{0}",
"system 5 bold",
"red",
"error"),
"wait" :(
self.width / 2,
self.height / 2,
"{0}",
"system 18 bold",
"magenta",
"cap")}
self.display = tk.Canvas(self, width = width, height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display(0, "cap")
button_frame = tk.Frame(self)
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start(self):
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.measuring_data = list()
self.serial_thread_client.start()
self.measure()
def measure_stop(self):
for button in self.buttons:
button.config(state = "active")
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(self.measure_counter, "counter")
self.serial_thread_client.stop()
if self.after_id:
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(self.measure_counter, "counter")
def update_cap_display(self, text, tag):
width, height, text_conf, font, color, tag = self.display_conf[tag]
self.display.delete(tag)
self.display.create_text(width, height, text = text_conf.format(text),
font = font, fill = color,tag = tag)
def measure(self):
measure_time = 1
if not self.queue.empty():
while True:
try:
measure_time, error = self.queue.get_nowait()
except Queue.Empty:
break
if error:
self.update_cap_display(measure_time, "error")
self.measure_stop()
return
else:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display(capacity, "cap")
self.update_cap_display(self.measure_counter, "counter")
self.measure_counter -= 1
else:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!", "wait")
if self.measure_counter > 0:
self.after_id = self.after(int(self.update_interval
* measure_time), self.measure)
else:
if len(self.measuring_data) > 1:
self.measuring_data.pop(-1)
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display(total_data, "cap")
self.after(int(self.measuring_data[0] * 1.5),
self.measure_stop)
else:
self.after(100, self.measure_stop)
def release(self):
if self.after_id:
self.measure_stop()
self.parent.destroy()
class SerialThreadedClient(object):
def __init__(self, device, queue):
self.device = device
self.queue = queue
self.run_event = None
def stop(self):
self.run_event.set()
def start(self):
self.run_event = Event()
self.thread = Thread(target=self.worker_thread)
self.thread.daemon = True
self.thread.start()
def worker_thread(self):
end = 0
while not self.run_event.is_set():
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
self.run_event.wait(end * 3)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Hallo,
mit Auswahl der Schnittstelle.
Gruß Frank
mit Auswahl der Schnittstelle.
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import tkinter as tk
import serial, serial.tools.list_ports
import time
from threading import Thread, Event
from functools import partial
import queue
WIDTH = 270
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
UPDATE_INTERVAL = 2000
CALIBRATION = 45
def __init__(self, parent, width, height):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = None
self.update_interval = self.UPDATE_INTERVAL
self.queue = queue.Queue()
self.serial_thread_client = SerialThreadedClient(self.queue)
menubar = tk.Menu()
parent.config(menu=menubar)
menubar.add_cascade(label="PORTS", menu = self.get_ports(menubar))
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display_conf = {"cap" : (
self.width / 2,
self.height / 2,
"{0:2.2f} uf",
"system 18 bold",
"blue",
"cap"),
"counter" : (
40,
self.height - 10,
"COUNTS: {0}",
"system 7 bold",
"green",
"counter"),
"error" : (
self.width / 2,
10,
"{0}",
"system 5 bold",
"red",
"error"),
"wait" :(
self.width / 2,
self.height / 2,
"{0}",
"system 18 bold",
"magenta",
"cap")}
self.display = tk.Canvas(self, width = width, height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display(0, "cap")
self.update_cap_display(self.measure_counter, "counter")
button_frame = tk.Frame(self)
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
self.buttons[0].config(state = "disabled")
def measure_start(self):
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.measuring_data = list()
self.serial_thread_client.start(self.device)
self.measure()
def get_ports(self, menubar):
port_menu = tk.Menu(menubar, tearoff = 0)
for serial_ports in self.serial_thread_client.ask_for_ports():
for port in serial_ports:
device, desc, hwid = port
port_menu.add_command(label = "{0}:{1}:{2}".format(
device, desc, hwid), command = partial(
self.set_device, device, menubar))
return port_menu
def set_device(self, device, menubar):
menubar.entryconfig("PORTS", state = "disabled")
self.buttons[0].config(state = "normal")
self.device = device
def measure_stop(self):
for button in self.buttons:
button.config(state = "active")
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(self.measure_counter, "counter")
self.serial_thread_client.stop()
if self.after_id:
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(self.measure_counter, "counter")
def update_cap_display(self, text, tag):
width, height, text_conf, font, color, tag = self.display_conf[tag]
self.display.delete(tag)
self.display.create_text(width, height, text = text_conf.format(text),
font = font, fill = color,tag = tag)
def measure(self):
measure_time = 1
if not self.queue.empty():
while True:
try:
measure_time, error = self.queue.get_nowait()
except queue.Empty:
break
if error:
self.update_cap_display(measure_time, "error")
self.measure_stop()
return
else:
capacity = measure_time * 1000 - self.CALIBRATION \
* measure_time
self.measuring_data.append(capacity)
self.update_cap_display(capacity, "cap")
self.update_cap_display(self.measure_counter, "counter")
self.measure_counter -= 1
else:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!", "wait")
if self.measure_counter > 0:
self.after_id = self.after(int(self.update_interval
* measure_time), self.measure)
else:
if len(self.measuring_data) > 1:
self.measuring_data.pop(-1)
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display(total_data, "cap")
self.after(int(self.measuring_data[0] * 1.5),
self.measure_stop)
else:
self.after(1, self.measure_stop)
def release(self):
if self.after_id:
self.measure_stop()
self.parent.destroy()
class SerialThreadedClient(object):
def __init__(self, queue):
self.queue = queue
self.run_event = None
def ask_for_ports(self):
ports = list()
ports.append((port, desc, hwid) for port, desc, hwid in sorted(
serial.tools.list_ports.comports()))
return ports
def stop(self):
self.run_event.set()
print(self.queue.qsize())
def start(self, device):
self.run_event = Event()
self.thread = Thread(target=partial (self.worker_thread, device))
self.thread.daemon = True
self.thread.start()
def worker_thread(self, device):
end = 0
while not self.run_event.is_set():
try:
with serial.Serial(device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
self.run_event.wait(end * 3)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
@kaytec: was ist der Sinn, dass `ask_for_ports` eine Liste mit einem Generator zurückliefert. Sollte die Funktion nicht direkt den Generator zurückliefern, dann ist bei der Benutzung diese doppelte for-Schleife auch nicht nötig.
Wenn man weiß, welche Elemente eine Liste hat, benutzt man [, ].
Hier sind die runden Klammern wichtig, weil Du willst ja eine Liste mit Generator haben, ohne runde Klammern wäre das einfach eine List-Comprehension.
Listen, die immer exakt ein Element haben, sind aber selten sinnvoll, und man kann die Liste gleich weglassen, muß dann aber beim Aufruf Änderungen vornehmen.
Jetzt entpackst Du die ListPortInfo-Objekte um daraus ein Tuple zu machen. Wenn es also nicht wichtig ist, dass das Tuple sind, kann man das weiter Vereinfachen zu:
Wenn jetzt ask_for_ports auch eine Liste zurückliefern darf, dann wird das noch einfacher:
Wenn man weiß, welche Elemente eine Liste hat, benutzt man [, ].
Code: Alles auswählen
def ask_for_ports(self):
return [((port, desc, hwid) for port, desc, hwid in
sorted(serial.tools.list_ports.comports())]
Listen, die immer exakt ein Element haben, sind aber selten sinnvoll, und man kann die Liste gleich weglassen, muß dann aber beim Aufruf Änderungen vornehmen.
Code: Alles auswählen
def ask_for_ports(self):
return ((port, desc, hwid) for port, desc, hwid in
sorted(serial.tools.list_ports.comports())
Code: Alles auswählen
def ask_for_ports(self):
return (info for info sorted(serial.tools.list_ports.comports())
Code: Alles auswählen
def ask_for_ports(self):
return sorted(serial.tools.list_ports.comports()
Hallo Sirius3,
danke für deine Korrektur. Eigentlich ist die Methode „def ask_for_ports“ unnötig, da ich ja gleich in „def get_ports“ auf „serial.tools.list_ports.comports()“ zugreifen könnte. Da hätte ich ja keine Trennung der Klassen ?!. So war meine Idee einfach eine Liste zu erzeugen, die zurückgegeben und ausgelesen werden kann. Die Programmierung entsteht durch Kopiern aus dem Netz, Versuchen, Fehler deuten etc.. und irgendwann funktioniert es so halbwegs – ich fühle mich wie ein „Superprofi“ und bin glücklich.
Gruß Frank
danke für deine Korrektur. Eigentlich ist die Methode „def ask_for_ports“ unnötig, da ich ja gleich in „def get_ports“ auf „serial.tools.list_ports.comports()“ zugreifen könnte. Da hätte ich ja keine Trennung der Klassen ?!. So war meine Idee einfach eine Liste zu erzeugen, die zurückgegeben und ausgelesen werden kann. Die Programmierung entsteht durch Kopiern aus dem Netz, Versuchen, Fehler deuten etc.. und irgendwann funktioniert es so halbwegs – ich fühle mich wie ein „Superprofi“ und bin glücklich.
Gruß Frank