Musikspieler mit Display und Bedienelementen

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Benutzeravatar
__blackjack__
User
Beiträge: 13080
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@smutbert: Ich schreibe gerade etwas, ist aber noch nicht fertig, und ich komme wohl erst morgen dazu das zuende zu bringen. :-)
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Benutzeravatar
__blackjack__
User
Beiträge: 13080
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@smutbert: `socket` wird importiert aber anscheinen nirgends verwendet.

In `on_play()` wird der MPD-Status ganz kurz hintereinander zweimal abgerufen, obwohl beim ersten mal das Ergebnis sogar an einen Namen gebunden wird.

Die Zeile ``random = 1 - int(mpd_client.status()["random"])`` in `on_mode2()` sieht komisch aus. Kann es sein, dass Du hier versuchst eigentlich einen Wahrheitswert zu negieren? *Das* sollte man dann auch tun statt darauf zu setzen das -1 als im boole'schen Kontext als Wahr angesehen wird.

An einigen Stellen wird die gleiche Variable mit mehreren Werten verglichen in dem Einzelvergleiche mit ``or`` oder ``and`` verkettet werden – da kann man schauen ob man das nicht mit ``in``/``not in`` in einem Vergleich ausdrücken kann. Beispiel: ``if status["state"] != "play" and status["state"] != "pause":`` kann man als ``if status["state"] not in ["play", "pause"]:`` ausdrücken.

Wenn ein ``if``-Konstrukt in einem ``elif`` endet sollte man immer überlegen noch ein ``else`` hinzuzufügen. Notfalls mit einem ``assert False`` und einer Meldung warum das nicht passieren sollte.

Von `select_from_list()` wird das `mpd_client`-Argument überhaupt nicht verwendet, muss also auch nicht übergeben werden.

Bei `on_enter()` ist die ``while True:``-Schleife falsch. Da steckt ein ``if``-Konstrukt drin bei dem *jeder* Zweig mit einem ``return`` endet und damit die Schleife genau *einmal* durchlaufen wird. Damit ist das aber keine Schleife mehr.

In der ersten ``for``-Schleife wird unnötigerweise ein Index für den Zugriff verwendet, statt einfach direkt über die Elemente von `output` zu iterieren. Hier würde sich auch eine „list comprehension“ anbieten. Und ein besserer Name als `items` für das Ergebnis. Bei der zweiten ``for``-Schleife macht `enumerate()` Sinn.

In `on_select()` finde ich den Programmfluss recht undurchsichtig. Das ``continue`` beispielsweise kann man leicht beseitigen in dem man bei dem ``if`` die Bedingung umkehrt. Der Programmablauf kommt an diese Stelle auch nur wenn das `command` entweder "enter" oder "select" ist, man könnte die Abbruchbedingung also statt auf "enter" auch auf "select" beziehen, was ich in dem Fall dann verständlicher finden. Also wenn "select", dann Schleife verlassen. Anstelle von wenn nicht "enter" dann Schleife verlassen. Zudem kann das in den ``if``-Zweig wo geprüft wird ob `command` einen dieser beiden Werte hat, dann ist das noch deutlicher für den Leser was `command` an der Stelle für Werte annehmen kann.

Das bei anderen Werten als den beiden die Funktion mit ``return`` verlassen wird ist IMHO so wichtig dass das sofort nach der Auswahl des Kommandos geprüft werden sollte und nicht irgendwo nach mehreren Zeilen Code in einem ``else: return`` leicht übersehen werden kann.

Ab nach der Schleife habe ich ein Problem mit der Funktion – die ``while True:``-Schleife wird nur durch ``break`` verlassen wenn `command` den Wert "select" hat. Nach der Schleife steht dann aber Code der `command` gegen "play" und "enter" prüft, was ja nie der Fall sein kann‽ Den Code ab dort habe ich nicht weiter angefasst, den kann man IMHO auch mit deutlich weniger ``return``\s in den Zweigen schreiben, und damit IMHO verständlicher.

Die `main()`-Funktion ist mittlerweile ziemlich gross geworden. Die würde ich versuchen irgendwie sinnvoll aufzuteilen. Die ersten drei Variablen kann man prima als Konstanten an den Anfang herausziehen.

Der Aufruf sähe dann so aus:

Code: Alles auswählen

    mainloop(
        commands,
        mpd_client,
        feedback_screen,
        feedback_line1,
        feedback_line2,
        now_playing_line1,
        now_playing_line2,
        play_led,
        mode1_led,
        mode2_led,
    )
