Musikspieler mit Display und Bedienelementen

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Benutzeravatar
smutbert
User
Beiträge: 20
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Sonntag 29. September 2019, 14:03

Ok, danke. Ist natürlich kein Problem (ich habe wohl ein bisschen zu viel an matlab/octave gedacht).
Benutzeravatar
smutbert
User
Beiträge: 20
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Mittwoch 9. Oktober 2019, 15:43

Zwischendurch ein großes Dankeschön!
Es funktioniert schon fast alles so wie ich es mir vorstelle, aber bevor ich den kompletten Code noch einmal poste, habe ich noch einiges zu tun :)


Mein größtes Problem ist momentan die Anzeige von Umlauten auf dem Display. Zu Beginn hatte ich mpdlcd ([1], noch mit python2) zur Anzeige des aktuellen Titels verwendet. Nun mache ich alles im eigenen Programm, verwende aber für die Anzeige statt dem eigenen Code das Modul python-lcdproc [2] für die Interaktion mit LCDd.
Zum Testen verwende ich gerade das hier

Code: Alles auswählen

#!/usr/bin/env python3
import time
import musicpd as mpd
from lcdproc.server import Server as lcdd

mpd_client = mpd.MPDClient()
mpd_client.connect('localhost', 6600)

now_playing = mpd_client.currentsong()

oled = lcdd('localhost', debug=False)
oled.start_session()

screen_now_playing = oled.add_screen('now_playing')
screen_now_playing.set_priority('foreground')
screen_now_playing.set_timeout(0)
now_playing_line1 = screen_now_playing.add_scroller_widget('now_playing_line1', left=1, top=1, right=28, bottom=1, speed=5, text='')
now_playing_line2 = screen_now_playing.add_scroller_widget('now_playing_line2', left=1, top=2, right=20, bottom=2, speed=5, text='')

line1 = now_playing['title']
line2 = now_playing['artist']

now_playing_line1.set_text(line1)
now_playing_line2.set_text(line2)

time.sleep(10)
Spiele ich nun beispielsweise ein Lied von Björk erscheint im Display BjAπrk (ich weiß nicht ob ich die genau richtigen Zeichen erwischt habe, ich kann die Zeichen ja nicht aus dem Display kopieren).
Grundsätzlich sind Umlaute mit dem Display und LCDd/lcdproc aber kein Problem und mit mpdlcd haben sie funktioniert ohne dass ich etwas unternehmen mußte.


[1] https://github.com/rbarrois/mpdlcd
[2] https://github.com/jinglemansweep/lcdproc
Benutzeravatar
smutbert
User
Beiträge: 20
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Donnerstag 10. Oktober 2019, 08:35

Vielleicht habe ich etwas gefunden:

mpdlcd hat lcdproc eingebaut, allerdings wurde dort lcdproc angepasst. Die entscheidenden Änderungen könnten die hier sein: https://github.com/rbarrois/mpdlcd/comm ... 7b3bf88561
Wenn mir hier niemand einen besseren Rat gibt, werde ich versuchen statt lcdproc, das in mpdlcd eingebaute lcdproc in mein Programm zu importieren.
Benutzeravatar
smutbert
User
Beiträge: 20
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Montag 14. Oktober 2019, 22:30

Die Probleme mit Umlauten bin ich mit dem Umstieg auf das lcdproc aus mpdlcd los geworden und sonst habe ich mich bemüht nicht zu viel Dummes anzustellen, allerdings mit mäßigem Erfolg, befürchte ich. Wenn ich noch einmal den kompletten Code posten darf:

Code: Alles auswählen

#!/usr/bin/env python3

import math
import queue
import socket
import subprocess
import threading
import time
import datetime

from functools import partial
from mpdlcd.vendor.lcdproc.server import Server as lcdd
import musicpd as mpd
import wiringpi2 as wiringpi

# 1 und 2 fuer Pull-up und -down sind beim Cubietruck vertauscht.
PULLUP = 1

def call_async(function, *arguments):
	thread = threading.Thread(target=function, args=arguments)
	thread.daemon = True
	thread.start()
	return thread


def watch_rotary_encoder(a_pin, b_pin, pin_pud, callback):
	wiringpi.pinMode(a_pin, 0)
	wiringpi.pinMode(b_pin, 0)
	wiringpi.pullUpDnControl(a_pin, pin_pud)
	wiringpi.pullUpDnControl(b_pin, pin_pud)
	steps_per_indent = 4
	last_direction = 0
	delta_sum = 0
	a_state = wiringpi.digitalRead(a_pin)
	b_state = wiringpi.digitalRead(b_pin)
	last_rotation_sequence = (a_state ^ b_state) | b_state << 1
	while True:
		a_state = wiringpi.digitalRead(a_pin)
		b_state = wiringpi.digitalRead(b_pin)
		rotation_sequence = (a_state ^ b_state) | b_state << 1
		if rotation_sequence != last_rotation_sequence:
			delta = (rotation_sequence - last_rotation_sequence) % 4
			if last_direction == 0:
				if delta == 3:
					delta = -1
				elif delta == 2:
					delta = 0
			elif last_direction * delta < 0:
				delta = math.copysign(4 - delta, last_direction)
			last_direction = delta
			last_rotation_sequence = rotation_sequence
			delta_sum += delta
			delta = delta_sum // steps_per_indent
			if delta != 0:
				callback(delta)
				delta_sum %= steps_per_indent
		else:
			last_direction = 0
		time.sleep(0.005)


def watch_switch(pin, pin_pud, debounce, callback):
	wiringpi.pinMode(pin, 0)
	wiringpi.pullUpDnControl(pin, pin_pud)
	last_state = wiringpi.digitalRead(pin)
	while True:
		state = wiringpi.digitalRead(pin)
		if state != last_state:
			callback(state - last_state)
			time.sleep(debounce)
		last_state = state
		time.sleep(0.05)


