pyserial und with

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Benutzeravatar
kaytec
User
Beiträge: 608
Registriert: Dienstag 13. Februar 2007, 21:57

Hallo,

danke Sirius3 - hier ist die umgebastelte Version des Testers.

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
from threading import Thread, Event
from functools import partial
import Queue

# Width and height that main() passes to CapacitanceUI for its display canvas.
WIDTH = 270
HEIGHT = 100
# Serial device name that main() passes to CapacitanceUI.
DEVICE = "/dev/ttyUSB0"

class CapacitanceUI(tk.LabelFrame):
    """Tk front end of the serial-port capacitance tester.

    A canvas shows the latest reading, the remaining measurement count and
    any error text.  Three buttons start a run and raise or lower the
    number of measurements.  Results are produced by a SerialThreadedClient
    worker and arrive through a queue that measure() polls via after().
    """

    # Number of measurements per run; restored whenever a run stops.
    MEASURE_COUNTER = 10

    def __init__(self, parent, width, height, device, update_interval = 2000,
            text_color = "blue"):
        """Create the display canvas and the START / + / - buttons.

        parent -- Tk container; release() destroys it.
        width, height -- size of the display canvas.
        device -- serial device name handed to SerialThreadedClient.
        update_interval -- milliseconds; measure() polls again after
            update_interval * <last measured time>.
        text_color -- accepted but not used by this class.
        """
        tk.LabelFrame.__init__(self, parent, text="CAPACITANCE",
            relief="solid")

        self.width = width
        self.height = height
        self.parent = parent
        self.device = device
        self.update_interval = update_interval
        self.queue = Queue.Queue()
        self.serial_thread_client = SerialThreadedClient(device, self.queue)
        self.after_id = None
        self.measure_counter = self.MEASURE_COUNTER
        self.measuring_data = list()
        self.text = ""
        # Per display element:
        # (x, y, format string, font, fill colour, canvas tag).
        # "wait" deliberately reuses the "cap" tag so that the wait text
        # and the reading replace each other.
        self.display_conf = {
            "cap": (
                self.width / 2,
                self.height / 2,
                "{0:2.2f} uf",
                "system 18 bold",
                "blue",
                "cap"),
            "counter": (
                40,
                self.height - 10,
                "COUNTS: {0}",
                "system 7 bold",
                "green",
                "counter"),
            "error": (
                self.width / 2,
                10,
                "{0}",
                "system 5 bold",
                "red",
                "error"),
            "wait": (
                self.width / 2,
                self.height / 2,
                "{0}",
                "system 18 bold",
                "magenta",
                "cap")}
        self.display = tk.Canvas(self, width=width, height=height, bg="cyan")
        self.display.pack(padx=5, pady=5)
        self.update_cap_display(0, "cap")
        button_frame = tk.Frame(self)
        button_frame.pack(padx=5, pady=5)
        self.buttons = list()
        button_specs = (
            ("START", 3, self.measure_start, ()),
            ("+", 2, self.measure_counter_up_down, (1,)),
            ("-", 2, self.measure_counter_up_down, (-1,)))
        for column, (text, button_width, command, var) in enumerate(
                button_specs):
            button = tk.Button(button_frame, text=text, width=button_width,
                command=partial(command, *var))
            button.grid(column=column, row=0)
            self.buttons.append(button)

    def _discard_pending_results(self):
        """Empty the queue so results of an earlier run are not displayed."""
        while True:
            try:
                self.queue.get_nowait()
            except Queue.Empty:
                break

    def measure_start(self):
        """Disable the buttons, start the worker and begin polling."""
        for button in self.buttons:
            button.config(state="disabled")
        self.display.delete("error")
        self.measuring_data = list()
        # The counter is NOT reset here, so the value chosen with "+" / "-"
        # is the one used for this run; measure_stop() restores the default.
        self._discard_pending_results()
        self.serial_thread_client.start()
        self.measure()

    def measure_stop(self):
        """Stop the worker, cancel polling and re-enable the buttons."""
        for button in self.buttons:
            # "normal" is the idle state; "active" is Tk's mouse-over look.
            button.config(state="normal")
        self.measure_counter = self.MEASURE_COUNTER
        self.update_cap_display(self.measure_counter, "counter")
        self.serial_thread_client.stop()
        if self.after_id:
            self.after_cancel(self.after_id)
        self.after_id = None

    def measure_counter_up_down(self, step):
        """Change the measurement count by step, never going below 1."""
        if self.measure_counter > -step:
            self.measure_counter += step
            self.update_cap_display(self.measure_counter, "counter")

    def update_cap_display(self, text, tag):
        """Draw text on the canvas using the display_conf entry named tag.

        The canvas tag is taken from the configuration entry, not from the
        argument; any item already carrying that tag is removed first.
        """
        x, y, text_conf, font, color, canvas_tag = self.display_conf[tag]
        self.display.delete(canvas_tag)
        self.display.create_text(x, y, text=text_conf.format(text),
            font=font, fill=color, tag=canvas_tag)

    def measure(self):
        """Poll the result queue once and schedule the next step.

        Only the newest queued result is used.  A result flagged as an
        error is shown and ends the run; otherwise the reading is shown
        and recorded.  When the counter is used up, the average of the
        recorded readings (without the last one) is displayed and
        measure_stop() is scheduled.
        """
        measure_time = 1
        latest = None
        while True:
            try:
                latest = self.queue.get_nowait()
            except Queue.Empty:
                break

        if latest is not None:
            measure_time, error = latest
            if error:
                # For error results the first element is the message.
                self.update_cap_display(measure_time, "error")
                self.measure_counter = 0
                measure_time = 1
            else:
                # NOTE(review): 1000 and 50 look like empirical scale and
                # correction factors for the hardware -- confirm.
                capacity = measure_time * 1000 - 50 * measure_time
                self.measuring_data.append(capacity)
                self.update_cap_display(capacity, "cap")
                self.update_cap_display(self.measure_counter, "counter")
            self.measure_counter -= 1
        elif not self.measuring_data:
            self.update_cap_display("!! WAIT !!", "wait")

        if self.measure_counter > 0:
            self.after_id = self.after(
                int(self.update_interval * measure_time), self.measure)
        elif len(self.measuring_data) > 1:
            # The last reading is dropped before averaging.
            self.measuring_data.pop(-1)
            average = sum(self.measuring_data) / len(self.measuring_data)
            self.update_cap_display(average, "cap")
            # Keep the id so release() can cancel the pending stop.
            self.after_id = self.after(
                int(self.measuring_data[0] * 1.5), self.measure_stop)
        else:
            self.after_id = self.after(100, self.measure_stop)

    def release(self):
        """Stop a running measurement, then destroy the parent widget."""
        if self.after_id:
            self.measure_stop()
        self.parent.destroy()
        
        