Also ein bisschen viel an Argumenten die man da übergeben muss. Wo man hier wirklich einfach jeweils zwei Argumente zu einem zusammenfassen kann sind die `*_line*`-Argumente. Ich sage das ja öfter: Nummerierte Namen sind ein Zeichen das man sich entweder bessere Namen ausdenken will, oder gar keine einzelnen Namen sondern eine Datenstruktur. Oft eine Liste. Und genau das passt hier: statt `feedback_line1` und `feedback_line2` einzeln zu übergeben, steckt man die einfach in eine Liste und greift dann mit `feedback_lines[0]` und `feedback_lines[1]` auf die Objekte zu. Analog lässt sich das mit `now_playing_line1` und `now_playing_line2` machen. Und das skaliert auch wunderbar wenn Du auf vier Zeilen erweitern willst. Dann wären wir bei:

Code: Alles auswählen

    mainloop(
        commands,
        mpd_client,
        feedback_screen,
        feedback_lines,
        now_playing_lines,
        play_led,
        mode1_led,
        mode2_led,
    )
Was sich hier einfach zusammenfassen lässt, wären die LEDs in einem `collections.namedtuple()`-Typ. An der Stelle könnte man den magischen Zahlen beim erstellen der `LED`-Objekte auch per Konstantendefinition Namen geben. Und damit dann auch leichter anpasssbar am Anfang des Quelltextes. Neuer Stand:

Code: Alles auswählen

    mainloop(
        commands,
        mpd_client,
        feedback_screen,
        feedback_lines,
        now_playing_lines,
        leds,
    )
Zwischenstand wäre dann (natürlich ungetestet und ich hoffe ich habe nichts/nicht zu viel kaputt gemacht):

Code: Alles auswählen

#!/usr/bin/env python3
import datetime
import math
import queue
import subprocess
import threading
import time
from collections import namedtuple
from functools import partial

from mpdlcd.vendor.lcdproc.server import Server as lcdd
import musicpd as mpd
import wiringpi2 as wiringpi

MPD_HOST = "localhost"
MPD_PORT = 6600
LCDD_HOST = "localhost"

PREVIOUS_PIN = 11
NEXT_PIN = 9
DOWN_PIN = 10
UP_PIN = 8
POWER_PIN = 20
PLAY_PIN = 17
MODE1_PIN = 19
MODE2_PIN = 16
RADIO_PIN = 18
SELECT_PIN = 15
ENTER_PIN = 12

PLAY_LED_PIN = 22
MODE1_LED_PIN = 21
MODE2_LED_PIN = 26

# 1 und 2 fuer Pull-up und -down sind beim Cubietruck vertauscht.
PULLUP = 1


def call_async(function, *arguments):
    thread = threading.Thread(target=function, args=arguments, daemon=True)
    thread.start()
    return thread


def watch_rotary_encoder(a_pin, b_pin, pin_pud, callback):
    wiringpi.pinMode(a_pin, 0)
    wiringpi.pinMode(b_pin, 0)
    wiringpi.pullUpDnControl(a_pin, pin_pud)
    wiringpi.pullUpDnControl(b_pin, pin_pud)
    steps_per_indent = 4
    last_direction = 0
    delta_sum = 0
    a_state = wiringpi.digitalRead(a_pin)
    b_state = wiringpi.digitalRead(b_pin)
    last_rotation_sequence = (a_state ^ b_state) | b_state << 1
    while True:
        a_state = wiringpi.digitalRead(a_pin)
        b_state = wiringpi.digitalRead(b_pin)
        rotation_sequence = (a_state ^ b_state) | b_state << 1
        if rotation_sequence != last_rotation_sequence:
            delta = (rotation_sequence - last_rotation_sequence) % 4
            if last_direction == 0:
                if delta == 3:
                    delta = -1
                elif delta == 2:
                    delta = 0
            elif last_direction * delta < 0:
                delta = math.copysign(4 - delta, last_direction)
            last_direction = delta
            last_rotation_sequence = rotation_sequence
            delta_sum += delta
            delta = delta_sum // steps_per_indent
            if delta != 0:
                callback(delta)
                delta_sum %= steps_per_indent
        else:
            last_direction = 0
        time.sleep(0.005)


def watch_switch(pin, pin_pud, debounce, callback):
    wiringpi.pinMode(pin, 0)
    wiringpi.pullUpDnControl(pin, pin_pud)
    last_state = wiringpi.digitalRead(pin)
    while True:
        state = wiringpi.digitalRead(pin)
        if state != last_state:
            callback(state - last_state)
            time.sleep(debounce)
        last_state = state
        time.sleep(0.05)


class LED:
    def __init__(self, pin):
        self.pin = pin
        wiringpi.pinMode(self.pin, 1)
        wiringpi.digitalWrite(self.pin, False)

    @property
    def state(self):
        return wiringpi.digitalRead(self.pin)

    @state.setter
    def state(self, value):
        wiringpi.digitalWrite(self.pin, value)


def rotary_encoder_callback(commands, left_command, right_command, event):
    if event == -1:
        commands.put(left_command)
    elif event == 1:
        commands.put(right_command)


def switch_callback(commands, command, event):
    if event == 1:
        commands.put(command)