class LED:
	def __init__(self, pin):
		self.pin = pin
		wiringpi.pinMode(self.pin, 1)
		wiringpi.digitalWrite(self.pin, False)
	
	@property
	def state(self):
		return wiringpi.digitalRead(self.pin)
	
	@state.setter
	def state(self, value):
		wiringpi.digitalWrite(self.pin, value)


def rotary_encoder_callback(commands, left_command, right_command, event):
	if event == -1:
		commands.put(left_command)
	elif event == 1:
		commands.put(right_command)


def switch_callback(commands, command, event):
	if event == 1:
		commands.put(command)


		
def on_power(_mpd_client, _commands, _feedback_line1, _feedback_line2):
	subprocess.call(['shutdown', '-h', 'now'])


def on_play(mpd_client, _commands, feedback_line1, feedback_line2):
	now_status = mpd_client.status()
	line2 = ''
	if int(now_status['playlistlength']) == 0:
		line1 = 'EMPTY QUEUE'
	elif mpd_client.status()['state'] == 'play':
		mpd_client.pause()
		line1 = 'PAUSE ||'
	else:
		mpd_client.play()
		line1 = 'PLAY >'
	feedback_line1.set_text(line1)
	feedback_line2.set_text(line2)


def on_mode1(mpd_client, _commands, feedback_line1, _feedback_line2):
	mpd_client.clear()
	feedback_line1.set_text('CLEAR QUEUE')


def on_mode2(mpd_client, _commands, feedback_line1, feedback_line2):
	random = (1 - int(mpd_client.status()['random']))
	mpd_client.random(random)
	line1 = 'RANDOM:'
	if random == 0:
		line2 = 'OFF'
	else:
		line2 = 'ON'
	feedback_line1.set_text(line1)
	feedback_line2.set_text(line2)


def on_radio(mpd_client, _commands, feedback_line1, feedback_line2):
	mpd_client.clear()
	mpd_client.load('radio')
	mpd_client.play()
	now_song = mpd_client.currentsong()
	line1 = 'INTERNET RADIO:'
	line2 = '{0:3d}: {1}'.format(int(now_song['pos']) + 1, now_song['name'])
	feedback_line1.set_text(line1)
	feedback_line2.set_text(line2)


def on_track_change(command, mpd_client, _commands, feedback_line1, feedback_line2):
	now_status = mpd_client.status()
	if now_status['state'] != 'play' and now_status['state'] != 'pause':
		line1 = 'NOT PLAYING'
		line2 = ''
	else:
		now_length = int(now_status['playlistlength'])
		now_position = int(now_status['song'])
		if command == 'next':
			line1 = 'NEXT >>|'
			new_position = (now_position + 1) % now_length
		elif command == 'previous':
			if float(now_status['elapsed']) < 5:
				line1 = 'PREV |<<'
				new_position = (now_position - 1) % now_length
			else:
				new_position = now_position
				line1 = ('|<')
		mpd_client.play(new_position)
		line2 = '{0:3d}/{1:3d}'.format(new_position + 1, now_length)
	feedback_line1.set_text(line1)
	feedback_line2.set_text(line2)


def on_volume_change(command, mpd_client, _commands, feedback_line1, feedback_line2):
	delta = {'down': -1, 'up': 1}[command]
	# TODO
	# rethink calculation of new_volume
	new_volume = 2 * min(50, max(0, int(float(mpd_client.status()['volume'])/2) + delta))
	mpd_client.setvol(new_volume)
	line1 = 'VOLUME:'
	line2 = '{0:1.2f}'.format(new_volume / 100)
	feedback_line1.set_text(line1)
	feedback_line2.set_text(line2)


def on_enter(mpd_client, commands, feedback_line1, feedback_line2):
	outputs = mpd_client.outputs()
	number_of_outputs = len(outputs)
	items = []
	for i in range(number_of_outputs):
		items.append(outputs[i]['outputname'])
	while True:
		command, index = select_from_list(mpd_client, commands, 'Output', items, feedback_line1, feedback_line2)
		if command == 'enter':
			line1 = '{0}:'.format(outputs[index]['outputname'])
			if int(outputs[index]['outputenabled']) == 0:
				line2 = 'ON'
			else:
				line2 = 'OFF'
			feedback_line1.set_text(line1)
			feedback_line2.set_text(line2)
			mpd_client.toggleoutput(outputs[index]['outputid'])
			return
		elif command == 'select':
			line1 = '{0}:'.format(outputs[index]['outputname'])
			if int(outputs[index]['outputenabled']) == 0:
				line2 = 'OFF TO EXCL. ON'
				mpd_client.toggleoutput(outputs[index]['outputid'])
			else:
				line2 = 'ON TO EXCL. ON'
			feedback_line1.set_text(line1)
			feedback_line2.set_text(line2)
			for i in range(number_of_outputs):
				if i != index:
					mpd_client.disableoutput(outputs[i]['outputid'])
			return
		else:
			return


def on_select(mpd_client, commands, feedback_line1, feedback_line2):
	filters = []
	tags = mpd_client.tagtypes()
	while True:
		command, index = select_from_list(mpd_client, commands, 'Tag', tags, feedback_line1, feedback_line2)
		selected_tag = tags[index]
		if command == 'select' or command == 'enter':
			items = mpd_client.list(selected_tag, *filters)
			command, index = select_from_list(mpd_client, commands, selected_tag, items, feedback_line1, feedback_line2)
			filters.append(selected_tag)
			filters.append(items[index])
		else:
			return
		if command == 'enter':
			continue
		else:
			break

	if command == 'play':
		mpd_client.clear()
		mpd_client.searchadd(*filters)
		mpd_client.play()
		return
	elif command == 'select':
		items = mpd_client.list('album', *filters)
		command, index = select_from_list(mpd_client, commands, 'Album', items, feedback_line1, feedback_line2)
		selected_album = items[index]
	else:
		return
	
	if command == 'play' or command == 'select':
		mpd_client.clear()
		mpd_client.findadd('album', selected_album)
		mpd_client.play()
		return
	elif command == 'enter':
		mpd_client.findadd('album', selected_album)
		return
		