class SerialThreadedClient(object):
    """Background worker that times the DTR -> DSR response of a port.

    Each cycle opens the serial device, sets DTR low, then raises DTR and
    measures how long DSR stays low.  Results are put on the queue as
    (value, is_error) tuples: (elapsed seconds, False) for a measurement
    or (message or exception, True) for a failure.
    """

    def __init__(self, device, queue):
        """Remember the serial device name and the result queue."""
        self.device = device
        self.queue = queue
        self.run_event = None
        self.thread = None

    def stop(self):
        """Ask the current worker to finish; a no-op before start()."""
        if self.run_event is not None:
            self.run_event.set()

    def start(self):
        """Start a new daemon worker thread with its own stop event."""
        self.run_event = Event()
        # The event is passed explicitly so that a later start() cannot
        # revive a worker that was already told to stop.
        self.thread = Thread(target=self.worker_thread,
            args=(self.run_event,))
        self.thread.daemon = True
        self.thread.start()

    def worker_thread(self, run_event=None):
        """Measure repeatedly until run_event is set.

        run_event -- stop event for this worker; defaults to the event
            created by the most recent start().
        """
        if run_event is None:
            run_event = self.run_event
        # NOTE(review): end is not reset between cycles, so if DSR is
        # already high when a cycle starts the previous value is reported
        # again -- confirm whether that is intended.
        end = 0
        while not run_event.is_set():
            try:
                with serial.Serial(self.device) as ser:
                    ser.dtr = False
                    start = time.time()
                    # Also leave the busy-wait when a stop is requested,
                    # otherwise a missing component blocks the thread.
                    while not ser.dsr and not run_event.is_set():
                        end = time.time() - start
                        ser.dtr = True
                    if run_event.is_set():
                        break
                    if end * 1000 > 1:
                        self.queue.put((end, False))
                    else:
                        self.queue.put(("capacity < 1 µf or not connected",
                            True))
            except (serial.SerialException, IOError) as error:
                self.queue.put((error, True))
            # Pause proportional to the last measured time; returns at
            # once when a stop is requested.
            run_event.wait(end * 3)
        
