Langen Tastendruck einer Fernbedienung erkennen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
unique24
User
Beiträge: 69
Registriert: Donnerstag 5. Juli 2018, 14:51

Hallo,

von einer Funkfernbedienung über USB bekomme ich die Tastendrücke.
Leider erhalte ich aber beim drücken nur einen Trigger. Drücke/Loslassen also nicht.

mein Code soweit um ein UDP Broadcast zu versenden wenn Taste 1 gedrückt wird:

Code: Alles auswählen

import evdev
import socket

UDP_IP = '192.168.0.255'
UDP_PORT = 20001

device = evdev.InputDevice('/dev/input/event0')
print(device)

for event in device.read_loop():
   if event.type == evdev.ecodes.EV_KEY:
#      print(evdev.categorize(event))
#      print('sec ' + str(event.sec))
#      print('usec ' + str(event.usec))
#      print('type ' + str(event.type))
#      print('code ' + str(event.code))
#      print('value ' + str(event.value))
#      print('timestamp ' + str(event.timestamp()))
      if (event.code == 2 and event.value == 1):
           sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
           sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
           sock.sendto(str.encode('KEY_1'), (UDP_IP, UDP_PORT))
Nun bräuchte ich aber folgendes:
Das senden um 400ms verzögern. Wenn die Taste nicht nochmal empfangen wird, das Telegramm senden, sonst ein anderes.

Danke!
unique24
User
Beiträge: 69
Registriert: Donnerstag 5. Juli 2018, 14:51

Soweit habe ich kurzen und langen Tastendruck fertig.

Aber ich sende den Kurzen Tastendruck und er soll 0.7 Sekunden verzögert werden.
Wenn der lange in der Zwischenzeit kommt, soll der kurze ohne senden abgebrochen werden.

Aber das klappt noch nicht:

Code: Alles auswählen

import evdev
import socket
import time
import threading

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)

UDP_IP = '192.168.0.255'
UDP_PORT = 20001

device = evdev.InputDevice('/dev/input/event0')
print(device)


def send_command(key='empty', duration='short', delay=0.4):
    t = threading.currentThread()
    print(key , duration)
    while getattr(t, "do_run", True):
        time.sleep(delay)
        sock.sendto(str.encode(key + '@' + duration), (UDP_IP, UDP_PORT))
        print('command sended')
        t.do_run = False
    print('thread canceled')

def listen():
    lastTimeStamp = 0
    lastKey = -1
    longPush = 0
    for event in device.read_loop():
        if event.type == evdev.ecodes.EV_KEY:
            if (event.code == 2 and event.value == 1):
                delay = event.timestamp() - lastTimeStamp
                if (delay < 0.55 and delay > 0.05 and lastKey == event.code):
                    print ('warte auf langen Tastendruck')
                    longPush = 0
                elif (delay < 0.05 and lastKey == event.code and longPush == 0):
                    print ('langer Tastendruck')
                    print ('cancel Thread')
                    t1.do_run = False
                    t1.join()
                    longPush = 1
                    send_command('KEY_1', 'long', 0)
                elif (delay > 0.56 and lastKey == event.code):
                    print ('kurzer Tastendruck')
                    t1 = threading.Thread(target=send_command, args=('KEY_1','short', 0.7))
                    t1.start()
                    longPush = 0
                lastKey = event.code
                lastTimeStamp = event.timestamp()

listen()
Benutzeravatar
__blackjack__
User
Beiträge: 14052
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@unique24: Auf Modulebene sollte nur Code stehen der Konstanten, Funktionen, und Klassen definiert. Das Hauptprogramm steht üblicherweise in einer Funktion die `main()` heisst. Daraus folgt das `send_command()` das Socket-Objekt als Argument braucht und `listen()` das Socket-Objekt und das `InputDevice`-Objekt.

Namen werden in Python klein_mit_unterstrich geschrieben. Ausnahmen sind Konstanten (KOMPLETT_GROSS) und Klassen (MixedCase).

Für Wahrheitswerte hat Python einen eigenen Datentyp (`bool`) mit den Werten `True` und `False`. Da sollte man nicht die Zahlen 0 und 1 für missbrauchen. Und man vergleicht dann auch nicht mit `True` und `False` sondern nimmt den Wert direkt oder dessen Gegenteil mit ``not``.

Um die Bedingung von ``if`` gehören keine unnötigen Klammern.

Die ersten beiden ``if``\s in `listen()` lassen sich zusammenfassen.

Der Teiltest ob `event.code` dem letzten Code entspricht ist in allen drei Bedingungen enthalten, das kann man also in ein eigenes, übergeordnetes ``if`` heraus ziehen.

Ob ein Wert zwischen zwei anderen liegt lässt sich mit verketterten Vergleichen ausdrücken was der mathematischen Schreibweise entspricht und leichter lesbar ist.

Der letzte Test scheint mir falsch zu sein weil Dein Code etwas macht wenn `delay` <0,55 ist und er macht etwas wenn `delay` >0,56 ist, aber er macht nichts wenn `delay` *zwischen* 0,55 und 0,56 liegt. Soll das so sein, und wenn ja warum? Wenn man diese Lücke nicht haben will, dann braucht man am Ende auch kein ``elif`` denn dann würde man mit ``else`` einfach den Rest abdecken.