def select_from_list(mpd_client, commands, name, items, feedback_line1, feedback_line2):
	index = 0
	number_of_items = len(items)
	if number_of_items > 10:
		big_step = int(number_of_items / 20)
	else:
		big_step = 1
	feedback_line1.set_text('SELECT {0}:'.format(name))
	while True:
		feedback_line2.set_text(items[index])
		command = commands.get()
		if command == 'next':
			index = (index + 1) % number_of_items
		elif command == 'previous':
			index = (index - 1) % number_of_items
		elif command == 'up':
			index = (index + big_step) % number_of_items
		elif command == 'down':
			index = (index - big_step) % number_of_items
		else:
			feedback_line1.set_text('')
			feedback_line2.set_text('      ...')
			return command, index
		feedback_line1.set_text('{0} {1}/{2}:'.format(name, index + 1, number_of_items))


def main():
	mpd_host = 'localhost'
	mpd_port = 6600
	lcdd_host = 'localhost'

	wiringpi.wiringPiSetup()
	mpd_client = mpd.MPDClient()
	mpd_client.connect(mpd_host, mpd_port)

	oled = lcdd(lcdd_host, debug=False, charset='iso-8859-1')
	oled.start_session()

	screen_now_playing = oled.add_screen('now_playing')
	screen_now_playing.set_heartbeat('off')
	screen_now_playing.set_priority('foreground')
	screen_now_playing.set_timeout(0)
	now_playing_line1 = screen_now_playing.add_scroller_widget('now_playing_line1', left=1, top=1, right=28, bottom=1, speed=5, text='')
	now_playing_line2 = screen_now_playing.add_scroller_widget('now_playing_line2', left=1, top=2, right=20, bottom=2, speed=5, text='')

	screen_feedback = oled.add_screen('feedback')
	screen_feedback.set_heartbeat('off')
	screen_feedback.set_priority('hidden')
	screen_feedback.set_timeout(0)
	feedback_line1 = screen_feedback.add_string_widget('feedback_line1', '', x=1, y=1)
	feedback_line2 = screen_feedback.add_string_widget('feedback_line2', '', x=1, y=2)

	commands = queue.Queue()

	call_async(
		watch_rotary_encoder,
		11,
		9,
		PULLUP,
		partial(rotary_encoder_callback, commands, 'previous', 'next'),
	)
	call_async(
		watch_rotary_encoder,
		10,
		8,
		PULLUP,
		partial(rotary_encoder_callback, commands, 'down', 'up'),
	)

	for pin, command in [
		(20, 'power'),
		(17, 'play'),
		(19, 'mode1'),
		(16, 'mode2'),
		(18, 'radio'),
	
		(15, 'select'),
		(12, 'enter'),
	]:
		call_async(
			watch_switch,
			pin,
			PULLUP,
			0.1,
			partial(switch_callback, commands, command),
		)

	play_led = LED(22)
	mode1_led = LED(21)
	mode2_led = LED(26)

	commmand_to_func = {
		"power": on_power,
		"play": on_play,
		"mode1": on_mode1,
		"mode2": on_mode2,
		"radio": on_radio,
			
		"select": on_select,
		"previous": partial(on_track_change, "previous"),
		"next": partial(on_track_change, "next"),
			
		"enter": on_enter,
		"down": partial(on_volume_change, "down"),
		"up": partial(on_volume_change, "up"),
	}

	while True:
		try:
			command = commands.get(timeout=1)
		except queue.Empty:
			now_playing = mpd_client.currentsong()
			now_status = mpd_client.status()
			screen_feedback.set_priority('hidden')
			feedback_line1.set_text('')
			feedback_line2.set_text('')
			# TODO
			# andere Taktik für die Titelanzeige überlegen
			try:
				line2 = now_playing['artist']
			except KeyError:
				try:
					line2 = now_playing['name']
				except KeyError:
					line1 = datetime.datetime.now().strftime('     %H:%M')
					line2 = datetime.date.today().strftime('   %Y-%m-%d')
				else:
					try:
						line1 = now_playing['title']
					except KeyError:
						line1 = '{0:3d}/{1:3d}'.format(int(now_status['song']) + 1, int(now_status['playlistlength']))
			else:
				line1 = now_playing['title']

			now_playing_line1.set_text(line1)
			now_playing_line2.set_text(line2)
		else:
			screen_feedback.set_priority('input')
			commmand_to_func[command](mpd_client, commands, feedback_line1, feedback_line2)	

		play_led.state = now_status['state'] == 'play'
		mode1_led.state = int(now_status['playlistlength']) > 0
		mode2_led.state = int(now_status['random'])

if __name__ == '__main__':
	main()

Ich habe zum Beispiel vergeblich versucht den Code für das Ansprechen von LCDd/lcdproc in eine Klasse zu packen, aber weil ich da so viele Handles oder Objekte (?) habe (zuerst LCDd (oled), darüber hinaus den Screen (screen_) und schließlich noch für jede einzelne Zeile ein Widget und dabei möchte ich ja irgendwann auf vier Zeilen aufstocken...) ist jeder Versuch in einem unübersichtlichen Chaos geendet.

Eine Möglichkeit die von call_async gestarteten Threads zu beenden oder zu pausieren wäre auch nett (aber nicht besonders wichtig – ich spiele nur mit dem Gedangen für eine Art Standbymodus möglichst alles was Resourcen und Strom verbraucht anzuhalten bzw. zu deaktiveren).

Dort wo TODO dabei steht habe ich wenigstens eine ungefähre Vorstellung wie ich es besser machen kann. Da hoffe ich also früher oder später auch ohne Hilfe zurechtzukommen.

Auch für Hinweise, was ich sonst noch besser nicht oder anders gemacht hätte, freue ich mich natürlich wieder.