def main():
    """Create the main window, embed the capacitance UI and run Tk."""
    window = tk.Tk()
    window.title("CAPACITANCE")
    meter = CapacitanceUI(window, WIDTH, HEIGHT, DEVICE)
    meter.pack(expand=tk.YES, padx=5, pady=5)
    # Closing the window goes through release() so that a running
    # measurement is stopped before the window is destroyed.
    window.protocol("WM_DELETE_WINDOW", meter.release)
    window.mainloop()


if __name__ == "__main__":
    main()
Gruß Frank
Benutzeravatar
kaytec
User
Beiträge: 608
Registriert: Dienstag 13. Februar 2007, 21:57

Hallo,

noch Fehler beim Abziehen von Adapter/Bauteilen gefunden, noch einigen Quatsch von mir gelöscht und einiges von Sirius3-Script vergessen.

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8
import Tkinter as tk
import serial
import time
from threading import Thread, Event
from functools import partial
import Queue

# Width and height that main() passes to CapacitanceUI for its display canvas.
WIDTH = 270
HEIGHT = 100
# Serial device name that main() passes to CapacitanceUI.
DEVICE = "/dev/ttyUSB0"

class CapacitanceUI(tk.LabelFrame):
    """Tk front end of the serial-port capacitance tester.

    A canvas shows the latest reading, the remaining measurement count and
    any error text.  Three buttons start a run and raise or lower the
    number of measurements.  Results are produced by a SerialThreadedClient
    worker and arrive through a queue that measure() polls via after().
    """

    # Number of measurements per run; restored whenever a run stops.
    MEASURE_COUNTER = 10
    # Milliseconds; multiplied by the last measured time to get the delay
    # until the next poll.
    UPDATE_INTERVAL = 2000

    def __init__(self, parent, width, height, device):
        """Create the display canvas and the START / + / - buttons.

        parent -- Tk container; release() destroys it.
        width, height -- size of the display canvas.
        device -- serial device name handed to SerialThreadedClient.
        """
        tk.LabelFrame.__init__(self, parent, text="CAPACITANCE",
            relief="solid")

        self.width = width
        self.height = height
        self.parent = parent
        self.device = device
        self.update_interval = self.UPDATE_INTERVAL
        self.queue = Queue.Queue()
        self.serial_thread_client = SerialThreadedClient(device, self.queue)
        self.after_id = None
        self.measure_counter = self.MEASURE_COUNTER
        self.measuring_data = list()
        self.text = ""
        # Per display element:
        # (x, y, format string, font, fill colour, canvas tag).
        # "wait" deliberately reuses the "cap" tag so that the wait text
        # and the reading replace each other.
        self.display_conf = {
            "cap": (
                self.width / 2,
                self.height / 2,
                "{0:2.2f} uf",
                "system 18 bold",
                "blue",
                "cap"),
            "counter": (
                40,
                self.height - 10,
                "COUNTS: {0}",
                "system 7 bold",
                "green",
                "counter"),
            "error": (
                self.width / 2,
                10,
                "{0}",
                "system 5 bold",
                "red",
                "error"),
            "wait": (
                self.width / 2,
                self.height / 2,
                "{0}",
                "system 18 bold",
                "magenta",
                "cap")}
        self.display = tk.Canvas(self, width=width, height=height, bg="cyan")
        self.display.pack(padx=5, pady=5)
        self.update_cap_display(0, "cap")
        button_frame = tk.Frame(self)
        button_frame.pack(padx=5, pady=5)
        self.buttons = list()
        button_specs = (
            ("START", 3, self.measure_start, ()),
            ("+", 2, self.measure_counter_up_down, (1,)),
            ("-", 2, self.measure_counter_up_down, (-1,)))
        for column, (text, button_width, command, var) in enumerate(
                button_specs):
            button = tk.Button(button_frame, text=text, width=button_width,
                command=partial(command, *var))
            button.grid(column=column, row=0)
            self.buttons.append(button)

    def _discard_pending_results(self):
        """Empty the queue so results of an earlier run are not displayed."""
        while True:
            try:
                self.queue.get_nowait()
            except Queue.Empty:
                break

    def measure_start(self):
        """Disable the buttons, start the worker and begin polling."""
        for button in self.buttons:
            button.config(state="disabled")
        self.display.delete("error")
        self.measuring_data = list()
        self._discard_pending_results()
        self.serial_thread_client.start()
        self.measure()

    def measure_stop(self):
        """Stop the worker, cancel polling and re-enable the buttons."""
        for button in self.buttons:
            # "normal" is the idle state; "active" is Tk's mouse-over look.
            button.config(state="normal")
        self.measure_counter = self.MEASURE_COUNTER
        self.update_cap_display(self.measure_counter, "counter")
        self.serial_thread_client.stop()
        if self.after_id:
            self.after_cancel(self.after_id)
        self.after_id = None

    def measure_counter_up_down(self, step):
        """Change the measurement count by step, never going below 1."""
        if self.measure_counter > -step:
            self.measure_counter += step
            self.update_cap_display(self.measure_counter, "counter")

    def update_cap_display(self, text, tag):
        """Draw text on the canvas using the display_conf entry named tag.

        The canvas tag is taken from the configuration entry, not from the
        argument; any item already carrying that tag is removed first.
        """
        x, y, text_conf, font, color, canvas_tag = self.display_conf[tag]
        self.display.delete(canvas_tag)
        self.display.create_text(x, y, text=text_conf.format(text),
            font=font, fill=color, tag=canvas_tag)

    def measure(self):
        """Poll the result queue once and schedule the next step.

        Only the newest queued result is used.  A result flagged as an
        error is shown and stops the run immediately; otherwise the
        reading is shown and recorded.  When the counter is used up, the
        average of the recorded readings (without the last one) is
        displayed and measure_stop() is scheduled.
        """
        measure_time = 1
        latest = None
        while True:
            try:
                latest = self.queue.get_nowait()
            except Queue.Empty:
                break

        if latest is not None:
            measure_time, error = latest
            if error:
                # For error results the first element is the message.
                self.update_cap_display(measure_time, "error")
                self.measure_stop()
                return
            # NOTE(review): 1000 and 50 look like empirical scale and
            # correction factors for the hardware -- confirm.
            capacity = measure_time * 1000 - 50 * measure_time
            self.measuring_data.append(capacity)
            self.update_cap_display(capacity, "cap")
            self.update_cap_display(self.measure_counter, "counter")
            self.measure_counter -= 1
        elif not self.measuring_data:
            self.update_cap_display("!! WAIT !!", "wait")

        if self.measure_counter > 0:
            self.after_id = self.after(
                int(self.update_interval * measure_time), self.measure)
        elif len(self.measuring_data) > 1:
            # The last reading is dropped before averaging.
            self.measuring_data.pop(-1)
            average = sum(self.measuring_data) / len(self.measuring_data)
            self.update_cap_display(average, "cap")
            # Keep the id so release() can cancel the pending stop.
            self.after_id = self.after(
                int(self.measuring_data[0] * 1.5), self.measure_stop)
        else:
            self.after_id = self.after(100, self.measure_stop)

    def release(self):
        """Stop a running measurement, then destroy the parent widget."""
        if self.after_id:
            self.measure_stop()
        self.parent.destroy()
        
        