def select_from_list(commands, name, items, feedback_lines):
    index = 0
    item_count = len(items)
    big_step = item_count // 20 if item_count > 10 else 1
    feedback_lines[0].set_text("SELECT {0}:".format(name))
    while True:
        feedback_lines[1].set_text(items[index])
        command = commands.get()
        if command == "next":
            index = (index + 1) % item_count
        elif command == "previous":
            index = (index - 1) % item_count
        elif command == "up":
            index = (index + big_step) % item_count
        elif command == "down":
            index = (index - big_step) % item_count
        else:
            feedback_lines[0].set_text("")
            feedback_lines[1].set_text("      ...")
            return command, index
        feedback_lines[0].set_text(
            "{0} {1}/{2}:".format(name, index + 1, item_count)
        )


def on_power(_mpd_client, _commands, _feedback_lines):
    subprocess.run(["shutdown", "-h", "now"])


def on_play(mpd_client, _commands, feedback_lines):
    status = mpd_client.status()
    if int(status["playlistlength"]) == 0:
        text = "EMPTY QUEUE"
    elif status["state"] == "play":
        mpd_client.pause()
        text = "PAUSE ||"
    else:
        mpd_client.play()
        text = "PLAY >"
    feedback_lines[0].set_text(text)
    feedback_lines[1].set_text("")


def on_mode1(mpd_client, _commands, feedback_lines):
    mpd_client.clear()
    feedback_lines[0].set_text("CLEAR QUEUE")


def on_mode2(mpd_client, _commands, feedback_lines):
    is_random = not bool(int(mpd_client.status()["random"]))
    mpd_client.random(is_random)
    feedback_lines[0].set_text("RANDOM:")
    feedback_lines[1].set_text("ON" if is_random else "OFF")


def on_radio(mpd_client, _commands, feedback_lines):
    mpd_client.clear()
    mpd_client.load("radio")
    mpd_client.play()
    song_data = mpd_client.currentsong()
    feedback_lines[0].set_text("INTERNET RADIO:")
    feedback_lines[1].set_text(
        "{0:3d}: {1}".format(int(song_data["pos"]) + 1, song_data["name"])
    )


def on_track_change(command, mpd_client, _commands, feedback_lines):
    status = mpd_client.status()
    if status["state"] not in ["play", "pause"]:
        line1 = "NOT PLAYING"
        line2 = ""
    else:
        song_count = int(status["playlistlength"])
        song_index = int(status["song"])
        if command == "next":
            line1 = "NEXT >>|"
            song_index = (song_index + 1) % song_count
        elif command == "previous":
            if float(status["elapsed"]) < 5:
                line1 = "PREV |<<"
                song_index = (song_index - 1) % song_count
            else:
                song_index = song_index
                line1 = "|<"
        else:
            assert False, "unknown command {!r}".format(command)
        mpd_client.play(song_index)
        line2 = "{0:3d}/{1:3d}".format(song_index + 1, song_count)

    feedback_lines[0].set_text(line1)
    feedback_lines[1].set_text(line2)


def on_volume_change(command, mpd_client, _commands, feedback_lines):
    delta = {"down": -1, "up": 1}[command]
    # TODO
    # rethink calculation of new_volume
    volume = 2 * min(
        50, max(0, int(float(mpd_client.status()["volume"]) / 2) + delta)
    )
    mpd_client.setvol(volume)
    feedback_lines[0].set_text("VOLUME:")
    feedback_lines[1].set_text("{0:1.2f}".format(volume / 100))


def on_enter(mpd_client, commands, feedback_lines):
    outputs = mpd_client.outputs()
    output_names = [output["outputname"] for output in outputs]
    command, index = select_from_list(
        commands, "Output", output_names, feedback_lines
    )
    output = outputs[index]

    if command == "enter":
        feedback_lines[0].set_text("{0}:".format(output_names[index]))
        feedback_lines[1].set_text(
            "OFF" if bool(int(output["outputenabled"])) else "ON"
        )
        mpd_client.toggleoutput(output["outputid"])

    elif command == "select":
        if bool(int(output["outputenabled"])):
            line2 = "ON TO EXCL. ON"
        else:
            line2 = "OFF TO EXCL. ON"
            mpd_client.toggleoutput(output["outputid"])

        feedback_lines[0].set_text("{0}:".format(output_names[index]))
        feedback_lines[1].set_text(line2)
        for i, output in enumerate(outputs):
            if i != index:
                mpd_client.disableoutput(output["outputid"])