lg smutbert
Benutzeravatar
smutbert
User
Beiträge: 20
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Mittwoch 23. Oktober 2019, 21:24

Habe ich (a) keine so dummen Fehler mehr gemacht, die einer Antwort würdig wären oder ist es (b) bei dem Umfang einfach schon ungehörig den kompletten Code zu posten.
Oder habe ich (c) dieses tolle Forum hier sowieso schon genug in Anspruch genommen?

Alle drei Möglichkeiten sind für mich vollkommen ok – alles andere wäre nach der großartigen Hilfe auch vollkommen unangebracht, aber wissen würde ich es gerne, weil ich mich im Falle von (b) bemühen würde nur meine wichtigsten Fragen in kleineren Portionen zu stellen :)
Benutzeravatar
__blackjack__
User
Beiträge: 4863
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Mittwoch 23. Oktober 2019, 22:17

@smutbert: Ich schreibe gerade etwas, ist aber noch nicht fertig, und ich komme wohl erst morgen dazu das zuende zu bringen. :-)
“Give a man a fire and he's warm for a day, but set fire to him and he's warm for the rest of his life.”
— Terry Pratchett, Jingo
Benutzeravatar
__blackjack__
User
Beiträge: 4863
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Donnerstag 24. Oktober 2019, 10:34

@smutbert: `socket` wird importiert aber anscheinen nirgends verwendet.

In `on_play()` wird der MPD-Status ganz kurz hintereinander zweimal abgerufen, obwohl beim ersten mal das Ergebnis sogar an einen Namen gebunden wird.

Die Zeile ``random = 1 - int(mpd_client.status()["random"])`` in `on_mode2()` sieht komisch aus. Kann es sein, dass Du hier versuchst eigentlich einen Wahrheitswert zu negieren? *Das* sollte man dann auch tun statt darauf zu setzen das -1 als im boole'schen Kontext als Wahr angesehen wird.

An einigen Stellen wird die gleiche Variable mit mehreren Werten verglichen in dem Einzelvergleiche mit ``or`` oder ``and`` verkettet werden – da kann man schauen ob man das nicht mit ``in``/``not in`` in einem Vergleich ausdrücken kann. Beispiel: ``if status["state"] != "play" and status["state"] != "pause":`` kann man als ``if status["state"] not in ["play", "pause"]:`` ausdrücken.

Wenn ein ``if``-Konstrukt in einem ``elif`` endet sollte man immer überlegen noch ein ``else`` hinzuzufügen. Notfalls mit einem ``assert False`` und einer Meldung warum das nicht passieren sollte.

Von `select_from_list()` wird das `mpd_client`-Argument überhaupt nicht verwendet, muss also auch nicht übergeben werden.

Bei `on_enter()` ist die ``while True:``-Schleife falsch. Da steckt ein ``if``-Konstrukt drin bei dem *jeder* Zweig mit einem ``return`` endet und damit die Schleife genau *einmal* durchlaufen wird. Damit ist das aber keine Schleife mehr.

In der ersten ``for``-Schleife wird unnötigerweise ein Index für den Zugriff verwendet, statt einfach direkt über die Elemente von `output` zu iterieren. Hier würde sich auch eine „list comprehension“ anbieten. Und ein besserer Name als `items` für das Ergebnis. Bei der zweiten ``for``-Schleife macht `enumerate()` Sinn.

In `on_select()` finde ich den Programmfluss recht undurchsichtig. Das ``continue`` beispielsweise kann man leicht beseitigen in dem man bei dem ``if`` die Bedingung umkehrt. Der Programmablauf kommt an diese Stelle auch nur wenn das `command` entweder "enter" oder "select" ist, man könnte die Abbruchbedingung also statt auf "enter" auch auf "select" beziehen, was ich in dem Fall dann verständlicher finden. Also wenn "select", dann Schleife verlassen. Anstelle von wenn nicht "enter" dann Schleife verlassen. Zudem kann das in den ``if``-Zweig wo geprüft wird ob `command` einen dieser beiden Werte hat, dann ist das noch deutlicher für den Leser was `command` an der Stelle für Werte annehmen kann.

Das bei anderen Werten als den beiden die Funktion mit ``return`` verlassen wird ist IMHO so wichtig dass das sofort nach der Auswahl des Kommandos geprüft werden sollte und nicht irgendwo nach mehreren Zeilen Code in einem ``else: return`` leicht übersehen werden kann.

Ab nach der Schleife habe ich ein Problem mit der Funktion – die ``while True:``-Schleife wird nur durch ``break`` verlassen wenn `command` den Wert "select" hat. Nach der Schleife steht dann aber Code der `command` gegen "play" und "enter" prüft, was ja nie der Fall sein kann‽ Den Code ab dort habe ich nicht weiter angefasst, den kann man IMHO auch mit deutlich weniger ``return``\s in den Zweigen schreiben, und damit IMHO verständlicher.

Die `main()`-Funktion ist mittlerweile ziemlich gross geworden. Die würde ich versuchen irgendwie sinnvoll aufzuteilen. Die ersten drei Variablen kann man prima als Konstanten an den Anfang herausziehen.

Der Aufruf sähe dann so aus:

Code: Alles auswählen

    mainloop(
        commands,
        mpd_client,
        feedback_screen,
        feedback_line1,
        feedback_line2,
        now_playing_line1,
        now_playing_line2,
        play_led,
        mode1_led,
        mode2_led,
    )
Also ein bisschen viel an Argumenten die man da übergeben muss. Wo man hier wirklich einfach jeweils zwei Argumente zu einem zusammenfassen kann sind die `*_line*`-Argumente. Ich sage das ja öfter: Nummerierte Namen sind ein Zeichen das man sich entweder bessere Namen ausdenken will, oder gar keine einzelnen Namen sondern eine Datenstruktur. Oft eine Liste. Und genau das passt hier: statt `feedback_line1` und `feedback_line2` einzeln zu übergeben, steckt man die einfach in eine Liste und greift dann mit `feedback_lines[0]` und `feedback_lines[1]` auf die Objekte zu. Analog lässt sich das mit `now_playing_line1` und `now_playing_line2` machen. Und das skaliert auch wunderbar wenn Du auf vier Zeilen erweitern willst. Dann wären wir bei:

Code: Alles auswählen

    mainloop(
        commands,
        mpd_client,
        feedback_screen,
        feedback_lines,
        now_playing_lines,
        play_led,
        mode1_led,
        mode2_led,
    )
Was sich hier einfach zusammenfassen lässt, wären die LEDs in einem `collections.namedtuple()`-Typ. An der Stelle könnte man den magischen Zahlen beim erstellen der `LED`-Objekte auch per Konstantendefinition Namen geben. Und damit dann auch leichter anpasssbar am Anfang des Quelltextes. Neuer Stand:

Code: Alles auswählen

    mainloop(
        commands,
        mpd_client,
        feedback_screen,
        feedback_lines,
        now_playing_lines,
        leds,
    )
Zwischenstand wäre dann (natürlich ungetestet und ich hoffe ich habe nichts/nicht zu viel kaputt gemacht):

Code: Alles auswählen

#!/usr/bin/env python3
import datetime
import math
import queue
import subprocess
import threading
import time
from collections import namedtuple
from functools import partial

from mpdlcd.vendor.lcdproc.server import Server as lcdd
import musicpd as mpd
import wiringpi2 as wiringpi

MPD_HOST = "localhost"
MPD_PORT = 6600
LCDD_HOST = "localhost"

PREVIOUS_PIN = 11
NEXT_PIN = 9
DOWN_PIN = 10
UP_PIN = 8
POWER_PIN = 20
PLAY_PIN = 17
MODE1_PIN = 19
MODE2_PIN = 16
RADIO_PIN = 18
SELECT_PIN = 15
ENTER_PIN = 12

PLAY_LED_PIN = 22
MODE1_LED_PIN = 21
MODE2_LED_PIN = 26

# 1 und 2 fuer Pull-up und -down sind beim Cubietruck vertauscht.
PULLUP = 1


def call_async(function, *arguments):
    thread = threading.Thread(target=function, args=arguments, daemon=True)
    thread.start()
    return thread


def watch_rotary_encoder(a_pin, b_pin, pin_pud, callback):
    wiringpi.pinMode(a_pin, 0)
    wiringpi.pinMode(b_pin, 0)
    wiringpi.pullUpDnControl(a_pin, pin_pud)
    wiringpi.pullUpDnControl(b_pin, pin_pud)
    steps_per_indent = 4
    last_direction = 0
    delta_sum = 0
    a_state = wiringpi.digitalRead(a_pin)
    b_state = wiringpi.digitalRead(b_pin)
    last_rotation_sequence = (a_state ^ b_state) | b_state << 1
    while True:
        a_state = wiringpi.digitalRead(a_pin)
        b_state = wiringpi.digitalRead(b_pin)
        rotation_sequence = (a_state ^ b_state) | b_state << 1
        if rotation_sequence != last_rotation_sequence:
            delta = (rotation_sequence - last_rotation_sequence) % 4
            if last_direction == 0:
                if delta == 3:
                    delta = -1
                elif delta == 2:
                    delta = 0
            elif last_direction * delta < 0:
                delta = math.copysign(4 - delta, last_direction)
            last_direction = delta
            last_rotation_sequence = rotation_sequence
            delta_sum += delta
            delta = delta_sum // steps_per_indent
            if delta != 0:
                callback(delta)
                delta_sum %= steps_per_indent
        else:
            last_direction = 0
        time.sleep(0.005)


def watch_switch(pin, pin_pud, debounce, callback):
    wiringpi.pinMode(pin, 0)
    wiringpi.pullUpDnControl(pin, pin_pud)
    last_state = wiringpi.digitalRead(pin)
    while True:
        state = wiringpi.digitalRead(pin)
        if state != last_state:
            callback(state - last_state)
            time.sleep(debounce)
        last_state = state
        time.sleep(0.05)


class LED:
    def __init__(self, pin):
        self.pin = pin
        wiringpi.pinMode(self.pin, 1)
        wiringpi.digitalWrite(self.pin, False)

    @property
    def state(self):
        return wiringpi.digitalRead(self.pin)

    @state.setter
    def state(self, value):
        wiringpi.digitalWrite(self.pin, value)


def rotary_encoder_callback(commands, left_command, right_command, event):
    if event == -1:
        commands.put(left_command)
    elif event == 1:
        commands.put(right_command)


def switch_callback(commands, command, event):
    if event == 1:
        commands.put(command)


def select_from_list(commands, name, items, feedback_lines):
    index = 0
    item_count = len(items)
    big_step = item_count // 20 if item_count > 10 else 1
    feedback_lines[0].set_text("SELECT {0}:".format(name))
    while True:
        feedback_lines[1].set_text(items[index])
        command = commands.get()
        if command == "next":
            index = (index + 1) % item_count
        elif command == "previous":
            index = (index - 1) % item_count
        elif command == "up":
            index = (index + big_step) % item_count
        elif command == "down":
            index = (index - big_step) % item_count
        else:
            feedback_lines[0].set_text("")
            feedback_lines[1].set_text("      ...")
            return command, index
        feedback_lines[0].set_text(
            "{0} {1}/{2}:".format(name, index + 1, item_count)
        )


def on_power(_mpd_client, _commands, _feedback_lines):
    subprocess.run(["shutdown", "-h", "now"])


def on_play(mpd_client, _commands, feedback_lines):
    status = mpd_client.status()
    if int(status["playlistlength"]) == 0:
        text = "EMPTY QUEUE"
    elif status["state"] == "play":
        mpd_client.pause()
        text = "PAUSE ||"
    else:
        mpd_client.play()
        text = "PLAY >"
    feedback_lines[0].set_text(text)
    feedback_lines[1].set_text("")


