gpio bei Programmstart schon High (OPi.GPIO threading)

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
pumuckll
User
Beiträge: 15
Registriert: Donnerstag 30. August 2018, 17:45

Freitag 25. Oktober 2019, 16:30

Hallo,

Verzeiht mir dass ich wieder Hilfe suche, weil ich Code schreibe von dem ich zuwenig Ahnung habe.

Um Gpio´s von einem Orangepi Zero auszulessen nutze ich die Bibliothek OPi.GPIO, funktioniert soweit ganz gut.

Ich erfasse die steigende flanke , mit "Threaded Callbacks", zweier Eingänge, der erste wird von einem Bewegungsmelder Betätigt der zweite von einem Reedkontakt.

Mein Problem bei Programm start ist der Reedkontakt schon auf High und die steigende flanke wird nur erfasst wenn der zustand einmal "Low" ist.

Da das irgendwann passieren kann , habe ich bis dahin eine Falsche Ausgabe und ich finde einfach kein Workaround das Funktioniert.

Ich habs mit einem Thread probiert aber dann funktioniert mein main nicht richtig.

hier mal der Code gekürzt auf das wesentliche:

Code: Alles auswählen

#!/usr/bin/env python3
import socket
import os
import datetime
from time import sleep
import OPi.GPIO as GPIO
import threading

BUFFER_SIZE = 8192
SERVER = ("", 8008)

TARGET_HOST = ("10.10.10.4", 8007)


udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(SERVER)

GPIO.cleanup()
GPIO.setmode(GPIO.BOARD)
GPIO.setwarnings(False)