def on_select(mpd_client, commands, feedback_lines):
    filters = []
    tags = mpd_client.tagtypes()
    while True:
        command, index = select_from_list(
            commands, "Tag", tags, feedback_lines
        )

        if command not in ["select", "enter"]:
            return

        selected_tag = tags[index]
        items = mpd_client.list(selected_tag, *filters)
        command, index = select_from_list(
            commands, selected_tag, items, feedback_lines
        )
        filters.append(selected_tag)
        filters.append(items[index])
        if command == "select":
            break
    #
    # FIXME `command` kann im folgenden Code nur den Wert "select" haben!
    #
    if command == "play":
        mpd_client.clear()
        mpd_client.searchadd(*filters)
        mpd_client.play()
        return
    elif command == "select":
        items = mpd_client.list("album", *filters)
        command, index = select_from_list(
            commands, "Album", items, feedback_lines
        )
        selected_album = items[index]
    else:
        return

    if command in ["play", "select"]:
        mpd_client.clear()
        mpd_client.findadd("album", selected_album)
        mpd_client.play()
        return
    elif command == "enter":
        mpd_client.findadd("album", selected_album)
        return


LEDS = namedtuple("LEDS", "play mode1 mode2")


def mainloop(
    commands,
    mpd_client,
    feedback_screen,
    feedback_lines,
    now_playing_lines,
    leds,
):
    commmand_to_func = {
        "power": on_power,
        "play": on_play,
        "mode1": on_mode1,
        "mode2": on_mode2,
        "radio": on_radio,
        "select": on_select,
        "previous": partial(on_track_change, "previous"),
        "next": partial(on_track_change, "next"),
        "enter": on_enter,
        "down": partial(on_volume_change, "down"),
        "up": partial(on_volume_change, "up"),
    }

    while True:
        try:
            command = commands.get(timeout=1)
        except queue.Empty:
            current_song = mpd_client.currentsong()
            status = mpd_client.status()
            feedback_screen.set_priority("hidden")
            feedback_lines[0].set_text("")
            feedback_lines[1].set_text("")
            # TODO
            # andere Taktik für die Titelanzeige überlegen
            try:
                line2 = current_song["artist"]
            except KeyError:
                try:
                    line2 = current_song["name"]
                except KeyError:
                    line1 = datetime.datetime.now().strftime("     %H:%M")
                    line2 = datetime.date.today().strftime("   %Y-%m-%d")
                else:
                    try:
                        line1 = current_song["title"]
                    except KeyError:
                        line1 = "{0:3d}/{1:3d}".format(
                            int(status["song"]) + 1,
                            int(status["playlistlength"]),
                        )
            else:
                line1 = current_song["title"]

            now_playing_lines[0].set_text(line1)
            now_playing_lines[1].set_text(line2)
        else:
            feedback_screen.set_priority("input")
            commmand_to_func[command](mpd_client, commands, feedback_lines)

        leds.play.state = status["state"] == "play"
        leds.mode1.state = int(status["playlistlength"]) > 0
        leds.mode2.state = int(status["random"])


def main():
    wiringpi.wiringPiSetup()
    mpd_client = mpd.MPDClient()
    mpd_client.connect(MPD_HOST, MPD_PORT)

    oled = lcdd(LCDD_HOST, debug=False, charset="iso-8859-1")
    oled.start_session()

    now_playing_screen = oled.add_screen("now_playing")
    now_playing_screen.set_heartbeat("off")
    now_playing_screen.set_priority("foreground")
    now_playing_screen.set_timeout(0)
    now_playing_lines = [
        now_playing_screen.add_scroller_widget(
            "now_playing_line1",
            left=1,
            top=1,
            right=28,
            bottom=1,
            speed=5,
            text="",
        ),
        now_playing_screen.add_scroller_widget(
            "now_playing_line2",
            left=1,
            top=2,
            right=20,
            bottom=2,
            speed=5,
            text="",
        ),
    ]

    feedback_screen = oled.add_screen("feedback")
    feedback_screen.set_heartbeat("off")
    feedback_screen.set_priority("hidden")
    feedback_screen.set_timeout(0)
    feedback_lines = [
        feedback_screen.add_string_widget("feedback_line1", "", x=1, y=1),
        feedback_screen.add_string_widget("feedback_line2", "", x=1, y=2),
    ]

    commands = queue.Queue()

    call_async(
        watch_rotary_encoder,
        PREVIOUS_PIN,
        NEXT_PIN,
        PULLUP,
        partial(rotary_encoder_callback, commands, "previous", "next"),
    )
    call_async(
        watch_rotary_encoder,
        DOWN_PIN,
        UP_PIN,
        PULLUP,
        partial(rotary_encoder_callback, commands, "down", "up"),
    )

    for pin, command in [
        (POWER_PIN, "power"),
        (PLAY_PIN, "play"),
        (MODE1_PIN, "mode1"),
        (MODE2_PIN, "mode2"),
        (RADIO_PIN, "radio"),
        (SELECT_PIN, "select"),
        (ENTER_PIN, "enter"),
    ]:
        call_async(
            watch_switch,
            pin,
            PULLUP,
            0.1,
            partial(switch_callback, commands, command),
        )

    leds = LEDS(LED(PLAY_LED_PIN), LED(MODE1_LED_PIN), LED(MODE2_LED_PIN))

    mainloop(
        commands,
        mpd_client,
        feedback_screen,
        feedback_lines,
        now_playing_lines,
        leds,
    )