def on_mode1(mpd_client, _commands, feedback_lines):
    mpd_client.clear()
    feedback_lines[0].set_text("CLEAR QUEUE")


def on_mode2(mpd_client, _commands, feedback_lines):
    is_random = not bool(int(mpd_client.status()["random"]))
    mpd_client.random(is_random)
    feedback_lines[0].set_text("RANDOM:")
    feedback_lines[1].set_text("ON" if is_random else "OFF")


def on_radio(mpd_client, _commands, feedback_lines):
    mpd_client.clear()
    mpd_client.load("radio")
    mpd_client.play()
    song_data = mpd_client.currentsong()
    feedback_lines[0].set_text("INTERNET RADIO:")
    feedback_lines[1].set_text(
        "{0:3d}: {1}".format(int(song_data["pos"]) + 1, song_data["name"])
    )


def on_track_change(command, mpd_client, _commands, feedback_lines):
    status = mpd_client.status()
    if status["state"] not in ["play", "pause"]:
        line1 = "NOT PLAYING"
        line2 = ""
    else:
        song_count = int(status["playlistlength"])
        song_index = int(status["song"])
        if command == "next":
            line1 = "NEXT >>|"
            song_index = (song_index + 1) % song_count
        elif command == "previous":
            if float(status["elapsed"]) < 5:
                line1 = "PREV |<<"
                song_index = (song_index - 1) % song_count
            else:
                song_index = song_index
                line1 = "|<"
        else:
            assert False, "unknown command {!r}".format(command)
        mpd_client.play(song_index)
        line2 = "{0:3d}/{1:3d}".format(song_index + 1, song_count)

    feedback_lines[0].set_text(line1)
    feedback_lines[1].set_text(line2)


def on_volume_change(command, mpd_client, _commands, feedback_lines):
    delta = {"down": -1, "up": 1}[command]
    # TODO
    # rethink calculation of new_volume
    volume = 2 * min(
        50, max(0, int(float(mpd_client.status()["volume"]) / 2) + delta)
    )
    mpd_client.setvol(volume)
    feedback_lines[0].set_text("VOLUME:")
    feedback_lines[1].set_text("{0:1.2f}".format(volume / 100))


def on_enter(mpd_client, commands, feedback_lines):
    outputs = mpd_client.outputs()
    output_names = [output["outputname"] for output in outputs]
    command, index = select_from_list(
        commands, "Output", output_names, feedback_lines
    )
    output = outputs[index]

    if command == "enter":
        feedback_lines[0].set_text("{0}:".format(output_names[index]))
        feedback_lines[1].set_text(
            "OFF" if bool(int(output["outputenabled"])) else "ON"
        )
        mpd_client.toggleoutput(output["outputid"])

    elif command == "select":
        if bool(int(output["outputenabled"])):
            line2 = "ON TO EXCL. ON"
        else:
            line2 = "OFF TO EXCL. ON"
            mpd_client.toggleoutput(output["outputid"])

        feedback_lines[0].set_text("{0}:".format(output_names[index]))
        feedback_lines[1].set_text(line2)
        for i, output in enumerate(outputs):
            if i != index:
                mpd_client.disableoutput(output["outputid"])


def on_select(mpd_client, commands, feedback_lines):
    filters = []
    tags = mpd_client.tagtypes()
    while True:
        command, index = select_from_list(
            commands, "Tag", tags, feedback_lines
        )

        if command not in ["select", "enter"]:
            return

        selected_tag = tags[index]
        items = mpd_client.list(selected_tag, *filters)
        command, index = select_from_list(
            commands, selected_tag, items, feedback_lines
        )
        filters.append(selected_tag)
        filters.append(items[index])
        if command == "select":
            break
    #
    # FIXME `command` kann im folgenden Code nur den Wert "select" haben!
    #
    if command == "play":
        mpd_client.clear()
        mpd_client.searchadd(*filters)
        mpd_client.play()
        return
    elif command == "select":
        items = mpd_client.list("album", *filters)
        command, index = select_from_list(
            commands, "Album", items, feedback_lines
        )
        selected_album = items[index]
    else:
        return

    if command in ["play", "select"]:
        mpd_client.clear()
        mpd_client.findadd("album", selected_album)
        mpd_client.play()
        return
    elif command == "enter":
        mpd_client.findadd("album", selected_album)
        return


LEDS = namedtuple("LEDS", "play mode1 mode2")


def mainloop(
    commands,
    mpd_client,
    feedback_screen,
    feedback_lines,
    now_playing_lines,
    leds,
):
    commmand_to_func = {
        "power": on_power,
        "play": on_play,
        "mode1": on_mode1,
        "mode2": on_mode2,
        "radio": on_radio,
        "select": on_select,
        "previous": partial(on_track_change, "previous"),
        "next": partial(on_track_change, "next"),
        "enter": on_enter,
        "down": partial(on_volume_change, "down"),
        "up": partial(on_volume_change, "up"),
    }

    while True:
        try:
            command = commands.get(timeout=1)
        except queue.Empty:
            current_song = mpd_client.currentsong()
            status = mpd_client.status()
            feedback_screen.set_priority("hidden")
            feedback_lines[0].set_text("")
            feedback_lines[1].set_text("")
            # TODO
            # andere Taktik für die Titelanzeige überlegen
            try:
                line2 = current_song["artist"]
            except KeyError:
                try:
                    line2 = current_song["name"]
                except KeyError:
                    line1 = datetime.datetime.now().strftime("     %H:%M")
                    line2 = datetime.date.today().strftime("   %Y-%m-%d")
                else:
                    try:
                        line1 = current_song["title"]
                    except KeyError:
                        line1 = "{0:3d}/{1:3d}".format(
                            int(status["song"]) + 1,
                            int(status["playlistlength"]),
                        )
            else:
                line1 = current_song["title"]

            now_playing_lines[0].set_text(line1)
            now_playing_lines[1].set_text(line2)
        else:
            feedback_screen.set_priority("input")
            commmand_to_func[command](mpd_client, commands, feedback_lines)

        leds.play.state = status["state"] == "play"
        leds.mode1.state = int(status["playlistlength"]) > 0
        leds.mode2.state = int(status["random"])