class SerialThreadedClient(object):
    """Background worker that times the DTR -> DSR response of a port.

    Each cycle opens the serial device, sets DTR low, then raises DTR and
    measures how long DSR stays low.  Results are put on the queue as
    (value, is_error) tuples: (elapsed seconds, False) for a measurement
    or (message or exception, True) for a failure.
    """

    def __init__(self, device, queue):
        """Remember the serial device name and the result queue."""
        self.device = device
        self.queue = queue
        self.run_event = None
        self.thread = None

    def stop(self):
        """Ask the current worker to finish; a no-op before start()."""
        if self.run_event is not None:
            self.run_event.set()

    def start(self):
        """Start a new daemon worker thread with its own stop event."""
        stop_event = Event()
        self.run_event = stop_event
        # Handing the event to the thread keeps an older worker bound to
        # its own (already set) event when start() is called again.
        self.thread = Thread(target=self.worker_thread, args=(stop_event,))
        self.thread.daemon = True
        self.thread.start()

    def worker_thread(self, run_event=None):
        """Measure repeatedly until run_event is set.

        run_event -- stop event for this worker; defaults to the event
            created by the most recent start().
        """
        if run_event is None:
            run_event = self.run_event
        # NOTE(review): end is not reset between cycles, so if DSR is
        # already high when a cycle starts the previous value is reported
        # again -- confirm whether that is intended.
        end = 0
        while not run_event.is_set():
            try:
                with serial.Serial(self.device) as ser:
                    ser.dtr = False
                    start = time.time()
                    # Also leave the busy-wait when a stop is requested,
                    # otherwise a removed component blocks the thread.
                    while not ser.dsr and not run_event.is_set():
                        end = time.time() - start
                        ser.dtr = True
                    if run_event.is_set():
                        break
                    if end * 1000 > 1:
                        self.queue.put((end, False))
                    else:
                        self.queue.put(("capacity < 1 µf or not connected",
                            True))
            except (serial.SerialException, IOError) as error:
                self.queue.put((error, True))
            # Pause proportional to the last measured time; returns at
            # once when a stop is requested.
            run_event.wait(end * 3)
        