if __name__ == "__main__":
    main()
Bei dem Display-Kram wäre es in der Tat sinnvoll das in einer Klasse zusammen zu fassen. Bei den ganzen `on_*`-Funktionen könnte man sich auch eine Klasse vorstellen auf der die definiert sind, denn die bekommen ja alle fast die gleichen Argumente.

Ich sehe auch zwei mögliche Einsätze für das `enum`-Modul. Einmal `IntEnum` für die Pinnummern wo man mit dem `enum.unique()`-Dekorator sicherstellen kann das keine Pinnummer zweimal vergeben wird. Und dann für die Kommandos, damit leichter auffällt wenn man sich mal vertippt und um Kommandos von anderen Zeichenketten unterscheiden zu können.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Benutzeravatar
smutbert
User
Beiträge: 31
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Oh, das ich so viele nicht verwendete Dinge übersehen habe, hätte ich nicht gedacht :oops:
__blackjack__ hat geschrieben: Donnerstag 24. Oktober 2019, 10:34 In `on_select()` [...]

Ab nach der Schleife habe ich ein Problem mit der Funktion – die ``while True:``-Schleife wird nur durch ``break`` verlassen wenn `command` den Wert "select" hat. Nach der Schleife steht dann aber Code der `command` gegen "play" und "enter" prüft, was ja nie der Fall sein kann‽[...]
Das stimmt ausnahmsweise nicht.

Das Verwirrende an dem Punkt ist glaube ich, dass command im ersten if-Block im Falle von "select" oder "enter" einen neuen Wert zugewiesen bekommt, der dann in der nächsten if-Abfrage geprüft wird.
und zwar zuerst darauf ob in der Schleife weitere Kriterien hinzugefügt werden sollen ("enter") oder ob etwas anderes passieren soll. Nachdem „etwas anderes“ aber auf jeden Fall nur einmal ausgeführt werden soll, breche ich die Schleife zuerst ab und mache außerhalb mit einer neuen if-elif-Abfrage weiter.

(Das ist ganz typisch eine von den Situationen, in denen ich nicht weiß wie man es richtig macht.
Ungünstigerweise jongliere ich nebenbei auch noch ein bisschen mit den Funktionen der Taster um herauszufinden wie die Bedienung am flüssigsten und intuitivsten ist. Unheimlich motivierend ist dabei die Tatsache, dass sich mein Gerät meiner voreingenommenen Meinung nach jetzt schon viel flüssiger und angenehmer bedient als übliche Hifi-Geräte oder mpd-Clients am Handy oder Tablet.)


Den Rest muss ich wieder erst einmal verdauen, nur die PIN-Benennung für die Drehimpulsgeber werde ich nicht übernehmen, wenn du nicht darauf bestehst (es haben beide Pins jeweils gleich viel mit up/next wie mit previous/down zu tun).

Danke!
Benutzeravatar
__blackjack__
User
Beiträge: 13080
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@smutbert: Okay, dann muss die Bedingung für das ``break`` doch ``command != "enter"`` lauten.

Aus dem letzten ``if``/``elif`` können die beiden ``return`` gestrichen werden, denn das passiert da sowieso.

Wenn man den Code dann in den ``elif``-Zweig darüber verschiebt, kann dort im ersten Zweig das ``return`` weg und das ``else: return`` am Ende kann entfallen.

Wenn ich nicht wieder einen Fehler gemacht habe, könnte das dann so aussehen:

Code: Alles auswählen

def on_select(mpd_client, commands, feedback_lines):
    filters = []
    tags = mpd_client.tagtypes()
    while True:
        command, index = select_from_list(
            commands, "Tag", tags, feedback_lines
        )

        if command not in ["select", "enter"]:
            return

        selected_tag = tags[index]
        items = mpd_client.list(selected_tag, *filters)
        command, index = select_from_list(
            commands, selected_tag, items, feedback_lines
        )
        filters.append(selected_tag)
        filters.append(items[index])
        if command != "enter":
            break

    if command == "play":
        mpd_client.clear()
        mpd_client.searchadd(*filters)
        mpd_client.play()
    elif command == "select":
        items = mpd_client.list("album", *filters)
        command, index = select_from_list(
            commands, "Album", items, feedback_lines
        )
        selected_album = items[index]
        if command in ["play", "select"]:
            mpd_client.clear()
            mpd_client.findadd("album", selected_album)
            mpd_client.play()
        elif command == "enter":
            mpd_client.findadd("album", selected_album)
