Kapazitätsmessgerät mit dem NE555, pyaudio und tkinter

Stellt hier eure Projekte vor.
Internetseiten, Skripte, und alles andere bzgl. Python.
Antworten
Benutzeravatar
kaytec
User
Beiträge: 608
Registriert: Dienstag 13. Februar 2007, 21:57

Hallo,

ein Versuch ohne Messung - nur mit Ausgabe der Streamdaten.

Code: Alles auswählen

#/usr/bin/env python
# -*- coding: utf-8

import tkinter as tk
import numpy as np
import pyaudio
import time
from threading import Thread, Event
from functools import partial
import queue

WIDTH = 256
HEIGHT = 200
MEASURE_TIME = 0.025

class ReadLineInAudioThreadClient(object):
    def __init__(self, queue):
        self.queue = queue
        self.run_event = None
        
    def stop(self):
        self.run_event.set()
        
    def start(self, measure_time):
        self.run_event = Event()
        self.thread = Thread(target=partial (self.worker_thread, 
                                             measure_time))
        self.thread.daemon = True
        self.thread.start()
            
    def worker_thread(self, measure_time):
        with ReadLineInAudio(measure_time) \
            as read_line_in_audio:
                read_line_in_audio.start_stream()
                while not self.run_event.is_set():
                    self.queue.put(read_line_in_audio.read_data())
                    self.run_event.wait(measure_time)


class ReadLineInAudio(object):
    def __init__(self, 
                 measure_time,
                 channels = 1,
                 rate = 44100,
                 frames_per_buffer = 256):
        self.py_audio = pyaudio.PyAudio()
        self.rate = rate
        self.frames_per_buffer = frames_per_buffer
        self.measure_time = measure_time
        self.channels = channels
        
    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.release()
    
    def start_stream(self):
        self.stream = self.py_audio.open(format= pyaudio.paInt16,
                                         channels = self.channels,
                                         rate = self.rate,
                                         frames_per_buffer = \
                                            self.frames_per_buffer,
                                         input = True,
                                         stream_callback = self.callback)
        time.sleep(0.1)
        
    def callback(self, in_data, frame_count, time_info, status):
        self.stream_data = in_data
        return (in_data, pyaudio.paContinue)
            
    def read_data(self):
        return np.frombuffer(self.stream_data, 
                             dtype = np.int16)
                             
    def release(self):
        try:
            self.stream.stop_stream()
            self.stream.close()
        except AttributeError as er:
            print(er)
        self.py_audio.terminate()
        

class CapacitanceUI(tk.LabelFrame):
    def __init__(self, 
                 parent, 
                 width, 
                 height, 
                 measure_time):
        tk.LabelFrame.__init__(self, parent, text = "LINE_IN", 
            relief = "solid")
        self.queue = queue.Queue()
        self.read_line_in_audio_thread_client = \
            ReadLineInAudioThreadClient(self.queue)
        self.measure_time = measure_time
        self.width = width
        self.height = height
        self.parent = parent
        self.after_id = None
        self.display = tk.Canvas(self, width = width, height = height,
            bg = "black")
        self.display.pack(padx = 5, pady = 5)
        self.start_button = tk.Button(self, text = "START", width = 7,
            command = self.start_measure)
        self.start_button.pack(side = tk.LEFT, padx = 30, pady = 5)
        self.stop_button = tk.Button(self, text = "STOP", width = 7,
            command = self.stop_measure)
        self.stop_button.pack(side = tk.RIGHT, padx = 30, pady = 5)
        self.stop_button.config(state = "disabled")
        
    def start_measure(self):
        self.start_button.config(state = "disabled")
        self.stop_button.config(state = "normal")
        self.read_line_in_audio_thread_client.start(self.measure_time)
        self.measure()
        
    def stop_measure(self):
        self.after_cancel(self.after_id)
        self.read_line_in_audio_thread_client.stop()
        self.start_button.config(state = "normal")
        self.stop_button.config(state = "disabled")

    def update_signal_line(self, points, tag):
        self.display.delete(tag)
        self.display.create_line(points, tag = tag, fill = "lightgreen")

    def measure(self):
        if not self.queue.empty():
            while True:
                try:
                    np_data = self.queue.get_nowait()
                    start_x, start_y = 0, 0
                    for data in np_data:
                        self.update_signal_line((start_x, 
                                                 start_y + self.height / 2, 
                                                 start_x + 1, 
                                                 data + self.height / 2),
                                                 "freq_line{0}".format(
                                                 start_x))
                        start_x += 1
                        start_y = data
                        
                except queue.Empty:
                    break
        self.after_id = self.after(int(self.measure_time * 1000), 
            self.measure)
    
    def release(self):
        if self.after_id:
            self.after_cancel(self.after_id)
        self.parent.destroy()
        