def main():
    """Open the tester window and hand control to the Tk main loop."""
    app_window = tk.Tk()
    app_window.title("CAPACITANCE")
    tester = CapacitanceUI(app_window, WIDTH, HEIGHT, DEVICE)
    tester.pack(expand=tk.YES, padx=5, pady=5)
    # The window-manager close request is handled by release(), which
    # stops a running measurement before destroying the window.
    app_window.protocol("WM_DELETE_WINDOW", tester.release)
    app_window.mainloop()


if __name__ == "__main__":
    main()
    
Gruß Frank
Benutzeravatar
kaytec
User
Beiträge: 608
Registriert: Dienstag 13. Februar 2007, 21:57

Hallo,

mit Auswahl der Schnittstelle.

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8

import tkinter as tk
import serial, serial.tools.list_ports
import time
from threading import Thread, Event
from functools import partial
import queue

# Width and height that main() passes to CapacitanceUI for its display canvas.
WIDTH = 270
HEIGHT = 100
# NOTE(review): DEVICE is not referenced by the code visible in this
# revision (the port is picked from the PORTS menu) -- confirm it can go.
DEVICE = "/dev/ttyUSB0"

class CapacitanceUI(tk.LabelFrame):
    """Tk front end of the serial-port capacitance tester.

    A canvas shows the latest reading, the remaining measurement count and
    any error text.  A PORTS menu selects the serial device; three buttons
    start a run and raise or lower the number of measurements.  Results
    are produced by a SerialThreadedClient worker and arrive through a
    queue that measure() polls via after().
    """

    # Number of measurements per run; restored whenever a run stops.
    MEASURE_COUNTER = 10
    # Milliseconds; multiplied by the last measured time to get the delay
    # until the next poll.
    UPDATE_INTERVAL = 2000
    # Correction factor subtracted (times the measured time) from the
    # raw reading in measure().
    CALIBRATION = 45

    def __init__(self, parent, width, height):
        """Create the menu, the display canvas and the buttons.

        parent -- Tk container that receives the menu bar; release()
            destroys it.
        width, height -- size of the display canvas.
        """
        tk.LabelFrame.__init__(self, parent, text="CAPACITANCE",
            relief="solid")
        self.width = width
        self.height = height
        self.parent = parent
        self.device = None
        self.update_interval = self.UPDATE_INTERVAL
        self.queue = queue.Queue()
        self.serial_thread_client = SerialThreadedClient(self.queue)
        menubar = tk.Menu()
        parent.config(menu=menubar)
        menubar.add_cascade(label="PORTS", menu=self.get_ports(menubar))
        self.after_id = None
        self.measure_counter = self.MEASURE_COUNTER
        self.measuring_data = list()
        # Per display element:
        # (x, y, format string, font, fill colour, canvas tag).
        # "wait" deliberately reuses the "cap" tag so that the wait text
        # and the reading replace each other.
        self.display_conf = {
            "cap": (
                self.width / 2,
                self.height / 2,
                "{0:2.2f} uf",
                "system 18 bold",
                "blue",
                "cap"),
            "counter": (
                40,
                self.height - 10,
                "COUNTS: {0}",
                "system 7 bold",
                "green",
                "counter"),
            "error": (
                self.width / 2,
                10,
                "{0}",
                "system 5 bold",
                "red",
                "error"),
            "wait": (
                self.width / 2,
                self.height / 2,
                "{0}",
                "system 18 bold",
                "magenta",
                "cap")}
        self.display = tk.Canvas(self, width=width, height=height, bg="cyan")
        self.display.pack(padx=5, pady=5)
        self.update_cap_display(0, "cap")
        self.update_cap_display(self.measure_counter, "counter")
        button_frame = tk.Frame(self)
        button_frame.pack(padx=5, pady=5)
        self.buttons = list()
        button_specs = (
            ("START", 3, self.measure_start, ()),
            ("+", 2, self.measure_counter_up_down, (1,)),
            ("-", 2, self.measure_counter_up_down, (-1,)))
        for column, (text, button_width, command, var) in enumerate(
                button_specs):
            button = tk.Button(button_frame, text=text, width=button_width,
                command=partial(command, *var))
            button.grid(column=column, row=0)
            self.buttons.append(button)
        # START stays disabled until a port has been chosen.
        self.buttons[0].config(state="disabled")

    def _discard_pending_results(self):
        """Empty the queue so results of an earlier run are not displayed."""
        while True:
            try:
                self.queue.get_nowait()
            except queue.Empty:
                break

    def measure_start(self):
        """Disable the buttons, start the worker and begin polling."""
        for button in self.buttons:
            button.config(state="disabled")
        self.display.delete("error")
        self.measuring_data = list()
        self._discard_pending_results()
        self.serial_thread_client.start(self.device)
        self.measure()

    def get_ports(self, menubar):
        """Return a submenu with one entry per available serial port.

        Choosing an entry calls set_device() with that port's device name.
        """
        port_menu = tk.Menu(menubar, tearoff=0)
        # ask_for_ports() yields iterables of (device, desc, hwid) tuples.
        for serial_ports in self.serial_thread_client.ask_for_ports():
            for device, desc, hwid in serial_ports:
                port_menu.add_command(
                    label="{0}:{1}:{2}".format(device, desc, hwid),
                    command=partial(self.set_device, device, menubar))
        return port_menu

    def set_device(self, device, menubar):
        """Store the chosen port, lock the PORTS menu and enable START."""
        menubar.entryconfig("PORTS", state="disabled")
        self.buttons[0].config(state="normal")
        self.device = device

    def measure_stop(self):
        """Stop the worker, cancel polling and re-enable the buttons."""
        for button in self.buttons:
            # "normal" is the idle state; "active" is Tk's mouse-over look.
            button.config(state="normal")
        self.measure_counter = self.MEASURE_COUNTER
        self.update_cap_display(self.measure_counter, "counter")
        self.serial_thread_client.stop()
        if self.after_id:
            self.after_cancel(self.after_id)
        self.after_id = None

    def measure_counter_up_down(self, step):
        """Change the measurement count by step, never going below 1."""
        if self.measure_counter > -step:
            self.measure_counter += step
            self.update_cap_display(self.measure_counter, "counter")

    def update_cap_display(self, text, tag):
        """Draw text on the canvas using the display_conf entry named tag.

        The canvas tag is taken from the configuration entry, not from the
        argument; any item already carrying that tag is removed first.
        """
        x, y, text_conf, font, color, canvas_tag = self.display_conf[tag]
        self.display.delete(canvas_tag)
        self.display.create_text(x, y, text=text_conf.format(text),
            font=font, fill=color, tag=canvas_tag)

    def measure(self):
        """Poll the result queue once and schedule the next step.

        Only the newest queued result is used.  A result flagged as an
        error is shown and stops the run immediately; otherwise the
        reading is shown and recorded.  When the counter is used up, the
        average of the recorded readings (without the last one) is
        displayed and measure_stop() is scheduled.
        """
        measure_time = 1
        latest = None
        while True:
            try:
                latest = self.queue.get_nowait()
            except queue.Empty:
                break

        if latest is not None:
            measure_time, error = latest
            if error:
                # For error results the first element is the message.
                self.update_cap_display(measure_time, "error")
                self.measure_stop()
                return
            capacity = (measure_time * 1000
                - self.CALIBRATION * measure_time)
            self.measuring_data.append(capacity)
            self.update_cap_display(capacity, "cap")
            self.update_cap_display(self.measure_counter, "counter")
            self.measure_counter -= 1
        elif not self.measuring_data:
            self.update_cap_display("!! WAIT !!", "wait")

        if self.measure_counter > 0:
            self.after_id = self.after(
                int(self.update_interval * measure_time), self.measure)
        elif len(self.measuring_data) > 1:
            # The last reading is dropped before averaging.
            self.measuring_data.pop(-1)
            average = sum(self.measuring_data) / len(self.measuring_data)
            self.update_cap_display(average, "cap")
            # Keep the id so release() can cancel the pending stop.
            self.after_id = self.after(
                int(self.measuring_data[0] * 1.5), self.measure_stop)
        else:
            self.after_id = self.after(1, self.measure_stop)

    def release(self):
        """Stop a running measurement, then destroy the parent widget."""
        if self.after_id:
            self.measure_stop()
        self.parent.destroy()
        
        