Die vielen Threads für die Überwachung der einzelnen Pins müsste man übrigens relativ einfach durch *einen* ersetzen können wenn man aus den Funktionen die das jetzt machen Objekte macht wo der Initialisierungsteil in die `__init__()` wandert und der Code für *einen* Schleifendurchlauf in eine Methode. Dann kann man diese ganzen Objekte einer Funktion übergeben die diese Methode von jedem Objekt regelmässig aufruft, und dann nur diese eine Funktion in einem Thread laufen lassen. Statt Klassen könnte man die bisherigen Funktionen auch zu Generatorfunktionen machen und in der Funktion die das an antreibt einfach immer die `next()`-Funktion auf jedem Generator aufrufen. Dann sind die notwendigen Änderungen nicht wirklich gross. Im Grunde müsste man nur statt dem `sleep()`-Aufruf ein ``yield`` schreiben.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Benutzeravatar
smutbert
User
Beiträge: 31
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Jetzt bin ich noch auf eine Variante gekommen, die zwar nicht kürzer, aber weniger verschachtelt ist und (fast) dasselbe tut. (Ich bin aber noch nicht dazu gekommen irgendeine der neuen Varianten auszuprobieren.)

Code: Alles auswählen

def on_select(mpd_client, commands, feedback_lines):
	filters = []
	tags = mpd_client.tagtypes()
	while True:
		command, index = select_from_list(
			commands, "Tag", tags, feedback_lines
		)

		if command not in ["select", "enter"]:
			return

		selected_tag = tags[index]
		items = mpd_client.list(selected_tag, *filters)
		command, index = select_from_list(
			commands, selected_tag, items, feedback_lines
		)
		filters.append(selected_tag)
		filters.append(items[index])
		if command != "enter":
			break

	if command == "select":
		items = mpd_client.list("album", *filters)
		command, index = select_from_list(
			commands, "Album", items, feedback_lines
		)
		filters[:] = []
		filters.append("Album")
		filters.append(items[index])	

	if command in ["play", "select"]:
		mpd_client.clear()
		mpd_client.searchadd(*filters)
		mpd_client.play()
	elif command == "enter":
		mpd_client.findadd("album", selected_album)
Die namedtuple durchblicke ich nach anfänglichen Schwiergkeiten wenigstens ansatzweise, aber das mit dem enum-Modul muss ich mir noch in Ruhe ansehen.

Schön langsam muss ich mir wohl auch Gedanken wegen eines Versionsverwaltungssystems machen - ohne wird es langsam unübersichtlich.

In das Beseitigen bzw. Reduzieren der Threads werde ich nach reiflicher Überlegung keine Arbeit investieren. Ich probiere nämlich bereits nebenbei mit einem Raspberry Pi herum, mit dem ich keine Threads bräuchte, weil das alles gpiozero bzw. pigpio im Hintergrund erledigen.


Danke und lg
(ich melde mich aber sowieso noch einmal, selbst in dem sehr unwahrscheinlichen Fall, dass ich keine Hilfe mehr brauchen sollte)
Benutzeravatar
__blackjack__
User
Beiträge: 13080
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@smutbert: Hier fallen mir folgende Zeilen auf:

Code: Alles auswählen

        filters[:] = []
        filters.append("Album")
        filters.append(items[index])
Das ist ungewöhnlich die vorhandene Liste an der Stelle zu ändern. Diese drei Zeilen wären einfacher nur eine Zuweisung einer neuen Liste:

Code: Alles auswählen

        filters = ["Album", items[index]]
Selbst wenn man die vorhandene Liste leeren und wiederverwenden wollte, ist es eine etwas ungewöhnliche Variante den gesamten Inhalt der Liste durch den Inhalt einer temporär erzeugten leeren Liste zu ersetzen. Da wäre früher ``del filters[:]`` der offensichtliche Weg für gewesen, ab Python 3.3 dann aber die `clear()`-Methode.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Benutzeravatar
smutbert
User
Beiträge: 31
Registriert: Samstag 5. Juli 2014, 17:22
Wohnort: Graz

Hallo,

nach langer Zeit habe ich mich wieder mit meinem Projekt beschäftigt und außerdem habe ich sowieso versprochen mich noch zu melden. Jetzt habe ich endlich den geplanten Umstieg vom Cubietruck auf den Raspberry Pi gemacht. Dabei habe ich einiges ändern müssen und versucht die Vorteile der guten Unterstützung des Raspberry Pi zu nutzen.

Code: Alles auswählen

#!/usr/bin/env python3

import math
import queue
import threading
import evdev
import selectors
import gpiozero

from functools import partial
from mpdlcd.vendor.lcdproc.server import Server as lcdd
import musicpd as mpd

MPD_HOST = "localhost"
MPD_PORT = 6600
LCDD_HOST = "localhost"

POWER_LED_PIN = 26
LED_PINS = {
    'Play': 12,
    'Mode': 13,
    'Debug': 21
    }

INPUT_DEVICES = [
        '/dev/input/event0',
        '/dev/input/event1',
        '/dev/input/event2',
        '/dev/input/event3',
        '/dev/input/event4',
        '/dev/input/event5',
        '/dev/input/event7',
        '/dev/input/event8'
        ]