def main():
    wiringpi.wiringPiSetup()
    mpd_client = mpd.MPDClient()
    mpd_client.connect(MPD_HOST, MPD_PORT)

    oled = lcdd(LCDD_HOST, debug=False, charset="iso-8859-1")
    oled.start_session()

    now_playing_screen = oled.add_screen("now_playing")
    now_playing_screen.set_heartbeat("off")
    now_playing_screen.set_priority("foreground")
    now_playing_screen.set_timeout(0)
    now_playing_lines = [
        now_playing_screen.add_scroller_widget(
            "now_playing_line1",
            left=1,
            top=1,
            right=28,
            bottom=1,
            speed=5,
            text="",
        ),
        now_playing_screen.add_scroller_widget(
            "now_playing_line2",
            left=1,
            top=2,
            right=20,
            bottom=2,
            speed=5,
            text="",
        ),
    ]

    feedback_screen = oled.add_screen("feedback")
    feedback_screen.set_heartbeat("off")
    feedback_screen.set_priority("hidden")
    feedback_screen.set_timeout(0)
    feedback_lines = [
        feedback_screen.add_string_widget("feedback_line1", "", x=1, y=1),
        feedback_screen.add_string_widget("feedback_line2", "", x=1, y=2),
    ]

    commands = queue.Queue()

    call_async(
        watch_rotary_encoder,
        PREVIOUS_PIN,
        NEXT_PIN,
        PULLUP,
        partial(rotary_encoder_callback, commands, "previous", "next"),
    )
    call_async(
        watch_rotary_encoder,
        DOWN_PIN,
        UP_PIN,
        PULLUP,
        partial(rotary_encoder_callback, commands, "down", "up"),
    )

    for pin, command in [
        (POWER_PIN, "power"),
        (PLAY_PIN, "play"),
        (MODE1_PIN, "mode1"),
        (MODE2_PIN, "mode2"),
        (RADIO_PIN, "radio"),
        (SELECT_PIN, "select"),
        (ENTER_PIN, "enter"),
    ]:
        call_async(
            watch_switch,
            pin,
            PULLUP,
            0.1,
            partial(switch_callback, commands, command),
        )

    leds = LEDS(LED(PLAY_LED_PIN), LED(MODE1_LED_PIN), LED(MODE2_LED_PIN))

    mainloop(
        commands,
        mpd_client,
        feedback_screen,
        feedback_lines,
        now_playing_lines,
        leds,
    )


if __name__ == "__main__":
    main()
Bei dem Display-Kram wäre es in der Tat sinnvoll das in einer Klasse zusammen zu fassen. Bei den ganzen `on_*`-Funktionen könnte man sich auch eine Klasse vorstellen auf der die definiert sind, denn die bekommen ja alle fast die gleichen Argumente.

Ich sehe auch zwei mögliche Einsätze für das `enum`-Modul. Einmal `IntEnum` für die Pinnummern wo man mit dem `enum.unique()`-Dekorator sicherstellen kann das keine Pinnummer zweimal vergeben wird. Und dann für die Kommandos, damit leichter auffällt wenn man sich mal vertippt und um Kommandos von anderen Zeichenketten unterscheiden zu können.
“Give a man a fire and he's warm for a day, but set fire to him and he's warm for the rest of his life.”
— Terry Pratchett, Jingo
Benutzeravatar
smutbert
User
Beiträge: 20
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Donnerstag 24. Oktober 2019, 15:38

Oh, das ich so viele nicht verwendete Dinge übersehen habe, hätte ich nicht gedacht :oops:
__blackjack__ hat geschrieben:
Donnerstag 24. Oktober 2019, 10:34
In `on_select()` [...]

Ab nach der Schleife habe ich ein Problem mit der Funktion – die ``while True:``-Schleife wird nur durch ``break`` verlassen wenn `command` den Wert "select" hat. Nach der Schleife steht dann aber Code der `command` gegen "play" und "enter" prüft, was ja nie der Fall sein kann‽[...]
Das stimmt ausnahmsweise nicht.

Das Verwirrende an dem Punkt ist glaube ich, dass command im ersten if-Block im Falle von "select" oder "enter" einen neuen Wert zugewiesen bekommt, der dann in der nächsten if-Abfrage geprüft wird.
und zwar zuerst darauf ob in der Schleife weitere Kriterien hinzugefügt werden sollen ("enter") oder ob etwas anderes passieren soll. Nachdem „etwas anderes“ aber auf jeden Fall nur einmal ausgeführt werden soll, breche ich die Schleife zuerst ab und mache außerhalb mit einer neuen if-elif-Abfrage weiter.

(Das ist ganz typisch eine von den Situationen, in denen ich nicht weiß wie man es richtig macht.
Ungünstigerweise jongliere ich nebenbei auch noch ein bisschen mit den Funktionen der Taster um herauszufinden wie die Bedienung am flüssigsten und intuitivsten ist. Unheimlich motivierend ist dabei die Tatsache, dass sich mein Gerät meiner voreingenommenen Meinung nach jetzt schon viel flüssiger und angenehmer bedient als übliche Hifi-Geräte oder mpd-Clients am Handy oder Tablet.)


Den Rest muss ich wieder erst einmal verdauen, nur die PIN-Benennung für die Drehimpulsgeber werde ich nicht übernehmen, wenn du nicht darauf bestehst (es haben beide Pins jeweils gleich viel mit up/next wie mit previous/down zu tun).