class SerialThreadedClient(object):
    """Background worker that times the DTR -> DSR response of a port.

    Each cycle opens the serial device, sets DTR low, then raises DTR and
    measures how long DSR stays low.  Results are put on the queue as
    (value, is_error) tuples: (elapsed seconds, False) for a measurement
    or (message or exception, True) for a failure.
    """

    def __init__(self, queue):
        """Remember the queue on which results are delivered."""
        self.queue = queue
        self.run_event = None
        self.thread = None

    def ask_for_ports(self):
        """Return a one-element list holding an iterable of port tuples.

        Each tuple is (port, desc, hwid) as unpacked from the entries of
        serial.tools.list_ports.comports(), in sorted order.  The nesting
        is kept because callers iterate the result with two loops.
        """
        return [((port, desc, hwid) for port, desc, hwid in sorted(
            serial.tools.list_ports.comports()))]

    def stop(self):
        """Ask the current worker to finish; a no-op before start()."""
        if self.run_event is not None:
            self.run_event.set()

    def start(self, device):
        """Start a new daemon worker for device with its own stop event."""
        stop_event = Event()
        self.run_event = stop_event
        # Handing the event to the thread keeps an older worker bound to
        # its own (already set) event when start() is called again.
        self.thread = Thread(target=partial(self.worker_thread, device,
            stop_event))
        self.thread.daemon = True
        self.thread.start()

    def worker_thread(self, device, run_event=None):
        """Measure on device repeatedly until run_event is set.

        device -- name of the serial device to open for every cycle.
        run_event -- stop event for this worker; defaults to the event
            created by the most recent start().
        """
        if run_event is None:
            run_event = self.run_event
        # NOTE(review): end is not reset between cycles, so if DSR is
        # already high when a cycle starts the previous value is reported
        # again -- confirm whether that is intended.
        end = 0
        while not run_event.is_set():
            try:
                with serial.Serial(device) as ser:
                    ser.dtr = False
                    start = time.time()
                    # Also leave the busy-wait when a stop is requested,
                    # otherwise a removed component blocks the thread.
                    while not ser.dsr and not run_event.is_set():
                        end = time.time() - start
                        ser.dtr = True
                    if run_event.is_set():
                        break
                    if end * 1000 > 1:
                        self.queue.put((end, False))
                    else:
                        self.queue.put(("capacity < 1 µf or not connected",
                            True))
            except (serial.SerialException, IOError) as error:
                self.queue.put((error, True))
            # Pause proportional to the last measured time; returns at
            # once when a stop is requested.
            run_event.wait(end * 3)
        