def main():
    root = tk.Tk()
    capacitance_ui = CapacitanceUI(root,
                                   WIDTH, 
                                   HEIGHT, 
                                   MEASURE_TIME)
    capacitance_ui.pack(expand=tk.YES, padx=5, pady=5)
    root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
    root.mainloop()
    
if __name__ == '__main__':
    main()
Gruß Frank
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

`Thread` hat sowohl ein args-Argument als auch ein daemon.
Den Sinn der ReadLineInAudioThreadClient-Klasse verstehe ich nicht, da py_audio doch mit Callbacks arbeitet.
AttributeError ist eher eine Programmierfehler, als dass man es sinnvoll in einem Programm einsetzen kann.

Code: Alles auswählen

class ReadLineInAudio(object):
    def __init__(self,
                 queue, 
                 measure_time,
                 channels = 1,
                 rate = 44100,
                 frames_per_buffer = 256):
        self.py_audio = self.stream = None
        self.queue = queue
        self.rate = rate
        self.frames_per_buffer = frames_per_buffer
        self.measure_time = measure_time
        self.channels = channels
        
    def __enter__(self):
        self.start_stream()
        return self

    def __exit__(self, *args):
        self.release()
    
    def start_stream(self):
        self.py_audio = pyaudio.PyAudio()
        self.stream = self.py_audio.open(format=pyaudio.paInt16,
                                         channels=self.channels,
                                         rate=self.rate,
                                         frames_per_buffer=self.frames_per_buffer,
                                         input=True,
                                         stream_callback=self.callback)
        
    def callback(self, in_data, frame_count, time_info, status):
        self.queue.put(np.frombuffer(in_data, dtype=np.int16))
        return (in_data, pyaudio.paContinue)
                             
    def release(self):
        if self.stream is not None:
            self.stream.stop_stream()
            self.stream.close()
            self.py_audio.terminate()
            self.stream = None
Benutzeravatar
kaytec
User
Beiträge: 608
Registriert: Dienstag 13. Februar 2007, 21:57

Danke Sirius3,

das mit der den Callbacks steht auch so in der Doku und so hatte ich es auch eingebaut, damit die GUI nicht einfriert. Deets erwähnte ja Queues und die hatte ich mit einem eigenen Thread verbunden. Da fehlt mir halt Grundverständnis der Thematik ! Der AttributeError entsteht, wenn die GUI ohne gestarteten Stream geschlossen wird. Mit der Exception habe ich diese Fehlermeldung ausgeschlossen. Welche Menge an Daten machen beim Einlesen eigentlich Sinn ?

Gruß Frank
Benutzeravatar
__blackjack__
User
Beiträge: 13004
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@kaytec: Der `AttributeError` kann also nur passieren weil in der `__init__()` nicht alle Attribute angelegt werden. Sollten sie aber. Wenn es noch keinen Wert dafür gibt, dann halt mit `None`.
“Most people find the concept of programming obvious, but the doing impossible.” — Alan J. Perlis
Benutzeravatar
kaytec
User
Beiträge: 608
Registriert: Dienstag 13. Februar 2007, 21:57

Hallo,

nach einem Durchlauf wird "release()" ausgelöst ?

Code: Alles auswählen

