Re: pyserial und with
Verfasst: Sonntag 3. März 2019, 10:37
@kaytec: Ich finde die Aufteilung komisch. Nicht zuletzt weil Programmlogik und GUI nicht sauber getrennt ist.
Seit 2002 Diskussionen rund um die Programmiersprache Python
https://www.python-forum.de/
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import threading
import sys
from functools import partial
import Queue
WIDTH = 200
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent,
width,
height,
device,
update_interval = 1000,
text_color = "blue",
error_font = "system 5 bold",
display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start_stop, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start_stop(self):
if not self.after_id:
for button in self.buttons:
button.config(state = "disabled")
self.serial_thread_client.start_stop()
self.measure()
else:
for button in self.buttons:
button.config(state = "active")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.serial_thread_client.start_stop()
self.measure_counter = self.MEASURE_COUNTER
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if not self.after_id:
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
def update_cap_display(self, text, is_error, tag = "cap", width = None,
height = None):
self.display.delete(tag)
display_width = self.width / 2 if not width else width
display_height = self.height / 2 if not height else height
self.display.create_text(
display_width,
display_height,
text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = tag)
def measure(self):
if self.measure_counter > 0:
try:
measure_time, state = self.queue.get(0)
if not state:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display("{0:2.2f} uf".format(capacity), False,
"cap")
else:
self.update_cap_display(measure_time, state, "error")
self.display.delete("cap")
except Queue.Empty:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!",
False)
self.measure_counter -= 1
self.after_id = self.after(self.update_interval, self.measure)
else:
if len(self.measuring_data) > 0:
if len(self.measuring_data) > 1:
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display("{0:2.2f} uf".format(total_data),
False)
self.after(int(self.measuring_data[0] * 1.5),
self.measure_start_stop)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
self.serial_thread_client.release()
class SerialThreadedClient(object):
def __init__(self, device, queue):
self.device = device
self.queue = queue
self.running = None
def start_stop(self):
if self.running:
self.running = None
else:
self.running = True
self.thread = threading.Thread(target=self.workerThread)
self.thread.start()
def workerThread(self):
end = None
while self.running:
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 >= 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
if end:
time.sleep(end * 3)
def release(self):
self.running = None
sys.exit(1)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import threading
from functools import partial
import Queue
WIDTH = 200
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent,
width,
height,
device,
update_interval = 1000,
text_color = "blue",
error_font = "system 5 bold",
display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start_stop, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start_stop(self):
if not self.after_id:
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.serial_thread_client.start_stop()
self.measure()
else:
for button in self.buttons:
button.config(state = "active")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.serial_thread_client.start_stop()
self.measure_counter = self.MEASURE_COUNTER
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if not self.after_id:
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
def update_cap_display(self, text, is_error, tag = "cap", width = None,
height = None):
self.display.delete(tag)
display_width = self.width / 2 if not width else width
display_height = self.height / 2 if not height else height
self.display.create_text(
display_width,
display_height,
text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = tag)
def measure(self):
if self.measure_counter > 0:
try:
measure_time, state = self.queue.get(-1)
if not state:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display("{0:2.2f} uf".format(capacity), False,
"cap")
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
else:
self.update_cap_display(measure_time, state, "error",
self.width / 2, 10)
self.display.delete("cap")
except Queue.Empty:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!",
False)
self.measure_counter -= 1
self.after_id = self.after(self.update_interval, self.measure)
else:
if len(self.measuring_data) > 0:
if len(self.measuring_data) > 1:
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display("{0:2.2f} uf".format(total_data),
False)
for i in xrange(self.queue.qsize()):
self.queue.get(0)
self.after(int(self.measuring_data[0] * 1.5),
self.measure_start_stop)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
self.serial_thread_client.release()
class SerialThreadedClient(object):
def __init__(self, device, queue):
self.device = device
self.queue = queue
self.running = False
def clear_queue(self):
for item in xrange(self.queue.qsize()):
try:
self.queue.get(0)
except Queue.Empty:
break
def start_stop(self):
if self.running:
self.running = False
else:
self.running = True
self.thread = threading.Thread(target=self.workerThread)
self.thread.start()
def workerThread(self):
end = 0.0
while self.running:
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
if end:
self.clear_queue()
time.sleep(end * 3)
def release(self):
self.running = False
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import threading
from functools import partial
import Queue
WIDTH = 270
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent,
width,
height,
device,
update_interval = 1000,
text_color = "blue",
error_font = "system 5 bold",
display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, parent, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start_stop, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start_stop(self):
if not self.after_id:
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.serial_thread_client.clear_queue(0)
self.serial_thread_client.start_stop()
self.measure()
else:
for button in self.buttons:
button.config(state = "active")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.serial_thread_client.start_stop()
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if not self.after_id:
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
def update_cap_display(self, text, is_error, tag = "cap", width = None,
height = None):
self.display.delete(tag)
display_width = self.width / 2 if not width else width
display_height = self.height / 2 if not height else height
self.display.create_text(
display_width,
display_height,
text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = tag)
def measure(self):
if self.measure_counter > 0:
try:
measure_time, state = self.queue.get(-1)
if not state:
if measure_time:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display(
"{0:2.2f} uf".format(
capacity),
False,
"cap")
self.update_cap_display(
"{0}: {1}".format(
"COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
else:
self.update_cap_display(
measure_time,
True,
"error",
self.width / 2,
10)
self.measure_counter = 0
except Queue.Empty:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!", False)
self.measure_counter -= 1
self.after_id = self.after(self.update_interval, self.measure)
else:
if len(self.measuring_data) > 0:
if len(self.measuring_data) > 1:
self.measuring_data.pop(-1)
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display("{0:2.2f} uf".format(total_data),
False)
for i in xrange(self.queue.qsize()):
self.queue.get(0)
self.after(int(self.measuring_data[0] * 1.5),
self.measure_start_stop)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
self.serial_thread_client.release()
class SerialThreadedClient(object):
def __init__(self, device, parent, queue):
self.device = device
self.parent = parent
self.queue = queue
self.run = False
def clear_queue(self, min = 1):
while self.queue.qsize() > min:
self.queue.get(0)
def start_stop(self):
if self.run:
self.parent.after_cancel(self.thread_id)
self.thread_id = None
self.run = False
else:
self.run = True
self.thread = threading.Thread(target=self.worker_thread)
self.thread.start()
def worker_thread(self):
end = 0
if self.run:
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
self.run = False
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
self.run = False
self.clear_queue()
self.thread_id = self.parent.after(int(end * 10000),
self.worker_thread)
def release(self):
self.running = False
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import threading
from functools import partial
import Queue
WIDTH = 270
HEIGHT = 100
DEVICE = "/dev/ttyUSB0"
class CapacitanceUI(tk.LabelFrame):
MEASURE_COUNTER = 10
def __init__(self, parent,
width,
height,
device,
update_interval = 3000,
text_color = "blue",
error_font = "system 5 bold",
display_font = "system 18 bold"):
tk.LabelFrame.__init__(self, parent, text = "CAPACITANCE",
relief = "solid")
self.width = width
self.height = height
self.parent = parent
self.device = device
self.update_interval = update_interval
self.text_color = text_color
self.error_font = error_font
self.display_font = display_font
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(device, parent, self.queue)
self.after_id = None
self.measure_counter = self.MEASURE_COUNTER
self.measuring_data = list()
self.display = tk.Canvas(self,
width = width,
height = height,
bg="cyan")
self.display.pack(padx = 5, pady = 5)
self.update_cap_display("00.00 uf", False)
button_frame = tk.Frame(self, bg = "gray")
button_frame.pack(padx = 5, pady = 5)
self.buttons = list()
for column, (text, width, command, var) in enumerate(
(("START", 3, self.measure_start_stop, ()),
("+", 2, self.measure_counter_up_down, (1,)),
("-", 2, self.measure_counter_up_down, (-1,)))):
button = tk.Button(button_frame, text=text, width=width,
command=partial(command, *var))
button.grid(column=column, row=0)
self.buttons.append(button)
def measure_start_stop(self):
if not self.after_id:
for button in self.buttons:
button.config(state = "disabled")
self.display.delete("error")
self.serial_thread_client.clear_queue(0)
self.serial_thread_client.start_stop()
self.measure()
else:
for button in self.buttons:
button.config(state = "active")
self.measuring_data = list()
self.measure_counter = self.MEASURE_COUNTER
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.serial_thread_client.start_stop()
self.after_cancel(self.after_id)
self.after_id = None
def measure_counter_up_down(self, step):
if not self.after_id:
if self.measure_counter > -step:
self.measure_counter += step
self.update_cap_display(
"{0}: {1}".format("COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
def update_cap_display(self, text, is_error, tag = "cap", width = None,
height = None):
self.display.delete(tag)
display_width = self.width / 2 if not width else width
display_height = self.height / 2 if not height else height
self.display.create_text(
display_width,
display_height,
text = text,
font = self.error_font if is_error else self.display_font,
fill = self.text_color,
tag = tag)
def measure(self):
measure_time = 1
if self.measure_counter > 0:
try:
measure_time, state = self.queue.get(0)
if not state:
if measure_time:
capacity = measure_time * 1000 - 50 * measure_time
self.measuring_data.append(capacity)
self.update_cap_display(
"{0:2.2f} uf".format(
capacity),
False,
"cap")
else:
self.update_cap_display(
measure_time,
True,
"error",
self.width / 2,
10)
self.measure_counter = 0
measure_time = 1
except Queue.Empty:
if len(self.measuring_data) == 0:
self.update_cap_display("!! WAIT !!", False)
self.update_cap_display(
"{0}: {1}".format(
"COUNTS",
self.measure_counter),
True,
"counter",
30,
self.height - 10)
self.measure_counter -= 1
self.after_id = self.after(int(self.update_interval
* measure_time), self.measure)
else:
if len(self.measuring_data) > 0:
if len(self.measuring_data) > 1:
self.measuring_data.pop(-1)
total_data = 0
for data in self.measuring_data:
total_data += data
total_data = total_data / len(self.measuring_data)
self.update_cap_display("{0:2.2f} uf".format(total_data),
False)
for i in xrange(self.queue.qsize()):
self.queue.get(0)
self.after(int(self.measuring_data[0] * 1.5),
self.measure_start_stop)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
self.serial_thread_client.release()
class SerialThreadedClient(object):
def __init__(self, device, parent, queue):
self.device = device
self.parent = parent
self.queue = queue
self.run = False
def clear_queue(self, min = 1):
while self.queue.qsize() > min:
self.queue.get(0)
def start_stop(self):
if self.run:
self.parent.after_cancel(self.thread_id)
self.thread_id = None
self.run = False
else:
self.run = True
self.thread = threading.Thread(target=self.worker_thread)
self.thread.start()
def worker_thread(self):
end = 0
if self.run:
try:
with serial.Serial(self.device) as ser:
ser.dtr = False
start = time.time()
while not ser.dsr:
end = time.time() - start
ser.dtr = True
if end * 1000 > 1:
self.queue.put((end, False))
else:
self.queue.put(("capacity < 1 µf or not connected",
True))
self.run = False
except (serial.SerialException, IOError) as error:
self.queue.put((error, True))
self.run = False
self.clear_queue()
self.thread_id = self.parent.after(int(end * 3000),
self.worker_thread)
def release(self):
self.running = False
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root, WIDTH, HEIGHT, DEVICE)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import random
from threading import Thread, Event
from functools import partial
import Queue
class CapacitanceUI(tk.LabelFrame):
def __init__(self, parent):
tk.LabelFrame.__init__(self, parent)
self.parent = parent
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(self.queue)
self.after_id = None
self.measure_counter = 20
self.display = tk.Label(self, width = 20, height = 3)
self.display.pack()
self.start_button = tk.Button(self, text = "START",
command = self.measure_start_stop)
self.start_button.pack()
def measure_start_stop(self):
if not self.after_id:
self.start_button.config(state = "disabled")
self.serial_thread_client.start_stop()
self.measure()
else:
self.start_button.config(state = "active")
self.measure_counter = 20
self.serial_thread_client.start_stop()
self.after_cancel(self.after_id)
self.after_id = None
def measure(self):
measure_time = 1
if self.measure_counter > 0:
if not self.queue.empty():
measure_time, state = self.queue.get(-1)
if not state:
self.display.config(text = measure_time)
else:
self.display.config(text = "error")
self.measure_counter = 0
self.measure_counter -= 1
self.after_id = self.after(1000, self.measure)
else:
self.measure_start_stop()
def release(self):
self.parent.destroy()
class SerialThreadedClient(object):
def __init__(self, queue):
self.queue = queue
self.run = False
self.run_event = None
def clear_queue(self):
while self.queue.qsize() > 0:
self.queue.get(0)
def start_stop(self):
if self.run:
self.run = False
self.run_event.set()
else:
print self.queue.qsize()
self.run_event = Event()
#self.clear_queue()
self.run = True
self.thread = Thread(target=self.worker_thread)
self.thread.start()
def worker_thread(self):
while not self.run_event.is_set():
item = random.randint(0, 4)
if item == 0:
self.queue.put((item, True))
else:
self.queue.put((item, False))
self.run_event.wait(0.5)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()
Code: Alles auswählen
#/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
import random
from threading import Thread, Event
from functools import partial
import Queue
class CapacitanceUI(tk.LabelFrame):
def __init__(self, parent):
tk.LabelFrame.__init__(self, parent)
self.parent = parent
self.queue = Queue.Queue()
self.serial_thread_client = SerialThreadedClient(self.queue)
self.after_id = None
self.measure_counter = 0
self.display = tk.Label(self, width=20, height=3)
self.display.pack()
self.start_button = tk.Button(self, text="START",
command = self.measure_start)
self.start_button.pack()
def measure_start(self):
self.measure_counter = 200
self.start_button['state'] = "disabled"
self.serial_thread_client.start()
self.measure()
def measure_stop(self):
self.start_button['state'] = "active"
self.serial_thread_client.stop()
if self.after_id is not None:
self.after_cancel(self.after_id)
self.after_id = None
def measure(self):
if not self.queue.empty():
while True:
try:
measure_time, error = self.queue.get_nowait()
except Queue.Empty:
break
if error:
self.display['text'] = "error"
self.measure_stop()
return
self.display['text'] = measure_time
self.measure_counter -= 1
if self.measure_counter > 0:
self.after_id = self.after(100, self.measure)
else:
self.measure_stop()
def release(self):
self.measure_stop()
self.parent.destroy()
class SerialThreadedClient(object):
def __init__(self, queue):
self.queue = queue
self.run_event = None
def stop(self):
self.run_event.set()
def start(self):
print self.queue.qsize()
self.run_event = Event()
self.thread = Thread(target=self.worker_thread)
self.thread.daemon = True
self.thread.start()
def worker_thread(self):
while not self.run_event.is_set():
item = random.randint(0, 4)
if item == 0:
self.queue.put((item, True))
else:
self.queue.put((item, False))
self.run_event.wait(0.5)
def main():
root = tk.Tk()
root.title("CAPACITANCE")
capacitance_ui = CapacitanceUI(root)
capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
root.mainloop()
if __name__ == '__main__':
main()