Ein Versuch ohne Messung – nur mit Ausgabe der Streamdaten.
Code: Alles auswählen
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import tkinter as tk
import numpy as np
import pyaudio
import time
from threading import Thread, Event
from functools import partial
import queue
# Width of the signal canvas (passed to tk.Canvas as ``width``).
WIDTH = 256
# Height of the signal canvas (passed to tk.Canvas as ``height``); the trace
# is drawn around the horizontal line at HEIGHT / 2.
HEIGHT = 200
# Interval in seconds between two polls of the audio stream and between two
# redraws of the canvas (converted to milliseconds for Tk's ``after``).
MEASURE_TIME = 0.025
class ReadLineInAudioThreadClient(object):
    """Poll a ``ReadLineInAudio`` source on a background thread.

    Every polled buffer is put on the queue object handed to the
    constructor, so that a consumer (e.g. the GUI thread) can fetch the
    data without touching the audio stream itself.
    """

    def __init__(self, queue):
        """Remember the output queue; no thread is started yet.

        :param queue: object with a ``put`` method that receives each
            buffer returned by ``ReadLineInAudio.read_data``.
        """
        self.queue = queue
        # Despite its name, *setting* this event asks the worker to stop.
        self.run_event = None
        self.thread = None

    def stop(self):
        """Ask the current worker thread to finish.

        Calling this before ``start`` is a harmless no-op.
        """
        if self.run_event is not None:
            self.run_event.set()

    def start(self, measure_time):
        """Start a daemon worker that polls every *measure_time* seconds."""
        # Each worker gets its own stop event.  If the worker looked up
        # ``self.run_event`` on every iteration, a quick stop()/start()
        # sequence would swap in a fresh, unset event and the old worker
        # would never notice that it had been asked to stop.
        run_event = Event()
        self.run_event = run_event
        self.thread = Thread(target=self.worker_thread,
                             args=(measure_time, run_event))
        self.thread.daemon = True
        self.thread.start()

    def worker_thread(self, measure_time, run_event=None):
        """Read audio buffers into the queue until the stop event is set.

        :param measure_time: pause in seconds between two reads; it is
            also passed on to ``ReadLineInAudio``.
        :param run_event: stop event for this worker.  Defaults to the
            event stored on the instance at the time the worker begins.
        """
        if run_event is None:
            run_event = self.run_event
        with ReadLineInAudio(measure_time) as read_line_in_audio:
            read_line_in_audio.start_stream()
            while not run_event.is_set():
                self.queue.put(read_line_in_audio.read_data())
                # wait() doubles as an interruptible sleep: it returns
                # early as soon as stop() sets the event.
                run_event.wait(measure_time)
class ReadLineInAudio(object):
    """Capture 16-bit audio input through a PyAudio callback stream.

    The object is a context manager; leaving the ``with`` block closes
    the stream (if one was opened) and terminates the PyAudio instance.
    """

    # Class-level defaults keep read_data() and release() safe when they
    # run before start_stream() or before the first callback delivered
    # data; instances overwrite them as soon as real values exist.
    stream = None
    stream_data = b""

    def __init__(self,
                 measure_time,
                 channels=1,
                 rate=44100,
                 frames_per_buffer=256):
        """Create the PyAudio instance and store the stream parameters.

        :param measure_time: stored on the instance; not used by this
            class itself.
        :param channels: number of input channels to open.
        :param rate: sampling rate passed to ``PyAudio.open``.
        :param frames_per_buffer: buffer size passed to ``PyAudio.open``.
        """
        self.py_audio = pyaudio.PyAudio()
        self.rate = rate
        self.frames_per_buffer = frames_per_buffer
        self.measure_time = measure_time
        self.channels = channels

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.release()

    def start_stream(self):
        """Open the input stream in callback mode."""
        self.stream = self.py_audio.open(
            format=pyaudio.paInt16,
            channels=self.channels,
            rate=self.rate,
            frames_per_buffer=self.frames_per_buffer,
            input=True,
            stream_callback=self.callback)
        # Presumably gives the callback time to deliver a first buffer
        # before the caller starts reading.
        time.sleep(0.1)

    def callback(self, in_data, frame_count, time_info, status):
        """Stream callback: keep the newest raw buffer and continue."""
        self.stream_data = in_data
        return (in_data, pyaudio.paContinue)

    def read_data(self):
        """Return the most recent buffer as an ``int16`` NumPy array.

        The array is empty while no buffer has been received yet.
        """
        return np.frombuffer(self.stream_data, dtype=np.int16)

    def release(self):
        """Stop and close the stream, then terminate PyAudio.

        Works whether or not ``start_stream`` was called, and still
        terminates PyAudio when stopping or closing the stream fails.
        """
        stream, self.stream = self.stream, None
        try:
            if stream is not None:
                try:
                    stream.stop_stream()
                finally:
                    stream.close()
        finally:
            self.py_audio.terminate()