GPIO.setup(8, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
GPIO.setup(10, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

def eingang_1(channel):

	while( GPIO.input(8) == 1):
		GPIO.output(23, True)
		sleep(2)
		udp_socket.sendto(b"eingang_1", TARGET_HOST)
	else:
		GPIO.output(23, False)

def eingang_2(channel):
	while( GPIO.input(10) == 1):
		udp_socket.sendto(b"eingang_2", TARGET_HOST)
		sleep(2)


GPIO.add_event_detect(8, GPIO.RISING)
GPIO.add_event_callback(8, callback=eingang_1, bouncetime=50)

GPIO.add_event_detect(10, GPIO.RISING)
GPIO.add_event_callback(10, callback=eingang_2, bouncetime=50)


def main():
	eingang2_startup = threading.Thread(target=eingang_2(10))
	eingang2_startup.start()
	print("UDPServer Waiting for client on port", SERVER)
	while True:
	
		data, addr = udp_socket.recvfrom(BUFFER_SIZE)
		print(data)
		try:
			options[data](udp_socket, TARGET_HOST)
		except KeyError:
			print("KeyError")
			print(data)
			udp_socket.sendto(b"KeyError",TARGET_HOST)


if __name__ == '__main__':
	main()

Ich weiss man sollte keine globalen Variablen verwenden, aber dann funktioniert das senden mit udp nicht richtig.

Mein Versuch war einen Thread zu starten der die Funktion eingang_2 einmal ausführt.

die Funktion wird richtig ausgeführt, mein Programm stoppt aber bei den Thread und ich komm nicht weiter.

Code: Alles auswählen

	print("UDPServer Waiting for client on port", SERVER)
wird erst angezeigt wenn der Thread beendet ist.


wie könnte ich das lösen, bzw was mache ich falsch?

grüsse Gerhard
Sirius3
User
Beiträge: 10900
Registriert: Sonntag 21. Oktober 2012, 17:20

Freitag 25. Oktober 2019, 17:00

Eingerückt wird immer mit 4 Leerzeichen pro Ebene.
Um die Bedingung der while-Schleife gehört keine Klammer. Ein else-Block nach einer while-Schleife ohne `break` ist unsinnig. while-Schleifen im Callback sind sowieso falsch, weil die Routine darin möglichst kurz sein soll.
Bei Threads muß target ein Funktionsobjekt sein und nicht der Rückgabewert einer Funktion.

GPIO.cleanup sollte am Ende eines Programms aufgerufen werden und nicht am Anfang. Warnings sind dazu da, dass man sie behebt und nicht ignoriert.

Wenn ich alles richtig verstanden habe, dann sind zwei Threads, für jeden Eingang einen, das einfachste:

Code: Alles auswählen

#!/usr/bin/env python3
import socket
from time import sleep
from OPi import GPIO
import threading

BUFFER_SIZE = 8192
SERVER = ("", 8008)
TARGET_HOST = ("10.10.10.4", 8007)

def initialize():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(SERVER)
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(8, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    GPIO.setup(10, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    return udp_socket

def loop_eingang1(udp_socket):
    edge = threading.Event()
    if GPIO.input(8):
        edge.set()
    GPIO.add_event_detect(8, GPIO.RISING, callback=lambda c:edge.set(), bouncetime=50)
    while True:
        edge.wait()
        edge.clear()
        GPIO.output(23, True)
        while GPIO.input(8):
            sleep(2)
            udp_socket.sendto(b"eingang_1", TARGET_HOST)
        GPIO.output(23, False)

def loop_eingang2(udp_socket):
    edge = threading.Event()
    if GPIO.input(10):
        edge.set()
    GPIO.add_event_detect(10, GPIO.RISING, callback=lambda c:edge.set(), bouncetime=50)
    while True:
        edge.wait()
        edge.clear()
        while GPIO.input(10):
            udp_socket.sendto(b"eingang_2", TARGET_HOST)
            sleep(2)


def main():
    try:
        udp_socket = initialize()
	    eingang1 = threading.Thread(target=loop_eingang1, args=(udp_socket,))
        eingang1.daemon = True
        eingang1.start()
	    eingang2 = threading.Thread(target=loop_eingang2, args=(udp_socket,))
        eingang2.daemon = True
        eingang2.start()
        print("UDPServer Waiting for client on port", SERVER)
        while True:
            data, addr = udp_socket.recvfrom(BUFFER_SIZE)
            print(data)
            try:
                options[data](udp_socket, TARGET_HOST)
            except KeyError:
                print("KeyError")
                print(data)
                udp_socket.sendto(b"KeyError",TARGET_HOST)
    finally:
        GPIO.cleanup()

if __name__ == '__main__':
	main()
Benutzeravatar
__blackjack__
User
Beiträge: 4681
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Freitag 25. Oktober 2019, 17:20

@pumuckll: Ich weiss nicht warum ein Thread da irgendetwas am Verhalten der Hardware ändern sollte. Du startest den auf jeden Fall falsch, denn Du rufst ``eingang_2(10)`` ja *selbst* auf, im Hauptthread, und übergibst dann den *Rückgabewert* davon, also `None`, als `target` an `Thread`, was keinen Sinn macht. Du musst als `target` etwas Aufrufbares übergeben was dann in einem eigenen Thread läuft. Wenn das Argumente braucht, müssen die als `args` und/oder `kwargs` übergeben werden. ``daemon=True`` wäre als Argument auch empfehlenswert.

Was heisst die `main()` funktioniert nicht richtig? Da wird offensichtlich auf `options` zugegriffen was nirgends definiert ist und damit zu einem `NameError` führt. Das hat aber auch nichts mit der Hardware oder dem Thread zu tun. Du schreibst Du hast das Programm gekürzt — dann lass bitte auch das gekürzte Programm laufen und beschreibe *dessen* Probleme. Denn einfach das Programm zu kürzen und nicht zu prüfen ob das immer noch das gleiche Verhalten hat was Du beschreibst, lässt uns dann raten wie Dein Programm denn tatsächlich aussehen mag und ob Du nicht was wichtiges rausgekürzt hast was zum Problem beiträgt.

Hast Du denn mal nachgemessen ob der Pin tatsächlich High ist? Denn da kann dann alle Programmierung der Welt nichts dran ändern wenn da tatsächlich Strom anliegt.
“Give a man a fire and he's warm for a day, but set fire to him and he's warm for the rest of his life.”
— Terry Pratchett, Jingo
pumuckll
User
Beiträge: 15
Registriert: Donnerstag 30. August 2018, 17:45

Freitag 25. Oktober 2019, 17:58

Danke schon mal für eure antworten und den Code


Mein Haupt Problem ist das beim Programm start schon high anliegt und das Programm auf die steigende flanke wartet.

Ich müsste den zustand des gpio´s auslesen und weitergeben, so funktioniert mein Bisheriges Programm, das könnte ich bis zu fallenden flanke machen.

Der weggekürzte Teil hat damit nichts zutun, werde ich aber zufunkt alles Posten.

Ich schau mir das morgen in ruhe an und überarbeite das Programm
dann melde mich wieder

danke schon mal
pumuckll
User
Beiträge: 15
Registriert: Donnerstag 30. August 2018, 17:45

Samstag 26. Oktober 2019, 18:15

Nabend,

Der Code ist den Ganzen Tag fehlerlos gelaufen und macht was es soll.



vielen dank

Code: Alles auswählen

#!/usr/bin/env python3
import socket
from time import sleep
from OPi import GPIO
import threading


BUFFER_SIZE = 8192
SERVER = ("", 8008)
TARGET_HOST = ("10.10.10.4", 8007)



def initialize():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(SERVER)
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(13, GPIO.OUT)
    GPIO.setup(15, GPIO.OUT)
    GPIO.setup(19, GPIO.OUT)
    GPIO.setup(21, GPIO.OUT)
    GPIO.setup(23, GPIO.OUT)
    GPIO.setup(8, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    GPIO.setup(10, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    return udp_socket

def loop_eingang1(udp_socket):
    edge = threading.Event()
    edge = threading.Event()
    if GPIO.input(8):
        edge.set()
    GPIO.add_event_detect(8, GPIO.RISING, callback=lambda c:edge.set(), bouncetime=50)
    while True:
        edge.wait()
        edge.clear()
        GPIO.output(23, True)
        while GPIO.input(8):
            sleep(2)
            udp_socket.sendto(b"eingang_1", TARGET_HOST)
        GPIO.output(23, False)

def loop_eingang2(udp_socket):
    edge = threading.Event()
    if GPIO.input(10):
        edge.set()
    GPIO.add_event_detect(10, GPIO.RISING, callback=lambda c:edge.set(), bouncetime=50)
    while True:
        edge.wait()
        edge.clear()
        while GPIO.input(10):
            udp_socket.sendto(b"eingang_2", TARGET_HOST)
            sleep(2)

def reply_loxone(udp_socket, TARGET_HOST):
    udp_socket.sendto(b"reply_loxone", TARGET_HOST)
    print("reply_loxone")

def ausgang1_on(udp_socket, TARGET_HOST):
    GPIO.output(13, True)
    udp_socket.sendto(b"ausgang1 on", TARGET_HOST)
    print("ausgang1 on")

def ausgang1_off(udp_socket, TARGET_HOST):
    GPIO.output(13, False)
    udp_socket.sendto(b"ausgang1 off", TARGET_HOST)
    print("ausgang1 off")

def ausgang2_on(udp_socket, TARGET_HOST):
    GPIO.output(15, True)
    udp_socket.sendto(b"ausgang2 on", TARGET_HOST)
    print("ausgang2 on")

def ausgang2_off(udp_socket, TARGET_HOST):
    GPIO.output(15, False)
    udp_socket.sendto(b"ausgang2 off", TARGET_HOST)
    print("ausgang2 off")	

def ausgang3_on(udp_socket, TARGET_HOST):
    GPIO.output(19, True)
    udp_socket.sendto(b"ausgang3 on", TARGET_HOST)
    print("ausgang3 on")

def ausgang3_off(udp_socket, TARGET_HOST):
    GPIO.output(19, False)
    udp_socket.sendto(b"ausgang3 off", TARGET_HOST)
    print("ausgang3 off")

def ausgang4_on(udp_socket, TARGET_HOST):
    GPIO.output(21, True)
    udp_socket.sendto(b"ausgang4 on", TARGET_HOST)
    print("ausgang4 on")

def ausgang4_off(udp_socket, TARGET_HOST):
    GPIO.output(21, False)
    udp_socket.sendto(b"ausgang4 off", TARGET_HOST)
    print("ausgang4 off")

options = {
    b"reply_loxone": reply_loxone,
    b"ausgang1_on": ausgang1_on,
    b"ausgang1_off": ausgang1_off,
    b"ausgang2_on": ausgang2_on,
    b"ausgang2_off": ausgang2_off,
    b"ausgang3_on": ausgang3_on,
    b"ausgang3_off": ausgang3_off,
    b"ausgang4_on": ausgang4_on,
    b"ausgang4_off": ausgang4_off,
}

def main():
    try:
        udp_socket = initialize()
        eingang1 = threading.Thread(target=loop_eingang1, args=(udp_socket,))
        eingang1 = threading.Thread(target=loop_eingang1, args=(udp_socket,))
        eingang1.daemon = True
        eingang1.start()
        eingang2 = threading.Thread(target=loop_eingang2, args=(udp_socket,))
        eingang2.daemon = True
        eingang2.start()
        print("UDPServer Waiting for client on port", SERVER)
        while True:
            data, addr = udp_socket.recvfrom(BUFFER_SIZE)
            print(data)
            try:
                options[data](udp_socket, TARGET_HOST)
            except KeyError:
                print("KeyError")
                print(data)
                udp_socket.sendto(b"KeyError",TARGET_HOST)
    finally:
        GPIO.cleanup()

if __name__ == '__main__':
    main()

Sirius3
User
Beiträge: 10900
Registriert: Sonntag 21. Oktober 2012, 17:20

Samstag 26. Oktober 2019, 18:50

In loop_eingang1 ist eine Zeile doppelt. Die ganzen ausgangY_on/off lassen sich zu einer Funktion zusammenfassen. TARGET_HOST ist laut Schreibweise eine Konstante und muss nicht als Argument übergeben werden.
pumuckll
User
Beiträge: 15
Registriert: Donnerstag 30. August 2018, 17:45

Dienstag 29. Oktober 2019, 13:44

Hab eine weile gebraucht, aber ich hab einen weg, zur Zusammenfassung gefunden.

Code: Alles auswählen

#!/usr/bin/env python3
import socket
from time import sleep
import RPi.GPIO as GPIO
import threading


BUFFER_SIZE = 8192
SERVER = ("", 8008)
TARGET_HOST = ("10.10.10.4", 8007)


def initialize():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(SERVER)
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(29, GPIO.OUT, initial=1)
    GPIO.setup(31, GPIO.OUT, initial=1)
    GPIO.setup(33, GPIO.OUT, initial=1)
    GPIO.setup(35, GPIO.OUT, initial=1)
    GPIO.setup(37, GPIO.OUT, initial=1)
    GPIO.setup(16, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    GPIO.setup(18, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    return udp_socket


def initialize_werte(udp_socket, data):
    werte = {
    b"reply_loxone": [udp_socket, data, 0, 0],
    b"ausgang1_on": [udp_socket, data, 35, 1],
    b"ausgang1_off": [udp_socket, data, 35, 0],
    b"ausgang2_on": [udp_socket, data, 33, 1],
    b"ausgang2_off": [udp_socket, data, 33, 0],
    b"tor_auf_zu": [udp_socket, data, 0, 0],
    }
    return werte


def loop_eingang1(udp_socket):
    edge = threading.Event()
    if GPIO.input(16):
        edge.set()
    GPIO.add_event_detect(16, GPIO.RISING, callback=lambda c:edge.set(), bouncetime=50)
    while True:
        edge.wait()
        edge.clear()
        GPIO.output(29, True)
        while GPIO.input(16):
            sleep(2)
            udp_socket.sendto(b"eingang_1", TARGET_HOST)
        GPIO.output(29, False)

def loop_eingang2(udp_socket):
    edge = threading.Event()
    if GPIO.input(18):
        edge.set()
    GPIO.add_event_detect(18, GPIO.RISING, callback=lambda c:edge.set(), bouncetime=50)
    while True:
        edge.wait()
        edge.clear()
        while GPIO.input(18):
            udp_socket.sendto(b"eingang_2", TARGET_HOST)
            sleep(2)


def reply_loxone(udp_socket, data, pin, zustand):
    udp_socket.sendto(b"reply_loxone", TARGET_HOST)
    print("reply_loxone")


def ausgang_setzen(udp_socket, data, pin, zustand):
    GPIO.output(pin, zustand)
    udp_socket.sendto(data, TARGET_HOST)
    print("ausgang ",pin, zustand)


def tor_auf_zu(udp_socket, data, pin, zustand):
    GPIO.output(37, True)
    sleep(0.4)
    GPIO.output(37, False)
    udp_socket.sendto(b"tor_auf_zu", TARGET_HOST)
    print("toraufzu")





options = {
    b"reply_loxone": reply_loxone,
    b"ausgang1_on": ausgang_setzen,
    b"ausgang1_off": ausgang_setzen,
    b"ausgang2_on": ausgang_setzen,
    b"ausgang2_off": ausgang_setzen,
    b"tor_auf_zu": tor_auf_zu,
}

def main():
    try:
        udp_socket = initialize()
        data, addr = udp_socket.recvfrom(BUFFER_SIZE)
        werte = initialize_werte(udp_socket, data)
        eingang1 = threading.Thread(target=loop_eingang1, args=(udp_socket,))
        eingang1.daemon = True
        eingang1.start()
        eingang2 = threading.Thread(target=loop_eingang2, args=(udp_socket,))
        eingang2.daemon = True
        eingang2.start()
        print("UDPServer Waiting for client on port", SERVER)
        while True:
            data, addr = udp_socket.recvfrom(BUFFER_SIZE)
            werte = initialize_werte(udp_socket, data)
            print(data)
            try:
                werte[data]
                options[data](*werte[data])
            except KeyError:
                print("KeyError")
                print(data)
                udp_socket.sendto(b"KeyError",TARGET_HOST)
    finally:
        GPIO.cleanup()

if __name__ == '__main__':
    main()


Was ich noch nicht ganz verstehe ist wenn in :

Code: Alles auswählen

def initialize_werte(udp_socket, data):
die Zeile:

Code: Alles auswählen

    b"tor_auf_zu": [udp_socket, data, 0, 0],
    
fehlt, bekomme ich einen Keyerror, wenn "tor_auf_zu" gesendet wird.

PS: ich hab noch eine andere frage:
wie könnte ich von diesen Stream :

Code: Alles auswählen

rtsp://10.10.10.xx:xxxxx/user=xxxxxx&password=xxxxxxxx&channel=1&stream=0.sdp?
den ton aufzeichnen.

per befehl möchte ich die Aufzeichnung starten und stoppen
welche Bibliothek eignet sich am besten?
__deets__
User
Beiträge: 6861
Registriert: Mittwoch 14. Oktober 2015, 14:29

Dienstag 29. Oktober 2019, 15:50

Zu deiner ersten Frage: wenn du einen Schluessel in einem Woerterbuch nachschlaegst, den es darin nicht gibt, dann ist das das erwartete Ergebnis. Was sonst soll passieren?

Zur zweiten kann ich nur googeln.
Benutzeravatar
__blackjack__
User
Beiträge: 4681
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Dienstag 29. Oktober 2019, 16:42

@pumuckll: Du verstehst nicht warum es zu einem `KeyError` kommt wenn man auf ein Wörterbuch auf einen Schlüssel zugreift den es nicht gibt?

Das erste empfangen eines UDP-Pakets vor der Schleife sieht sinnlos aus. Das `werte` was in der nächsten Zeile erzeugt wird, wird nie irgendwo verwendet.

`loop_eingang1()` und `loop_eingang2()` sind fast identisch. Ausser durch Pin-Nummer und Text der gesendet wird, was man einfach als Argumente heraus ziehen kann, unterscheiden die sich nur durch einen Ausgabepin und die Position des `sleep()` innerhalb der Schleife. Ist dieser Unterschied mit dem `sleep()` *tatsächlich* wichtig?

`initialize_werte()` ist komisch. Da sind sowohl ”statische” als auch ”dynamische” Werte in den Werten und die Schlüssel sind die gleichen wie bei `options`. Nur weil da dynamische Werte drin sind, muss diese Datenstruktur ständig neu aufgebaut werden. Das wäre besser gelöst wenn man schon in `options` die ”statischen” Werte per `functools.partial()` binden würde.

`tor_auf_zu()` hat ein `pin`-Argument das nicht benutzt wird, und eine hart kodierte Pin-Nummer.

Ungetestet:

Code: Alles auswählen

#!/usr/bin/env python3
from functools import partial
import socket
import threading
from time import sleep

from RPi import GPIO

BUFFER_SIZE = 8192
SERVER = ("", 8008)
TARGET_HOST = ("10.10.10.4", 8007)


def loop_eingang(udp_socket, in_pin, out_pin, command):
    edge = threading.Event()
    if GPIO.input(in_pin):
        edge.set()
    GPIO.add_event_detect(
        in_pin, GPIO.RISING, callback=lambda _: edge.set(), bouncetime=50
    )
    while True:
        edge.wait()
        edge.clear()

        if out_pin is not None:
            GPIO.output(out_pin, True)

        while GPIO.input(in_pin):
            sleep(2)
            udp_socket.sendto(command, TARGET_HOST)

        if out_pin is not None:
            GPIO.output(out_pin, False)


def reply_loxone(udp_socket, command):
    udp_socket.sendto(command, TARGET_HOST)
    print("reply_loxone")


def ausgang_setzen(udp_socket, pin, zustand, command):
    GPIO.output(pin, zustand)
    udp_socket.sendto(command, TARGET_HOST)
    print("ausgang ", pin, zustand)


def tor_auf_zu(udp_socket, pin, command):
    GPIO.output(pin, True)
    sleep(0.4)
    GPIO.output(pin, False)
    udp_socket.sendto(command, TARGET_HOST)
    print("toraufzu")


def main():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(SERVER)
    command_to_function = {
        b"reply_loxone": partial(reply_loxone, udp_socket),
        b"ausgang1_on": partial(ausgang_setzen, udp_socket, 35, True),
        b"ausgang1_off": partial(ausgang_setzen, udp_socket, 35, False),
        b"ausgang2_on": partial(ausgang_setzen, udp_socket, 33, True),
        b"ausgang2_off": partial(ausgang_setzen, udp_socket, 33, False),
        b"tor_auf_zu": partial(tor_auf_zu, udp_socket, 37),
    }
    try:
        GPIO.setmode(GPIO.BOARD)
        GPIO.setup([29, 31, 33, 35, 37], GPIO.OUT, initial=GPIO.HIGH)
        GPIO.setup([16, 18], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

        eingang_1_thread = threading.Thread(
            target=loop_eingang,
            args=(udp_socket, 16, 29, b"eingang_1"),
            daemon=True,
        )
        eingang_1_thread.start()

        eingang_2_thread = threading.Thread(
            target=loop_eingang,
            args=(udp_socket, 18, None, b"eingang_2"),
            daemon=True,
        )
        eingang_2_thread.start()

        print("UDPServer Waiting for client on port", SERVER)
        while True:
            command, _addr = udp_socket.recvfrom(BUFFER_SIZE)
            print(command)
            try:
                function = command_to_function[command]
            except KeyError:
                print("KeyError")
                udp_socket.sendto(
                    f"Unknown command {command!a}".encode("ascii"), TARGET_HOST
                )
            else:
                function(command)
    finally:
        GPIO.cleanup()


if __name__ == "__main__":
    main()
“Give a man a fire and he's warm for a day, but set fire to him and he's warm for the rest of his life.”
— Terry Pratchett, Jingo
pumuckll
User
Beiträge: 15
Registriert: Donnerstag 30. August 2018, 17:45

Sonntag 1. Dezember 2019, 11:22

Vielen danke euch beiden,
ich hatte endlich wieder Zeit um dieses Projekt weiterzuverfolgen und bis auf 2 kleine Probleme, läuft der Code prima.


hier hab ich einen Invaliden Syntax:

Code: Alles auswählen

udp_socket.sendto(f"Unknown command {command!a}".encode("ascii"), TARGET_HOST)
vor .encode ist die "^" Markierung

Ich komm nicht drauf was hier Fehlt.



Von Zeit zu zeit gab es ein Problem mit den GPIO´s, deswegen lasse ich das Programm einmal am Tag neustarten.
Dabei wird das Programm anscheinend nicht richtig geschlossen und beim neustart sind die gpio´s schon belegt.
deswegen hab ich am Anfang wieder :

Code: Alles auswählen

GPIO.cleanup()
eingefügt, damit nach dem Fehler Fall alles wieder läuft.


Außerdem möchte ich Led´s eine Bestimmte anzahl Blinken lassen, aber in einem Thread.

Eine Funktion zum Blinken und eine um einen Thread zu erstellen.
Ich bin mir nicht sicher ob ich das so richtig mache bzw was ich wo übergeben muss.

Code: Alles auswählen

def blink_led(udp_socket, pin, anzahl, command):
    udp_socket.sendto(command, TARGET_HOST)
    for x in range(anzahl):
	    GPIO.output(pin, True)
	    sleep(0.5)
	    GPIO.output(pin, False)
	
def blinken_thread(udp_socket, pin, anzahl, command):
    blinken_thread = threading.Thread(
        target=blink_led,
        args=(udp_socket, pin, anzahl, command),
        daemon=True,
    )
    blinken_thread.start()   
Im main wird mit "command" "b"tasterled_blink": partial(blinken_thread, udp_socket, 29, 10)" , der "blinken_thread" aktiviert

Code: Alles auswählen

def main():
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(SERVER)
    command_to_function = {
        b"reply_loxone": partial(reply_loxone, udp_socket),
        b"tasterled_ein": partial(ausgang_setzen, udp_socket, 29, True),
        b"tasterled_aus": partial(ausgang_setzen, udp_socket, 29, False),
        b"tasterled_blink": partial(blinken_thread, udp_socket, 29, 10),
        b"verstaerker_ein": partial(ausgang_setzen, udp_socket, 31, False),
        b"verstaerker_aus": partial(ausgang_setzen, udp_socket, 31, True),
        b"nacht_ein": partial(ausgang_setzen, udp_socket, 33, False),
        b"nacht_aus": partial(ausgang_setzen, udp_socket, 33, True),
        b"onewireled_ein": partial(ausgang_setzen, udp_socket, 35, True),
        b"onewireled_aus": partial(ausgang_setzen, udp_socket, 35, False),
        b"onewireled_blink": partial(blinken_thread, udp_socket, 35, 11),
    }
    try:
        GPIO.setmode(GPIO.BOARD)
        GPIO.cleanup()
        GPIO.setup([29, 31, 33, 35, 37], GPIO.OUT, initial=GPIO.HIGH)
        GPIO.setup([16, 18], GPIO.IN, pull_up_down=GPIO.PUD_DOWN)


        print("UDPServer Waiting for client on port", SERVER)
        while True:
            command, _addr = udp_socket.recvfrom(BUFFER_SIZE)
            print(command)
            try:
                function = command_to_function[command]
            except KeyError:
                print("KeyError")
                udp_socket.sendto(command, TARGET_HOST)
#                udp_socket.sendto(f"Unknown command {command!a}".encode("ascii"), TARGET_HOST)
            else:
                function(command)
    finally:
        GPIO.cleanup()

bzw kann ich das auch gleich mit einer Funktion machen.

mit Gpiozero wäre es einfach, aber dann könnte ich den Code nur auf einem RPI laufenlassen, so läuft er auch auf anderen SBC´s.

Schönen ersten Advent
Benutzeravatar
Fire Spike
User
Beiträge: 194
Registriert: Montag 13. Mai 2019, 16:05
Wohnort: Erde

Sonntag 1. Dezember 2019, 20:42

poste mal deinen Fehler.
Mein Beispiel funktioniert.

Code: Alles auswählen

>>> command = "dj"
>>> f"Unknown command {command!a}".encode("ascii")
b"Unknown command 'dj'"
__deets__
User
Beiträge: 6861
Registriert: Mittwoch 14. Oktober 2015, 14:29

Sonntag 1. Dezember 2019, 21:24

Die format strings gehen erst seit Python 3.6.
Antworten