INPUT_EVENTS = {
        59: { 1: 'play'},
        60: { 1: 'mode1'},
        61: { 1: 'mode2'},
        62: { 1: 'radio'},
        63: { 1: 'select'},
        64: { 1: 'enter'},
        5: { 1: 'right', -1: 'left'},
        6: { 1: 'up', -1: 'down'}
        }


def call_async(function, *arguments):
    thread = threading.Thread(target=function, args=arguments)
    thread.daemon = True
    thread.start()
    return thread


def collect_input(devices, commands):
    selector = selectors.DefaultSelector()
    for i in devices:
        selector.register(evdev.InputDevice(i), selectors.EVENT_READ)
    while True:
        for key, mask in selector.select():
            device = key.fileobj
            for event in device.read():
                input_to_commands(event, commands)


def input_to_commands(event, commands):
    # event.type -> device
    # event.value -> press=01, rotary +/-1
    # code -> keycode, axis (rotary)
    try:
        commands.put(INPUT_EVENTS[event.code][event.value])
    except KeyError:
        return


def on_play(mpd_client, _commands, display_lines):
    now_status = mpd_client.status()
    line2 = ''
    if int(now_status['playlistlength']) == 0:
        line1 = 'EMPTY QUEUE'
    elif mpd_client.status()['state'] == 'play':
        mpd_client.pause()
        line1 = '||'
    else:
        mpd_client.play()
        line1 = '>'
    display_lines[1].set_text(line1)
    display_lines[2].set_text(line2)


def on_mode1(mpd_client, _commands, display_lines):
    mpd_client.clear()
    display_lines[1].set_text('CLEAR QUEUE')


def on_mode2(mpd_client, _commands, display_lines):
    random = (1 - int(mpd_client.status()['random']))
    mpd_client.random(random)
    line1 = 'RANDOM'
    if random == 0:
        line2 = 'OFF'
    else:
        line2 = 'ON'
    display_lines[1].set_text(line1)
    display_lines[1].set_text(line2)


def on_radio(mpd_client, _commands, display_lines):
    mpd_client.clear()
    mpd_client.load('radio')
    mpd_client.play()
    now_song = mpd_client.currentsong()
    line1 = 'INTERNET RADIO'
    line2 = '{0:3d}: {1}'.format(int(now_song['pos']) + 1, now_song['name'])
    display_lines[1].set_text(line1)
    display_lines[2].set_text(line2)


def on_track_change(command, mpd_client, commands, display_lines):
    now_status = mpd_client.status()
    if int(now_status['playlistlength']) == 0:
        on_select(command, mpd_client, commands, display_lines)
        return
    now_queue = mpd_client.playlistid()
    now_queue_length = len(now_queue)
    now_queue_titles = []
    for i in range(now_queue_length):
        try:
            now_queue_titles.append(now_queue[i]['title'])
        except KeyError:
            now_queue_titles.append(now_queue[i]['name'])
    try:
        start = int(now_status['song'])
    except:
        start = 0

    command, index = select_from_list(mpd_client, commands, 'Title', now_queue_titles, display_lines, index=start)
    if command == 'select' or command == 'enter':
        selected_position = now_queue[index]['pos'] 
        mpd_client.play(selected_position)


def on_volume_change(command, mpd_client, _commands, display_lines):
    delta = {'down': -1, 'up': 1}[command]
    mpd_client.volume(delta)
    display_lines[1].set_text('VOLUME')
    display_lines[2].set_text('{0:1.2f}'.format(int(mpd_client.status()['volume']) / 100))


def on_enter(mpd_client, commands, display_lines):
    outputs = mpd_client.outputs()
    number_of_outputs = len(outputs)
    items = []
    for i in range(number_of_outputs):
        items.append(outputs[i]['outputname'])

    command, index = select_from_list(mpd_client, commands, 'Output', items, display_lines)
    if command == 'enter':
        line1 = '{0}'.format(outputs[index]['outputname'])
        if int(outputs[index]['outputenabled']) == 0:
            line2 = 'ON'
        else:
            line2 = 'OFF'
        display_lines[1].set_text(line1)
        display_lines[2].set_text(line2)
        mpd_client.toggleoutput(outputs[index]['outputid'])
    elif command == 'select':
        line1 = '{0}'.format(outputs[index]['outputname'])
        line2 = 'EXCLUSIVE ON'
        display_lines[1].set_text(line1)
        display_lines[2].set_text(line2)
        mpd_client.enableoutput(outputs[index]['outputid'])
        for i in range(number_of_outputs):
            if i != index:
                mpd_client.disableoutput(outputs[i]['outputid'])