def main():
    """Set up the top-level window with the capacitance UI and run Tk."""
    top = tk.Tk()
    top.title("CAPACITANCE")
    ui = CapacitanceUI(top, WIDTH, HEIGHT)
    ui.pack(expand=tk.YES, padx=5, pady=5)
    # Closing the window is routed through release() so that a running
    # measurement is stopped before the window goes away.
    top.protocol("WM_DELETE_WINDOW", ui.release)
    top.mainloop()


if __name__ == "__main__":
    main()
    
Gruß Frank
Sirius3
User
Beiträge: 17750
Registriert: Sonntag 21. Oktober 2012, 17:20

@kaytec: was ist der Sinn, dass `ask_for_ports` eine Liste mit einem Generator zurückliefert? Sollte die Funktion nicht direkt den Generator zurückliefern? Dann ist bei der Benutzung diese doppelte for-Schleife auch nicht nötig.

Wenn man weiß, welche Elemente eine Liste hat, benutzt man [, ].

Code: Alles auswählen

    def ask_for_ports(self):
        """Return a one-element list holding a generator of port tuples."""
        # The inner parentheses make this a generator inside a list
        # literal rather than a list comprehension.
        return [((port, desc, hwid) for port, desc, hwid in
            sorted(serial.tools.list_ports.comports()))]