`t1` ist kein guter Name. `t` ist nichtssagend. Man nummeriert keine Namen. Die 1 macht auch gar keinen Sinn, denn es gibt ja gar keine 2.

Bei Threads will man eigentlich fast immer einen „daemon“-Thread erstellen.

Es ist unhöflich und verwirrend einfach so dynamisch Attribute auf fremden Objekten zu setzen. Also `do_run` auf `Thread`-Objekten. Wenn man ein Flag thread-sicher über Threadgrenzen hinweg kommunizieren möchte, verwendet man ein `Event`-Objekt. Wenn man so ein Objekt `send_command()` übergibt, wird ein bisschen deutlicher wie verquer das ist diese Funktion mal synchron und mal asynchron aufzurufen, weil man in beiden Fällen ein Objekt zur Kommunikation über Threadgrenzen hinweg übergibt, das aber in einem Fall gar nicht gemacht werden muss.

Man ruft Methoden nicht auf der Klasse auf und übergibt ein Exemplar sondern ruft die Methode auf dem Exemplar auf. Also nicht ``str.encode(something)`` sondern ``something.encode()``.

Mit einem `Event` sähe `send_command()` so aus:

Code: Alles auswählen

def send_command(cancel_event, udp_socket, key, duration, delay):
    print(key, duration)
    while not cancel_event.is_set():
        time.sleep(delay)
        udp_socket.sendto(f"{key}@{duration}".encode(), (UDP_IP, UDP_PORT))
        print("command sent")
        cancel_event.set()
    print("thread canceled")
Die ``while``-Schleife in `send_command()` ist keine Schleife weil die genau einmal durchlaufen wird, also gar nichts wiederholt wird und auch das sie gar nicht durchlaufen wird kann eigentlich nicht passieren. Die kann also weg, womit dann aber auch das `Event` gar nicht mehr verwendet wird. Und die Ausgabe grundsätzlich am Ende macht keinen Sinn weil die ja auch gemacht wird wenn der Thread nicht abgebrochen wurde, selbst wenn das passieren könnte.

Was dann übrig bliebe wäre das hier:

Code: Alles auswählen

#!/usr/bin/env python3
import socket
import threading
import time

import evdev

UDP_IP = "192.168.0.255"
UDP_PORT = 20001
REMOTE_DEVICE_FILENAME = "/dev/input/event0"


def send_command(udp_socket, key, duration, delay):
    print(key, duration)
    time.sleep(delay)
    udp_socket.sendto(f"{key}@{duration}".encode(), (UDP_IP, UDP_PORT))
    print("command sent")
    print("thread ended")


def listen(udp_socket, device):
    last_timestamp = 0
    last_key_code = None
    is_long_push = False
    for event in device.read_loop():
        if (
            event.type == evdev.ecodes.EV_KEY
            and event.code == 2
            and event.value == 1
        ):
            if event.code == last_key_code:
                delay = event.timestamp() - last_timestamp
                if 0.05 <= delay < 0.55:
                    print("warte auf langen Tastendruck")
                    is_long_push = False
                elif delay < 0.05 and not is_long_push:
                    print("langer Tastendruck")
                    print("cancel Thread")
                    thread.join()
                    is_long_push = True
                    send_command(
                        udp_socket, "KEY_1", "long", 0
                    )
                else:
                    print("kurzer Tastendruck")
                    thread = threading.Thread(
                        target=send_command,
                        args=(udp_socket, "KEY_1", "short", 0.7),
                        daemon=True,
                    )
                    thread.start()
                    is_long_push = False
            last_key_code = event.code
            last_timestamp = event.timestamp()


def main():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    device = evdev.InputDevice(REMOTE_DEVICE_FILENAME)
    print(device)
    listen(udp_socket, device)


if __name__ == "__main__":
    main()
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
unique24
User
Beiträge: 69
Registriert: Donnerstag 5. Juli 2018, 14:51

Hallo,

oh danke für die ausführliche korrektur.

Ich benötige am Ende die prüfung auf >0.56, da sonst der kurze Tastendruck ausgeführt wird, wenn der letzte unter 0.05s kommt und noch nicht ein is_long_push.

Das wäre die Ausgabe, wenn ich auf der Fernbedienung die Taste 1 drücke und halte:
15.256728887557983s from 2
0.5280160903930664s from 2
0.04002690315246582s from 2
0.03999805450439453s from 2
0.0400090217590332s from 2
0.03999900817871094s from 2

Ich drückte nach 15s nochmal die Taste 1 und halte diese:
nach 0.528s kommt nochmal die selbe Taste
danach alle 0.04s