def on_select(mpd_client, commands, display_lines):
    filters = []
    tags = mpd_client.tagtypes()
    while True:
        command, index = select_from_list(mpd_client, commands, 'Tag', tags, display_lines)
        selected_tag = tags[index]
        if command == 'select' or command == 'enter':
            items = mpd_client.list(selected_tag, *filters)
            command, index = select_from_list(mpd_client, commands, selected_tag, items, display_lines)
            filters.append(selected_tag)
            filters.append(items[index])
        else:
            return
        if command == 'enter':
            continue
        else:
            break

    if command == 'play':
        mpd_client.clear()
        mpd_client.searchadd(*filters)
        mpd_client.play()
        return
    elif command == 'select':
        items = mpd_client.list('album', *filters)
        command, index = select_from_list(mpd_client, commands, 'Album', items, display_lines)
        selected_album = items[index]
    else:
        return

    if command == 'play' or command == 'select':
        mpd_client.clear()
        mpd_client.findadd('album', selected_album)
        mpd_client.play()
        return
    elif command == 'enter':
        mpd_client.findadd('album', selected_album)
        return


def select_from_list(mpd_client, commands, name, items, display_lines, index=0):
    number_of_items = len(items)
    number_padding = int(math.log10(number_of_items)+1)
    if number_of_items > 10:
        big_step = int(number_of_items / 20)
    else:
        big_step = 1
    while True:
        display_lines[1].set_text('# {1:{width}}/{2:{width}} {0}'.format(name, index + 1, number_of_items, width=number_padding))
        display_lines[2].set_text(items[index])
        command = commands.get()
        if command == "right":
            index = (index + 1) % number_of_items
        elif command == "left":
            index = (index - 1) % number_of_items
        elif command == 'up':
            index = (index + big_step) % number_of_items
        elif command == 'down':
            index = (index - big_step) % number_of_items
        else:
            display_lines[1].set_text('...')
            display_lines[2].set_text('')
            return command, index


def mainloop(commands, mpd_client, display, display_lines, leds):
    previous_play_led_state = 'unknown'

    commmand_to_func = {
        "play": on_play,
        "mode1": on_mode1,
        "mode2": on_mode2,
        "radio": on_radio,

        "select": on_select,
        "left": partial(on_track_change, "left"),
        "right": partial(on_track_change, "right"),

        "enter": on_enter,
        "down": partial(on_volume_change, "down"),
        "up": partial(on_volume_change, "up"),
    }

    while True:
        try:
            command = commands.get(timeout=1)
        except queue.Empty:
            display.set_priority('hidden')
            for i in display_lines.keys():
                display_lines[i].set_text('')
            leds['Mode'].off()
            now_status = mpd_client.status()

            if now_status['state'] == 'play':
                leds['Play'].on()
                previous_play_led_state = 'play'
            elif now_status['state'] == 'pause' and previous_play_led_state != 'pause':
                # pulse() stört jedes Mal das Pulsieren
                previous_play_led_state = 'pause'
                leds['Play'].pulse()
            elif now_status['state'] == 'stop':
                leds['Play'].off()
                previous_play_led_state = 'stop'

            if int(now_status['playlistlength']) > 0:
                leds['Mode'].on()
            else:
                leds['Mode'].off()
            if int(now_status['random']) == 1:
                leds['Debug'].blink()
            else:
                leds['Debug'].off()
        else:
            leds['Mode'].pulse()
            leds['Play'].off()
            display.set_priority('input')
            commmand_to_func[command](mpd_client, commands, display_lines)    


def main():
    mpd_client = mpd.MPDClient()
    mpd_client.connect(MPD_HOST, MPD_PORT)

    oled = lcdd(LCDD_HOST, debug=False, charset='iso-8859-1')
    oled.start_session()

    display = oled.add_screen('orpheus')
    display.set_heartbeat('off')
    display.set_priority('hidden')
    display.set_timeout(0)
    display_lines = {}
    for i in 1, 2:
        display_lines[i] = display.add_string_widget(f'line{i}', '', x=1, y=i)

    commands = queue.Queue()

    call_async(collect_input, INPUT_DEVICES, commands)

    leds = { 'Power': gpiozero.LED(POWER_LED_PIN, active_high=False, initial_value=True) }
    for i in LED_PINS.keys():
        leds[i] = gpiozero.PWMLED(LED_PINS[i])

    # TODO: except einschraenken oder entfernen, wenn keine Fehler mehr auftreten
    try:
        mainloop(commands, mpd_client, display, display_lines, leds)
    except:
        leds['Power'].off()


if __name__ == '__main__':
    main()
Bis auf seltene Abstürze, deren Ursachen ich nach und nach beseitige, läuft das Programm einwandfrei und hat fast alle Features, die ich mir gewünscht habe.

Für weitere sachdienliche Hinweise (zB ob der Verwendung von python-evdev oder weil ich dictionaries sehr gerne verwende?) bin ich natürlich immer noch dankbar und vielleicht ist jemandem fad und will mir näherbringen wie eine Klasse für die on_*-Funktionen aussehehen könnte (was wollte ich damit für Objekte erstellen?).


lg smutbert
Antworten