import numpy as np
import pyaudio
import time
from threading import Thread, Event
from functools import partial
import queue

WIDTH = 256
HEIGHT = 200
MEASURE_TIME = 0.025

class ReadLineInAudio(object):
    def __init__(self,
                 queue, 
                 measure_time,
                 channels = 1,
                 rate = 44100,
                 frames_per_buffer = 256):
        self.py_audio = self.stream = None
        self.queue = queue
        self.rate = rate
        self.frames_per_buffer = frames_per_buffer
        self.measure_time = measure_time
        self.channels = channels
        
    def __enter__(self):
        self.start_stream()
        return self

    def __exit__(self, *args):
        self.release()
    
    def start_stream(self):
        self.py_audio = pyaudio.PyAudio()
        self.stream = self.py_audio.open(format=pyaudio.paInt16,
                                         channels=self.channels,
                                         rate=self.rate,
                                         frames_per_buffer=self.frames_per_buffer,
                                         input=True,
                                         stream_callback=self.callback)
        
    def callback(self, in_data, frame_count, time_info, status):
        self.queue.put(np.frombuffer(in_data, dtype=np.int16))
        return (in_data, pyaudio.paContinue)
                             
    def release(self):
        if self.stream is not None:
            self.stream.stop_stream()
            self.stream.close()
            self.py_audio.terminate()
            self.stream = None

class CapacitanceUI(tk.LabelFrame):
    def __init__(self, 
                 parent, 
                 width, 
                 height, 
                 measure_time):
        tk.LabelFrame.__init__(self, parent, text = "LINE_IN", 
            relief = "solid")
        self.measure_time = measure_time
        self.width = width
        self.height = height
        self.parent = parent
        self.after_id = None
        self.last_start_y = 0
        self.queue = queue.Queue()
        self.display = tk.Canvas(self, width = width, height = height,
            bg = "black")
        self.display.pack(padx = 5, pady = 5)
        self.start_button = tk.Button(self, text = "START", width = 7,
            command = self.start_measure)
        self.start_button.pack(side = tk.LEFT, padx = 30, pady = 5)
        self.stop_button = tk.Button(self, text = "STOP", width = 7,
            command = self.stop_measure)
        self.stop_button.pack(side = tk.RIGHT, padx = 30, pady = 5)
        self.stop_button.config(state = "disabled")
        
    def start_measure(self):
        self.start_button.config(state = "disabled")
        self.stop_button.config(state = "normal")
        with ReadLineInAudio(self.queue, self.measure_time) as real_line_in_audio:
            self.measure()
        
    def stop_measure(self):
        self.after_cancel(self.after_id)
        self.stop_button.config(state = "disabled")
        self.start_button.config(state = "normal")
 
    def update_signal_line(self, points, tag):
        self.display.delete(tag)
        self.display.create_line(points, tag = tag, fill = "lightgreen")

    def measure(self):
        if not self.queue.empty():
            while True:
                try:
                    np_data = self.queue.get_nowait()
                    start_x = 0
                    for data in np_data:
                        self.update_signal_line((start_x, 
                                                    self.last_start_y + self.height / 2, 
                                                    start_x + 1, 
                                                     data + self.height / 2),
                                                     "freq_line{0}".format(
                                                     start_x))
                        start_x += 1
                        self.last_start_y = data
                except queue.Empty:
                    break
        self.after_id = self.after(int(self.measure_time * 1000), 
            self.measure)
    
    def release(self):
        if self.after_id:
            self.after_cancel(self.after_id)
        self.parent.destroy()
        
def main():
    root = tk.Tk()
    capacitance_ui = CapacitanceUI(root,
                                   WIDTH, 
                                   HEIGHT, 
                                   MEASURE_TIME)
    capacitance_ui.pack(expand=tk.YES, padx = 5, pady = 5)
    root.protocol("WM_DELETE_WINDOW",capacitance_ui.release)
    root.mainloop()
    
if __name__ == '__main__':
    main()
Gruß Frank
Antworten