Hier sind die runden Klammern wichtig, weil Du ja eine Liste mit einem Generator haben willst; ohne runde Klammern wäre das einfach eine List-Comprehension.
Listen, die immer exakt ein Element haben, sind aber selten sinnvoll, und man kann die Liste gleich weglassen, muß dann aber beim Aufruf Änderungen vornehmen.

Code: Alles auswählen

    def ask_for_ports(self):
        """Return a generator of (port, desc, hwid) tuples, sorted."""
        return ((port, desc, hwid) for port, desc, hwid in
            sorted(serial.tools.list_ports.comports()))
Jetzt entpackst Du die ListPortInfo-Objekte, um daraus ein Tupel zu machen. Wenn es also nicht wichtig ist, dass das Tupel sind, kann man das weiter vereinfachen zu:

Code: Alles auswählen

    def ask_for_ports(self):
        """Return a generator over the sorted port-info objects."""
        return (info for info in sorted(serial.tools.list_ports.comports()))
Wenn jetzt ask_for_ports auch eine Liste zurückliefern darf, dann wird das noch einfacher:

Code: Alles auswählen

    def ask_for_ports(self):
        """Return the available serial ports as a sorted list."""
        return sorted(serial.tools.list_ports.comports())
Benutzeravatar
kaytec
User
Beiträge: 608
Registriert: Dienstag 13. Februar 2007, 21:57

Hallo Sirius3,
danke für deine Korrektur. Eigentlich ist die Methode „def ask_for_ports“ unnötig, da ich ja gleich in „def get_ports“ auf „serial.tools.list_ports.comports()“ zugreifen könnte. Da hätte ich ja keine Trennung der Klassen?! So war meine Idee, einfach eine Liste zu erzeugen, die zurückgegeben und ausgelesen werden kann. Die Programmierung entsteht durch Kopieren aus dem Netz, Versuchen, Fehler deuten etc., und irgendwann funktioniert es so halbwegs – ich fühle mich wie ein „Superprofi“ und bin glücklich.

Gruß Frank
Antworten