Re: Musikspieler mit Display und Bedienelementen
Verfasst: Mittwoch 23. Oktober 2019, 22:17
@smutbert: Ich schreibe gerade etwas, ist aber noch nicht fertig, und ich komme wohl erst morgen dazu das zuende zu bringen. 

Seit 2002 Diskussionen rund um die Programmiersprache Python
https://www.python-forum.de/
Code: Alles auswählen
mainloop(
commands,
mpd_client,
feedback_screen,
feedback_line1,
feedback_line2,
now_playing_line1,
now_playing_line2,
play_led,
mode1_led,
mode2_led,
)
Code: Alles auswählen
mainloop(
commands,
mpd_client,
feedback_screen,
feedback_lines,
now_playing_lines,
play_led,
mode1_led,
mode2_led,
)
Code: Alles auswählen
mainloop(
commands,
mpd_client,
feedback_screen,
feedback_lines,
now_playing_lines,
leds,
)
Code: Alles auswählen
#!/usr/bin/env python3
import datetime
import math
import queue
import subprocess
import threading
import time
from collections import namedtuple
from functools import partial
from mpdlcd.vendor.lcdproc.server import Server as lcdd
import musicpd as mpd
import wiringpi2 as wiringpi
MPD_HOST = "localhost"
MPD_PORT = 6600
LCDD_HOST = "localhost"
PREVIOUS_PIN = 11
NEXT_PIN = 9
DOWN_PIN = 10
UP_PIN = 8
POWER_PIN = 20
PLAY_PIN = 17
MODE1_PIN = 19
MODE2_PIN = 16
RADIO_PIN = 18
SELECT_PIN = 15
ENTER_PIN = 12
PLAY_LED_PIN = 22
MODE1_LED_PIN = 21
MODE2_LED_PIN = 26
# 1 und 2 fuer Pull-up und -down sind beim Cubietruck vertauscht.
PULLUP = 1
def call_async(function, *arguments):
thread = threading.Thread(target=function, args=arguments, daemon=True)
thread.start()
return thread
def watch_rotary_encoder(a_pin, b_pin, pin_pud, callback):
wiringpi.pinMode(a_pin, 0)
wiringpi.pinMode(b_pin, 0)
wiringpi.pullUpDnControl(a_pin, pin_pud)
wiringpi.pullUpDnControl(b_pin, pin_pud)
steps_per_indent = 4
last_direction = 0
delta_sum = 0
a_state = wiringpi.digitalRead(a_pin)
b_state = wiringpi.digitalRead(b_pin)
last_rotation_sequence = (a_state ^ b_state) | b_state << 1
while True:
a_state = wiringpi.digitalRead(a_pin)
b_state = wiringpi.digitalRead(b_pin)
rotation_sequence = (a_state ^ b_state) | b_state << 1
if rotation_sequence != last_rotation_sequence:
delta = (rotation_sequence - last_rotation_sequence) % 4
if last_direction == 0:
if delta == 3:
delta = -1
elif delta == 2:
delta = 0
elif last_direction * delta < 0:
delta = math.copysign(4 - delta, last_direction)
last_direction = delta
last_rotation_sequence = rotation_sequence
delta_sum += delta
delta = delta_sum // steps_per_indent
if delta != 0:
callback(delta)
delta_sum %= steps_per_indent
else:
last_direction = 0
time.sleep(0.005)
def watch_switch(pin, pin_pud, debounce, callback):
wiringpi.pinMode(pin, 0)
wiringpi.pullUpDnControl(pin, pin_pud)
last_state = wiringpi.digitalRead(pin)
while True:
state = wiringpi.digitalRead(pin)
if state != last_state:
callback(state - last_state)
time.sleep(debounce)
last_state = state
time.sleep(0.05)
class LED:
def __init__(self, pin):
self.pin = pin
wiringpi.pinMode(self.pin, 1)
wiringpi.digitalWrite(self.pin, False)
@property
def state(self):
return wiringpi.digitalRead(self.pin)
@state.setter
def state(self, value):
wiringpi.digitalWrite(self.pin, value)
def rotary_encoder_callback(commands, left_command, right_command, event):
if event == -1:
commands.put(left_command)
elif event == 1:
commands.put(right_command)
def switch_callback(commands, command, event):
if event == 1:
commands.put(command)
def select_from_list(commands, name, items, feedback_lines):
index = 0
item_count = len(items)
big_step = item_count // 20 if item_count > 10 else 1
feedback_lines[0].set_text("SELECT {0}:".format(name))
while True:
feedback_lines[1].set_text(items[index])
command = commands.get()
if command == "next":
index = (index + 1) % item_count
elif command == "previous":
index = (index - 1) % item_count
elif command == "up":
index = (index + big_step) % item_count
elif command == "down":
index = (index - big_step) % item_count
else:
feedback_lines[0].set_text("")
feedback_lines[1].set_text(" ...")
return command, index
feedback_lines[0].set_text(
"{0} {1}/{2}:".format(name, index + 1, item_count)
)
def on_power(_mpd_client, _commands, _feedback_lines):
subprocess.run(["shutdown", "-h", "now"])
def on_play(mpd_client, _commands, feedback_lines):
status = mpd_client.status()
if int(status["playlistlength"]) == 0:
text = "EMPTY QUEUE"
elif status["state"] == "play":
mpd_client.pause()
text = "PAUSE ||"
else:
mpd_client.play()
text = "PLAY >"
feedback_lines[0].set_text(text)
feedback_lines[1].set_text("")
def on_mode1(mpd_client, _commands, feedback_lines):
mpd_client.clear()
feedback_lines[0].set_text("CLEAR QUEUE")
def on_mode2(mpd_client, _commands, feedback_lines):
is_random = not bool(int(mpd_client.status()["random"]))
mpd_client.random(is_random)
feedback_lines[0].set_text("RANDOM:")
feedback_lines[1].set_text("ON" if is_random else "OFF")
def on_radio(mpd_client, _commands, feedback_lines):
mpd_client.clear()
mpd_client.load("radio")
mpd_client.play()
song_data = mpd_client.currentsong()
feedback_lines[0].set_text("INTERNET RADIO:")
feedback_lines[1].set_text(
"{0:3d}: {1}".format(int(song_data["pos"]) + 1, song_data["name"])
)
def on_track_change(command, mpd_client, _commands, feedback_lines):
status = mpd_client.status()
if status["state"] not in ["play", "pause"]:
line1 = "NOT PLAYING"
line2 = ""
else:
song_count = int(status["playlistlength"])
song_index = int(status["song"])
if command == "next":
line1 = "NEXT >>|"
song_index = (song_index + 1) % song_count
elif command == "previous":
if float(status["elapsed"]) < 5:
line1 = "PREV |<<"
song_index = (song_index - 1) % song_count
else:
song_index = song_index
line1 = "|<"
else:
assert False, "unknown command {!r}".format(command)
mpd_client.play(song_index)
line2 = "{0:3d}/{1:3d}".format(song_index + 1, song_count)
feedback_lines[0].set_text(line1)
feedback_lines[1].set_text(line2)
def on_volume_change(command, mpd_client, _commands, feedback_lines):
delta = {"down": -1, "up": 1}[command]
# TODO
# rethink calculation of new_volume
volume = 2 * min(
50, max(0, int(float(mpd_client.status()["volume"]) / 2) + delta)
)
mpd_client.setvol(volume)
feedback_lines[0].set_text("VOLUME:")
feedback_lines[1].set_text("{0:1.2f}".format(volume / 100))
def on_enter(mpd_client, commands, feedback_lines):
outputs = mpd_client.outputs()
output_names = [output["outputname"] for output in outputs]
command, index = select_from_list(
commands, "Output", output_names, feedback_lines
)
output = outputs[index]
if command == "enter":
feedback_lines[0].set_text("{0}:".format(output_names[index]))
feedback_lines[1].set_text(
"OFF" if bool(int(output["outputenabled"])) else "ON"
)
mpd_client.toggleoutput(output["outputid"])
elif command == "select":
if bool(int(output["outputenabled"])):
line2 = "ON TO EXCL. ON"
else:
line2 = "OFF TO EXCL. ON"
mpd_client.toggleoutput(output["outputid"])
feedback_lines[0].set_text("{0}:".format(output_names[index]))
feedback_lines[1].set_text(line2)
for i, output in enumerate(outputs):
if i != index:
mpd_client.disableoutput(output["outputid"])
def on_select(mpd_client, commands, feedback_lines):
filters = []
tags = mpd_client.tagtypes()
while True:
command, index = select_from_list(
commands, "Tag", tags, feedback_lines
)
if command not in ["select", "enter"]:
return
selected_tag = tags[index]
items = mpd_client.list(selected_tag, *filters)
command, index = select_from_list(
commands, selected_tag, items, feedback_lines
)
filters.append(selected_tag)
filters.append(items[index])
if command == "select":
break
#
# FIXME `command` kann im folgenden Code nur den Wert "select" haben!
#
if command == "play":
mpd_client.clear()
mpd_client.searchadd(*filters)
mpd_client.play()
return
elif command == "select":
items = mpd_client.list("album", *filters)
command, index = select_from_list(
commands, "Album", items, feedback_lines
)
selected_album = items[index]
else:
return
if command in ["play", "select"]:
mpd_client.clear()
mpd_client.findadd("album", selected_album)
mpd_client.play()
return
elif command == "enter":
mpd_client.findadd("album", selected_album)
return
LEDS = namedtuple("LEDS", "play mode1 mode2")
def mainloop(
commands,
mpd_client,
feedback_screen,
feedback_lines,
now_playing_lines,
leds,
):
commmand_to_func = {
"power": on_power,
"play": on_play,
"mode1": on_mode1,
"mode2": on_mode2,
"radio": on_radio,
"select": on_select,
"previous": partial(on_track_change, "previous"),
"next": partial(on_track_change, "next"),
"enter": on_enter,
"down": partial(on_volume_change, "down"),
"up": partial(on_volume_change, "up"),
}
while True:
try:
command = commands.get(timeout=1)
except queue.Empty:
current_song = mpd_client.currentsong()
status = mpd_client.status()
feedback_screen.set_priority("hidden")
feedback_lines[0].set_text("")
feedback_lines[1].set_text("")
# TODO
# andere Taktik für die Titelanzeige überlegen
try:
line2 = current_song["artist"]
except KeyError:
try:
line2 = current_song["name"]
except KeyError:
line1 = datetime.datetime.now().strftime(" %H:%M")
line2 = datetime.date.today().strftime(" %Y-%m-%d")
else:
try:
line1 = current_song["title"]
except KeyError:
line1 = "{0:3d}/{1:3d}".format(
int(status["song"]) + 1,
int(status["playlistlength"]),
)
else:
line1 = current_song["title"]
now_playing_lines[0].set_text(line1)
now_playing_lines[1].set_text(line2)
else:
feedback_screen.set_priority("input")
commmand_to_func[command](mpd_client, commands, feedback_lines)
leds.play.state = status["state"] == "play"
leds.mode1.state = int(status["playlistlength"]) > 0
leds.mode2.state = int(status["random"])
def main():
wiringpi.wiringPiSetup()
mpd_client = mpd.MPDClient()
mpd_client.connect(MPD_HOST, MPD_PORT)
oled = lcdd(LCDD_HOST, debug=False, charset="iso-8859-1")
oled.start_session()
now_playing_screen = oled.add_screen("now_playing")
now_playing_screen.set_heartbeat("off")
now_playing_screen.set_priority("foreground")
now_playing_screen.set_timeout(0)
now_playing_lines = [
now_playing_screen.add_scroller_widget(
"now_playing_line1",
left=1,
top=1,
right=28,
bottom=1,
speed=5,
text="",
),
now_playing_screen.add_scroller_widget(
"now_playing_line2",
left=1,
top=2,
right=20,
bottom=2,
speed=5,
text="",
),
]
feedback_screen = oled.add_screen("feedback")
feedback_screen.set_heartbeat("off")
feedback_screen.set_priority("hidden")
feedback_screen.set_timeout(0)
feedback_lines = [
feedback_screen.add_string_widget("feedback_line1", "", x=1, y=1),
feedback_screen.add_string_widget("feedback_line2", "", x=1, y=2),
]
commands = queue.Queue()
call_async(
watch_rotary_encoder,
PREVIOUS_PIN,
NEXT_PIN,
PULLUP,
partial(rotary_encoder_callback, commands, "previous", "next"),
)
call_async(
watch_rotary_encoder,
DOWN_PIN,
UP_PIN,
PULLUP,
partial(rotary_encoder_callback, commands, "down", "up"),
)
for pin, command in [
(POWER_PIN, "power"),
(PLAY_PIN, "play"),
(MODE1_PIN, "mode1"),
(MODE2_PIN, "mode2"),
(RADIO_PIN, "radio"),
(SELECT_PIN, "select"),
(ENTER_PIN, "enter"),
]:
call_async(
watch_switch,
pin,
PULLUP,
0.1,
partial(switch_callback, commands, command),
)
leds = LEDS(LED(PLAY_LED_PIN), LED(MODE1_LED_PIN), LED(MODE2_LED_PIN))
mainloop(
commands,
mpd_client,
feedback_screen,
feedback_lines,
now_playing_lines,
leds,
)
if __name__ == "__main__":
main()
Das stimmt ausnahmsweise nicht.__blackjack__ hat geschrieben: Donnerstag 24. Oktober 2019, 10:34 In `on_select()` [...]
Ab nach der Schleife habe ich ein Problem mit der Funktion – die ``while True:``-Schleife wird nur durch ``break`` verlassen wenn `command` den Wert "select" hat. Nach der Schleife steht dann aber Code der `command` gegen "play" und "enter" prüft, was ja nie der Fall sein kann‽[...]
Code: Alles auswählen
def on_select(mpd_client, commands, feedback_lines):
filters = []
tags = mpd_client.tagtypes()
while True:
command, index = select_from_list(
commands, "Tag", tags, feedback_lines
)
if command not in ["select", "enter"]:
return
selected_tag = tags[index]
items = mpd_client.list(selected_tag, *filters)
command, index = select_from_list(
commands, selected_tag, items, feedback_lines
)
filters.append(selected_tag)
filters.append(items[index])
if command != "enter":
break
if command == "play":
mpd_client.clear()
mpd_client.searchadd(*filters)
mpd_client.play()
elif command == "select":
items = mpd_client.list("album", *filters)
command, index = select_from_list(
commands, "Album", items, feedback_lines
)
selected_album = items[index]
if command in ["play", "select"]:
mpd_client.clear()
mpd_client.findadd("album", selected_album)
mpd_client.play()
elif command == "enter":
mpd_client.findadd("album", selected_album)
Code: Alles auswählen
def on_select(mpd_client, commands, feedback_lines):
filters = []
tags = mpd_client.tagtypes()
while True:
command, index = select_from_list(
commands, "Tag", tags, feedback_lines
)
if command not in ["select", "enter"]:
return
selected_tag = tags[index]
items = mpd_client.list(selected_tag, *filters)
command, index = select_from_list(
commands, selected_tag, items, feedback_lines
)
filters.append(selected_tag)
filters.append(items[index])
if command != "enter":
break
if command == "select":
items = mpd_client.list("album", *filters)
command, index = select_from_list(
commands, "Album", items, feedback_lines
)
filters[:] = []
filters.append("Album")
filters.append(items[index])
if command in ["play", "select"]:
mpd_client.clear()
mpd_client.searchadd(*filters)
mpd_client.play()
elif command == "enter":
mpd_client.findadd("album", selected_album)
Code: Alles auswählen
filters[:] = []
filters.append("Album")
filters.append(items[index])
Code: Alles auswählen
filters = ["Album", items[index]]
Code: Alles auswählen
#!/usr/bin/env python3
import math
import queue
import threading
import evdev
import selectors
import gpiozero
from functools import partial
from mpdlcd.vendor.lcdproc.server import Server as lcdd
import musicpd as mpd
MPD_HOST = "localhost"
MPD_PORT = 6600
LCDD_HOST = "localhost"
POWER_LED_PIN = 26
LED_PINS = {
'Play': 12,
'Mode': 13,
'Debug': 21
}
INPUT_DEVICES = [
'/dev/input/event0',
'/dev/input/event1',
'/dev/input/event2',
'/dev/input/event3',
'/dev/input/event4',
'/dev/input/event5',
'/dev/input/event7',
'/dev/input/event8'
]
INPUT_EVENTS = {
59: { 1: 'play'},
60: { 1: 'mode1'},
61: { 1: 'mode2'},
62: { 1: 'radio'},
63: { 1: 'select'},
64: { 1: 'enter'},
5: { 1: 'right', -1: 'left'},
6: { 1: 'up', -1: 'down'}
}
def call_async(function, *arguments):
thread = threading.Thread(target=function, args=arguments)
thread.daemon = True
thread.start()
return thread
def collect_input(devices, commands):
selector = selectors.DefaultSelector()
for i in devices:
selector.register(evdev.InputDevice(i), selectors.EVENT_READ)
while True:
for key, mask in selector.select():
device = key.fileobj
for event in device.read():
input_to_commands(event, commands)
def input_to_commands(event, commands):
# event.type -> device
# event.value -> press=01, rotary +/-1
# code -> keycode, axis (rotary)
try:
commands.put(INPUT_EVENTS[event.code][event.value])
except KeyError:
return
def on_play(mpd_client, _commands, display_lines):
now_status = mpd_client.status()
line2 = ''
if int(now_status['playlistlength']) == 0:
line1 = 'EMPTY QUEUE'
elif mpd_client.status()['state'] == 'play':
mpd_client.pause()
line1 = '||'
else:
mpd_client.play()
line1 = '>'
display_lines[1].set_text(line1)
display_lines[2].set_text(line2)
def on_mode1(mpd_client, _commands, display_lines):
mpd_client.clear()
display_lines[1].set_text('CLEAR QUEUE')
def on_mode2(mpd_client, _commands, display_lines):
random = (1 - int(mpd_client.status()['random']))
mpd_client.random(random)
line1 = 'RANDOM'
if random == 0:
line2 = 'OFF'
else:
line2 = 'ON'
display_lines[1].set_text(line1)
display_lines[1].set_text(line2)
def on_radio(mpd_client, _commands, display_lines):
mpd_client.clear()
mpd_client.load('radio')
mpd_client.play()
now_song = mpd_client.currentsong()
line1 = 'INTERNET RADIO'
line2 = '{0:3d}: {1}'.format(int(now_song['pos']) + 1, now_song['name'])
display_lines[1].set_text(line1)
display_lines[2].set_text(line2)
def on_track_change(command, mpd_client, commands, display_lines):
now_status = mpd_client.status()
if int(now_status['playlistlength']) == 0:
on_select(command, mpd_client, commands, display_lines)
return
now_queue = mpd_client.playlistid()
now_queue_length = len(now_queue)
now_queue_titles = []
for i in range(now_queue_length):
try:
now_queue_titles.append(now_queue[i]['title'])
except KeyError:
now_queue_titles.append(now_queue[i]['name'])
try:
start = int(now_status['song'])
except:
start = 0
command, index = select_from_list(mpd_client, commands, 'Title', now_queue_titles, display_lines, index=start)
if command == 'select' or command == 'enter':
selected_position = now_queue[index]['pos']
mpd_client.play(selected_position)
def on_volume_change(command, mpd_client, _commands, display_lines):
delta = {'down': -1, 'up': 1}[command]
mpd_client.volume(delta)
display_lines[1].set_text('VOLUME')
display_lines[2].set_text('{0:1.2f}'.format(int(mpd_client.status()['volume']) / 100))
def on_enter(mpd_client, commands, display_lines):
outputs = mpd_client.outputs()
number_of_outputs = len(outputs)
items = []
for i in range(number_of_outputs):
items.append(outputs[i]['outputname'])
command, index = select_from_list(mpd_client, commands, 'Output', items, display_lines)
if command == 'enter':
line1 = '{0}'.format(outputs[index]['outputname'])
if int(outputs[index]['outputenabled']) == 0:
line2 = 'ON'
else:
line2 = 'OFF'
display_lines[1].set_text(line1)
display_lines[2].set_text(line2)
mpd_client.toggleoutput(outputs[index]['outputid'])
elif command == 'select':
line1 = '{0}'.format(outputs[index]['outputname'])
line2 = 'EXCLUSIVE ON'
display_lines[1].set_text(line1)
display_lines[2].set_text(line2)
mpd_client.enableoutput(outputs[index]['outputid'])
for i in range(number_of_outputs):
if i != index:
mpd_client.disableoutput(outputs[i]['outputid'])
def on_select(mpd_client, commands, display_lines):
filters = []
tags = mpd_client.tagtypes()
while True:
command, index = select_from_list(mpd_client, commands, 'Tag', tags, display_lines)
selected_tag = tags[index]
if command == 'select' or command == 'enter':
items = mpd_client.list(selected_tag, *filters)
command, index = select_from_list(mpd_client, commands, selected_tag, items, display_lines)
filters.append(selected_tag)
filters.append(items[index])
else:
return
if command == 'enter':
continue
else:
break
if command == 'play':
mpd_client.clear()
mpd_client.searchadd(*filters)
mpd_client.play()
return
elif command == 'select':
items = mpd_client.list('album', *filters)
command, index = select_from_list(mpd_client, commands, 'Album', items, display_lines)
selected_album = items[index]
else:
return
if command == 'play' or command == 'select':
mpd_client.clear()
mpd_client.findadd('album', selected_album)
mpd_client.play()
return
elif command == 'enter':
mpd_client.findadd('album', selected_album)
return
def select_from_list(mpd_client, commands, name, items, display_lines, index=0):
number_of_items = len(items)
number_padding = int(math.log10(number_of_items)+1)
if number_of_items > 10:
big_step = int(number_of_items / 20)
else:
big_step = 1
while True:
display_lines[1].set_text('# {1:{width}}/{2:{width}} {0}'.format(name, index + 1, number_of_items, width=number_padding))
display_lines[2].set_text(items[index])
command = commands.get()
if command == "right":
index = (index + 1) % number_of_items
elif command == "left":
index = (index - 1) % number_of_items
elif command == 'up':
index = (index + big_step) % number_of_items
elif command == 'down':
index = (index - big_step) % number_of_items
else:
display_lines[1].set_text('...')
display_lines[2].set_text('')
return command, index
def mainloop(commands, mpd_client, display, display_lines, leds):
previous_play_led_state = 'unknown'
commmand_to_func = {
"play": on_play,
"mode1": on_mode1,
"mode2": on_mode2,
"radio": on_radio,
"select": on_select,
"left": partial(on_track_change, "left"),
"right": partial(on_track_change, "right"),
"enter": on_enter,
"down": partial(on_volume_change, "down"),
"up": partial(on_volume_change, "up"),
}
while True:
try:
command = commands.get(timeout=1)
except queue.Empty:
display.set_priority('hidden')
for i in display_lines.keys():
display_lines[i].set_text('')
leds['Mode'].off()
now_status = mpd_client.status()
if now_status['state'] == 'play':
leds['Play'].on()
previous_play_led_state = 'play'
elif now_status['state'] == 'pause' and previous_play_led_state != 'pause':
# pulse() stört jedes Mal das Pulsieren
previous_play_led_state = 'pause'
leds['Play'].pulse()
elif now_status['state'] == 'stop':
leds['Play'].off()
previous_play_led_state = 'stop'
if int(now_status['playlistlength']) > 0:
leds['Mode'].on()
else:
leds['Mode'].off()
if int(now_status['random']) == 1:
leds['Debug'].blink()
else:
leds['Debug'].off()
else:
leds['Mode'].pulse()
leds['Play'].off()
display.set_priority('input')
commmand_to_func[command](mpd_client, commands, display_lines)
def main():
mpd_client = mpd.MPDClient()
mpd_client.connect(MPD_HOST, MPD_PORT)
oled = lcdd(LCDD_HOST, debug=False, charset='iso-8859-1')
oled.start_session()
display = oled.add_screen('orpheus')
display.set_heartbeat('off')
display.set_priority('hidden')
display.set_timeout(0)
display_lines = {}
for i in 1, 2:
display_lines[i] = display.add_string_widget(f'line{i}', '', x=1, y=i)
commands = queue.Queue()
call_async(collect_input, INPUT_DEVICES, commands)
leds = { 'Power': gpiozero.LED(POWER_LED_PIN, active_high=False, initial_value=True) }
for i in LED_PINS.keys():
leds[i] = gpiozero.PWMLED(LED_PINS[i])
# TODO: except einschraenken oder entfernen, wenn keine Fehler mehr auftreten
try:
mainloop(commands, mpd_client, display, display_lines, leds)
except:
leds['Power'].off()
if __name__ == '__main__':
main()