class CapacitanceUI(tk.LabelFrame):
    """Labelled frame with a canvas trace of the line-in signal.

    START launches the audio worker and a periodic redraw; STOP halts
    both.  Sample values are plotted unscaled around the horizontal
    centre line of the canvas.
    """

    def __init__(self,
                 parent,
                 width,
                 height,
                 measure_time):
        """Build the canvas and the START/STOP buttons.

        :param parent: Tk container; it is destroyed by ``release``.
        :param width: canvas width.
        :param height: canvas height.
        :param measure_time: polling/redraw interval in seconds.
        """
        tk.LabelFrame.__init__(self, parent, text="LINE_IN",
                               relief="solid")
        # The worker thread produces into this queue, measure() consumes.
        self.queue = queue.Queue()
        self.read_line_in_audio_thread_client = \
            ReadLineInAudioThreadClient(self.queue)
        self.measure_time = measure_time
        self.width = width
        self.height = height
        self.parent = parent
        self.after_id = None

        self.display = tk.Canvas(self, width=width, height=height,
                                 bg="black")
        self.display.pack(padx=5, pady=5)

        self.start_button = tk.Button(self, text="START", width=7,
                                      command=self.start_measure)
        self.start_button.pack(side=tk.LEFT, padx=30, pady=5)

        self.stop_button = tk.Button(self, text="STOP", width=7,
                                     command=self.stop_measure)
        self.stop_button.pack(side=tk.RIGHT, padx=30, pady=5)
        self.stop_button.config(state="disabled")

    def start_measure(self):
        """Start the audio worker and the periodic redraw."""
        self.start_button.config(state="disabled")
        self.stop_button.config(state="normal")
        self.read_line_in_audio_thread_client.start(self.measure_time)
        self.measure()

    def stop_measure(self):
        """Stop the periodic redraw and the audio worker."""
        self._cancel_pending_measure()
        self.read_line_in_audio_thread_client.stop()
        self.start_button.config(state="normal")
        self.stop_button.config(state="disabled")

    def update_signal_line(self, points, tag):
        """Replace the canvas item(s) tagged *tag* with a new line.

        :param points: coordinates ``x0, y0, x1, y1, ...`` of the line.
        :param tag: canvas tag identifying the line to replace.
        """
        self.display.delete(tag)
        self.display.create_line(points, tags=tag, fill="lightgreen")

    def measure(self):
        """Draw the newest queued buffer and schedule the next call."""
        # Drain the queue but keep only the newest buffer: older ones
        # would be overdrawn within the same event-loop pass anyway.
        latest = None
        while True:
            try:
                latest = self.queue.get_nowait()
            except queue.Empty:
                break
        if latest is not None and len(latest) > 0:
            # One polyline item instead of one canvas item per sample.
            self.update_signal_line(self._trace_points(latest),
                                    "freq_line")
        self.after_id = self.after(int(self.measure_time * 1000),
                                   self.measure)

    def release(self):
        """Stop all activity and destroy the parent window."""
        self._cancel_pending_measure()
        client = self.read_line_in_audio_thread_client
        # run_event stays None until the worker was started at least
        # once; only then is there something to stop.
        if client.run_event is not None:
            client.stop()
            worker = getattr(client, "thread", None)
            if worker is not None:
                # Give the worker a moment to close the audio stream
                # before the interpreter shuts down.
                worker.join(timeout=1.0)
        self.parent.destroy()

    def _cancel_pending_measure(self):
        """Cancel the scheduled ``measure`` call, if there is one."""
        if self.after_id is not None:
            self.after_cancel(self.after_id)
            self.after_id = None

    def _trace_points(self, samples):
        """Return flat polyline coordinates for *samples*.

        The line starts at ``(0, height / 2)``; sample ``i`` is placed
        at ``(i + 1, sample + height / 2)``.
        """
        baseline = self.height / 2
        ys = [baseline]
        ys.extend((np.asarray(samples, dtype=float) + baseline).tolist())
        return [coord for point in enumerate(ys) for coord in point]
def main():
    """Create the root window, embed the line-in display, run the Tk loop."""
    root = tk.Tk()
    ui = CapacitanceUI(root, WIDTH, HEIGHT, MEASURE_TIME)
    ui.pack(expand=tk.YES, padx=5, pady=5)
    # Closing the window goes through release() so that the pending redraw
    # is cancelled before the window is destroyed.
    root.protocol("WM_DELETE_WINDOW", ui.release)
    root.mainloop()
# Run the GUI only when the file is executed as a script, not on import.
if __name__ == "__main__":
    main()