Der Code sollte wie folgt arbeite:
1: nach "Ruhe" kommt bei 15.256 taste 1 => Short Pressed soll nach 0.7s gesendet werden
2: nach 0.528 kommt nochmal die selbe Taste => Waren auf langen Tastendruck (könnte ja die Taste schnell 2x hintereinander drücken)
3: nach 0.04 kommt wieder die selbe Taste => eindeutig lang gedrückt => Abbruch von short Pressed und long pressed senden
4: warte mind. 0.55s bis die selbe Taste wieder frei gegeben ist und ein kurzer Tastendruck detektiert wird

Das würde damit nun klappen:

Code: Alles auswählen

#!/usr/bin/env python3
import socket
import threading
import time

import evdev

UDP_IP = "192.168.0.255"
UDP_PORT = 20001
REMOTE_DEVICE_FILENAME = "/dev/input/event0"


def send_command(udp_socket, key, duration, delay):
#    print(key, duration)
    time.sleep(delay)
    udp_socket.sendto(f"{key}@{duration}".encode(), (UDP_IP, UDP_PORT))
    print(f"sendet command: {key} with duration {duration}")

def listen(udp_socket, device):
    last_timestamp = 0
    last_key_code = None
    is_long_push = False
    for event in device.read_loop():
        if (
            event.type == evdev.ecodes.EV_KEY
            and event.code == 2
            and event.value == 1
        ):
            if event.code == last_key_code:
                delay = event.timestamp() - last_timestamp
                if 0.05 <= delay < 0.55:
                    print("warte auf langen Tastendruck")
                    is_long_push = False
                elif delay < 0.05 and not is_long_push:
                    print("langer Tastendruck")
                    print("cancel Thread")
                    thread.join()
                    is_long_push = True
                    send_command(
                        udp_socket, "KEY_1", "long", 0
                    )
                elif 0.55 <= delay:
                    print("kurzer Tastendruck")
                    thread = threading.Thread(
                        target=send_command,
                        args=(udp_socket, "KEY_1", "short", 0.7),
                        daemon=True,
                    )
                    thread.start()
                    is_long_push = False
            last_key_code = event.code
            last_timestamp = event.timestamp()


def main():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    device = evdev.InputDevice(REMOTE_DEVICE_FILENAME)
    print(device)
    listen(udp_socket, device)


if __name__ == "__main__":
    main()
aber:

Nach wie vor wird der kurze Tastendruck bei Zeile 46 nicht abgebrochen.

Könntest du mir da bitte nocheinmal helfen?

Danke!
unique24
User
Beiträge: 69
Registriert: Donnerstag 5. Juli 2018, 14:51

edit:

wenn ich eine Variable global setzen würde, klappt das bei mir:

Code: Alles auswählen

#!/usr/bin/env python3
import socket
import threading
import time

import evdev

UDP_IP = "192.168.0.255"
UDP_PORT = 20001
REMOTE_DEVICE_FILENAME = "/dev/input/event0"
IS_SKIP_SHORT_PRESS = False


def send_command(udp_socket, key, duration, delay):
#    print(key, duration)
    global IS_SKIP_SHORT_PRESS
    time.sleep(delay)
    if not IS_SKIP_SHORT_PRESS:
        udp_socket.sendto(f"{key}@{duration}".encode(), (UDP_IP, UDP_PORT))
        print(f"sendet command: {key} with duration {duration}")
    IS_SKIP_SHORT_PRESS = False

def listen(udp_socket, device):
    global IS_SKIP_SHORT_PRESS
    last_timestamp = 0
    last_key_code = None
    is_long_push = False
    for event in device.read_loop():
        if (
            event.type == evdev.ecodes.EV_KEY
            and event.code == 2
            and event.value == 1
        ):
            if event.code == last_key_code:
                delay = event.timestamp() - last_timestamp
                if 0.05 <= delay < 0.55:
                    print("warte auf langen Tastendruck")
                    is_long_push = False
                elif delay < 0.05 and not is_long_push:
                    print("langer Tastendruck")
                    # thread.join()
                    is_long_push = True
                    send_command(
                        udp_socket, "KEY_1", "long", 0
                    )
                    IS_SKIP_SHORT_PRESS = True
                elif 0.55 <= delay:
                    print("kurzer Tastendruck")
                    thread = threading.Thread(
                        target=send_command,
                        args=(udp_socket, "KEY_1", "short", 0.7),
                        daemon=True,
                    )
                    thread.start()
                    is_long_push = False
            last_key_code = event.code
            last_timestamp = event.timestamp()


def main():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    device = evdev.InputDevice(REMOTE_DEVICE_FILENAME)
    print(device)
    listen(udp_socket, device)


if __name__ == "__main__":
    main()
Benutzeravatar
__blackjack__
User
Beiträge: 14052
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@unique24: Iiih, ``global`` *und* Threads — die Kombination für die besten Kopfschmerzen. 😱

Das ist IMHO auch alles ziemlich unübersichtlich und nicht leicht verständlich ausgedrückt. Da `evdev` anscheinend kein lesen von Ereignissen mit Zeitüberschreitung anbietet, würde ich *einen* langlaufenden Thread und eine Queue *dafür* einsetzen. Denn von einer Queue kann man Elemente mit Zeitüberschreitung abfragen.
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
Antworten