Danke!
Benutzeravatar
__blackjack__
User
Beiträge: 4863
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Freitag 25. Oktober 2019, 11:14

@smutbert: Okay, dann muss die Bedingung für das ``break`` doch ``command != "enter"`` lauten.

Aus dem letzten ``if``/``elif`` können die beiden ``return`` gestrichen werden, denn das passiert da sowieso.

Wenn man den Code dann in den ``elif``-Zweig darüber verschiebt, kann dort im ersten Zweig das ``return`` weg und das ``else: return`` am Ende kann entfallen.

Wenn ich nicht wieder einen Fehler gemacht habe, könnte das dann so aussehen:

Code: Alles auswählen

def on_select(mpd_client, commands, feedback_lines):
    filters = []
    tags = mpd_client.tagtypes()
    while True:
        command, index = select_from_list(
            commands, "Tag", tags, feedback_lines
        )

        if command not in ["select", "enter"]:
            return

        selected_tag = tags[index]
        items = mpd_client.list(selected_tag, *filters)
        command, index = select_from_list(
            commands, selected_tag, items, feedback_lines
        )
        filters.append(selected_tag)
        filters.append(items[index])
        if command != "enter":
            break

    if command == "play":
        mpd_client.clear()
        mpd_client.searchadd(*filters)
        mpd_client.play()
    elif command == "select":
        items = mpd_client.list("album", *filters)
        command, index = select_from_list(
            commands, "Album", items, feedback_lines
        )
        selected_album = items[index]
        if command in ["play", "select"]:
            mpd_client.clear()
            mpd_client.findadd("album", selected_album)
            mpd_client.play()
        elif command == "enter":
            mpd_client.findadd("album", selected_album)
Die vielen Threads für die Überwachung der einzelnen Pins müsste man übrigens relativ einfach durch *einen* ersetzen können wenn man aus den Funktionen die das jetzt machen Objekte macht wo der Initialisierungsteil in die `__init__()` wandert und der Code für *einen* Schleifendurchlauf in eine Methode. Dann kann man diese ganzen Objekte einer Funktion übergeben die diese Methode von jedem Objekt regelmässig aufruft, und dann nur diese eine Funktion in einem Thread laufen lassen. Statt Klassen könnte man die bisherigen Funktionen auch zu Generatorfunktionen machen und in der Funktion die das an antreibt einfach immer die `next()`-Funktion auf jedem Generator aufrufen. Dann sind die notwendigen Änderungen nicht wirklich gross. Im Grunde müsste man nur statt dem `sleep()`-Aufruf ein ``yield`` schreiben.
“Give a man a fire and he's warm for a day, but set fire to him and he's warm for the rest of his life.”
— Terry Pratchett, Jingo
Benutzeravatar
smutbert
User
Beiträge: 20
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Samstag 2. November 2019, 22:59

Jetzt bin ich noch auf eine Variante gekommen, die zwar nicht kürzer, aber weniger verschachtelt ist und (fast) dasselbe tut. (Ich bin aber noch nicht dazu gekommen irgendeine der neuen Varianten auszuprobieren.)

Code: Alles auswählen

def on_select(mpd_client, commands, feedback_lines):
	filters = []
	tags = mpd_client.tagtypes()
	while True:
		command, index = select_from_list(
			commands, "Tag", tags, feedback_lines
		)

		if command not in ["select", "enter"]:
			return

		selected_tag = tags[index]
		items = mpd_client.list(selected_tag, *filters)
		command, index = select_from_list(
			commands, selected_tag, items, feedback_lines
		)
		filters.append(selected_tag)
		filters.append(items[index])
		if command != "enter":
			break

	if command == "select":
		items = mpd_client.list("album", *filters)
		command, index = select_from_list(
			commands, "Album", items, feedback_lines
		)
		filters[:] = []
		filters.append("Album")
		filters.append(items[index])	

	if command in ["play", "select"]:
		mpd_client.clear()
		mpd_client.searchadd(*filters)
		mpd_client.play()
	elif command == "enter":
		mpd_client.findadd("album", selected_album)
Die namedtuple durchblicke ich nach anfänglichen Schwiergkeiten wenigstens ansatzweise, aber das mit dem enum-Modul muss ich mir noch in Ruhe ansehen.

Schön langsam muss ich mir wohl auch Gedanken wegen eines Versionsverwaltungssystems machen - ohne wird es langsam unübersichtlich.

In das Beseitigen bzw. Reduzieren der Threads werde ich nach reiflicher Überlegung keine Arbeit investieren. Ich probiere nämlich bereits nebenbei mit einem Raspberry Pi herum, mit dem ich keine Threads bräuchte, weil das alles gpiozero bzw. pigpio im Hintergrund erledigen.


Danke und lg
(ich melde mich aber sowieso noch einmal, selbst in dem sehr unwahrscheinlichen Fall, dass ich keine Hilfe mehr brauchen sollte)
Benutzeravatar
__blackjack__
User
Beiträge: 4863
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Sonntag 3. November 2019, 00:00

@smutbert: Hier fallen mir folgende Zeilen auf:

Code: Alles auswählen

        filters[:] = []
        filters.append("Album")
        filters.append(items[index])
Das ist ungewöhnlich die vorhandene Liste an der Stelle zu ändern. Diese drei Zeilen wären einfacher nur eine Zuweisung einer neuen Liste:

Code: Alles auswählen

        filters = ["Album", items[index]]
Selbst wenn man die vorhandene Liste leeren und wiederverwenden wollte, ist es eine etwas ungewöhnliche Variante den gesamten Inhalt der Liste durch den Inhalt einer temporär erzeugten leeren Liste zu ersetzen. Da wäre früher ``del filters[:]`` der offensichtliche Weg für gewesen, ab Python 3.3 dann aber die `clear()`-Methode.
“Give a man a fire and he's warm for a day, but set fire to him and he's warm for the rest of his life.”
— Terry Pratchett, Jingo
Antworten