Ich hab versucht, hier gelerntes anzuwenden und das Programm umgeschrieben, damit ich es leichter für verschieden Anwendungen anpassen kann.
Erstaunlicherweise Funktioniert läuft es auch.
UDP Nachrichten hab ic auf ein Minimum reduziert und das Wörterbuch wird von einer Schleife erstellt.
Somit brauche ich nur die gpio^s definieren und die Eingangs Kommandos werden automatisch generiert.
Habe ich das so gut gelöst?
Ich konnte den Logger nicht in eine Funktion verschieben, obwohl ich viel probiert habe.
Return in einer Funktion hat nicht funktioniert.
Wie macht man das?
Warum wird das entweder als Kommentar erkannt oder als string?
Wie kommentiere ich den Code am besten?
Gruss
Code: Alles auswählen
#!/usr/bin/env python3
"""Diese Script Sendet und empfängt UDP Nachrichten,
um Ausgänge zu schalten und um den Status von Eingängen, Ausgängen sowie von 1Wire Sensoren auszugeben.
Verwendbar für Raspberrypi 3 und 4, orangepi zero """
from functools import partial
import socket
import threading
import OPi.GPIO as GPIO #OrangePi
#from RPi import GPIO #RaspberryPi
import logging
from logging.handlers import TimedRotatingFileHandler
from time import sleep
# """ UDP einstellungen """
BUFFER_SIZE = 8192
SERVER = ("", 8006)
TARGET_HOST = ("10.10.10.4", 8005)
#gpio einstellungen
PIN_AUS = [11, 13, 15, 19]
INITIAL_AUS = [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH]
PIN_EIN = [16, 18]
INITIAL_EIN = [GPIO.PUD_DOWN, GPIO.PUD_DOWN]
#1wire einstellungen
ONEWIRE_VORHANDEN = True
W1_MASTER_SLAVES = '/sys/devices/w1_bus_master1/w1_master_slaves'
DEVICE_PATH = '/sys/bus/w1/devices/{}/w1_slave'
#logger
logger = logging.getLogger("Rotating Log")
logger.setLevel(logging.DEBUG)
handler = TimedRotatingFileHandler("in_out.log", when='D', interval=1, backupCount=7, encoding=None,
delay=False, utc=False, atTime=None)
fileformatter = logging.Formatter('%(asctime)s : %(levelname)s : %(name)s : %(message)s')
handler.setFormatter(fileformatter)
logger.addHandler(handler)
logger.debug("logger started")
# Wörterbuch erstellen
def dictionary_function(udp_socket):
command_to_func = {b"tor_auf_zu": partial(tor_auf_zu, udp_socket),
b"reply_loxone": partial(reply_loxone, udp_socket)}
for aus_nummer, aus_pin, initial_wert in zip(range(len(PIN_AUS)), PIN_AUS, INITIAL_AUS):
aus_nummer += 1
if initial_wert == True:
initial_wert_ein = False
command_to_func[str("ausgang_{}_ein".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
aus_pin, initial_wert_ein)
else:
initial_wert_ein = True
command_to_func[str("ausgang_{}_ein".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
aus_pin, initial_wert_ein)
if initial_wert == True:
initial_wert_aus = True
command_to_func[str("ausgang_{}_aus".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
aus_pin, initial_wert_aus)
else:
initial_wert_aus = False
command_to_func[str("ausgang_{}_aus".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
aus_pin, initial_wert_aus)
return command_to_func
# gpio definieren
def set_gpio():
try:
GPIO.cleanup()#falls die gpios schon aktive sind
GPIO.setmode(GPIO.BOARD)
for pin,initial in zip(PIN_AUS,INITIAL_AUS):
GPIO.setup(pin, GPIO.OUT, initial=initial)
for pin, initial in zip(PIN_EIN, INITIAL_EIN):
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
except OSError:
logger.exception("gpio set error")
def read_pins(pins):
result = 0
for pin in pins:
result = result * 2 + (not GPIO.input(pin))
return result
def onewire_devices():
""" 1wire geräte auslesen """
with open(W1_MASTER_SLAVES) as lines:
one_wire = [l.strip() for l in lines]
logger.debug("1Wire slaves {}".format(one_wire))
return one_wire
def read_temperature(slave):
with open(DEVICE_PATH.format(slave)) as lines:
crc_ok = next(lines).split()[-1] == 'YES'
if not crc_ok:
raise OSError("crc not ok")
logger.exception("crc not ok")
temperature = next(lines).split()[-1]
if not temperature.startswith('t='):
raise OSError("format currupt")
logger.exception("format currupt")
return temperature[2:]
def onewire(udp_socket, w1_slaves):
""" 1wire sensoren einzeln auslesen """
for slave in w1_slaves:
try:
temperature = read_temperature(slave)
text = f"sensor {slave},{temperature}"
udp_socket.sendto(text.encode(), TARGET_HOST)
except OSError:
logger.exception("OSError")
def reply_loxone(udp_socket, command):
udp_socket.sendto(command, TARGET_HOST)
def ausgang_setzen(udp_socket, pin, zustand, command):
try:
GPIO.output(pin, zustand)
logging.debug(" Asugang {} = {} ".format(pin, zustand))
except Exception:
logger.exception("gpio error")
#tor öffner wird "gedrückt"
def tor_auf_zu(udp_socket, command):
try:
GPIO.output(37, False)
sleep(0.5)
GPIO.output(37, True)
except Exception:
logger.exception("gpio tor_auf_zu error")
#arbeits loop
def loop_einaus_abfragen(udp_socket,w1_slaves):
try:
komparator_aus = 0
komparator_ein = 0
while True:
for counter in range(0, 150):
zustand_aus = read_pins(PIN_AUS)
zustand_ein = read_pins(PIN_EIN)
if zustand_aus != komparator_aus:
udp_socket.sendto(b"zustand_aus,%d" % zustand_aus, TARGET_HOST)
komparator_aus = zustand_aus
logger.debug(" zustand_aus {} ungleich".format(zustand_aus))
if zustand_ein != komparator_ein:
udp_socket.sendto(b"zustand_ein,%d" % zustand_ein, TARGET_HOST)
komparator_ein = zustand_ein
logger.debug(" zustand_ein {} ungleich".format(zustand_ein))
if counter == 1:
if ONEWIRE_VORHANDEN:
onewire(udp_socket, w1_slaves)
logger.debug("1Wire device {}".format(onewire))
#1wire sensoren senden
elif counter == 50:
logger.debug("schleife bei {}".format(counter))
#platzhalter für andere funktionen
elif counter in [0,30,60,120]:
udp_socket.sendto(b"zustand_aus,%d" % zustand_aus, TARGET_HOST)
udp_socket.sendto(b"zustand_ein,%d" % zustand_ein, TARGET_HOST)
logger.debug("zustand_ein {} ".format(zustand_aus))
logger.debug("zustand_aus {} ".format(zustand_aus))
sleep(0.5)
except Exception:
logger.exception("arbeits loop error")
def main():
set_gpio()
w1_slaves = onewire_devices()
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(SERVER)
dictionary_function(udp_socket)
command_to_function =dictionary_function(udp_socket)
logger.debug("dictionary created : {} ".format(command_to_function))
#
try:
loop_einaus_thread = threading.Thread(
target=loop_einaus_abfragen,
args=(udp_socket,w1_slaves),
daemon=True,
)
loop_einaus_thread.start()
logger.debug("UDPServer Waiting for client on port : {} ".format(SERVER))
while True:
command, _addr = udp_socket.recvfrom(BUFFER_SIZE)
logger.debug("UDPServer command : {} ".format(command))
try:
function = command_to_function[command]
except KeyError:
udp_socket.sendto(f"Unknown command {command!a}".encode("ascii"), TARGET_HOST)
logger.exception("KeyError")
else:
function(command)
logger.debug(" function {} ".format(function))
finally:
GPIO.cleanup()
if __name__ == "__main__":
main()