Hallo nochmal,
der Großteil von meinen Programm funktioniert, ich aber aber noch Probleme zu lösen.
Ich verstehe nicht ganz warum das senden von UDP funktioniert, bzw ich sehe nicht wo der Port "8021" aktiviert wird, aber an diesen Port kommen die Nachrichten an.
TARGET_HOST wird nicht verwendet oder?
wie wird im "min" aus "addr" , "adress" in den "options"
Code: Alles auswählen
data, addr = udp_socket.recvfrom(BUFFER_SIZE)
def amplifier_off(udp_socket, address):
AMPLIFIER.off()
udp_socket.sendto(b"amplifier_off", address)
in data wird die UDP info geschrieben aber was in addr? bis das teil der Deklaration von udp_socket.recvfrom ?
____________________________________________________
Ich hab 2 Eingänge die auch eine UDP nachricht senden sollen, das hab ich aber noch nicht hinbekommen.
Der socket wird im main definiert und ist deswegen in dieser Funktion nicht bekannt.
Wie macht man das richtig? kann ich den selben port verwenden?
Den Socket wieder global deklarieren wollte ich nicht.
Code: Alles auswählen
#eingänge
def es_klingelt(udp_socket, address):
print("es_klingelt!")
udp_socket.sendto(b"es_klingelt", address)
def ein_alarm(udp_socket, address):
print("ein_alarm!")
udp_socket.sendto(b"ein_alarm", address)
KLINGEL= Button(18,pull_up=False, bounce_time=None)
ALARM = Button(23,pull_up=False, bounce_time=None)
KLINGEL.when_pressed = es_klingelt
ALARM.when_pressed = ein_alarm
__________________________________________________________
zu guter letzt möchte ich noch Threads einfügen
vermutlich soll ich sie im main aufrufen oder?
Code: Alles auswählen
def func_1():
t2 = threading.Thread(target=func2)
t2.start()
t3 = threading.Thread(target=func3)
t3.start()
print("func1 aufgerufen")
def func_2():
while True:
try:
print( "func2 schleife läuft")
udp_socket.sendto(b"func2 schleife läuft", address)
sleep(4)
except KeyError:
print( "KeyError")
udp_socket.sendto(b"KeyError", address)
def func_3():
time.sleep(6)
print("func3 läuft auch ")
daraus möchte ich auch nachrichten senden
hier nochmal der Ganze code:
Code: Alles auswählen
#!/usr/bin/env python3
import socket
import os
import datetime
from time import sleep
from gpiozero import LED
from gpiozero import Button
BUFFER_SIZE = 8192
SERVER = ("", 8020)
TARGET_HOST = ("10.10.11.2", 8021)
udp_socket1 = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#Ausgänge definieren
TASTERLED = LED(27,initial_value=False, active_high=True)
IRLED = LED(17,initial_value=False, active_high=True)
STATUSLED = LED(22,initial_value=False, active_high=True)
AMPLIFIER = LED(5,initial_value=False, active_high=False)
RESERVE = LED(6,initial_value=True, active_high=False)
#eingänge
def es_klingelt():
print("es_klingelt!")
udp_socket1.sendto(b"es_klingelt", address)
def ein_alarm():
print("ein_alarm!")
udp_socket1.sendto(b"ein_alarm", address)
KLINGEL= Button(18,pull_up=False, bounce_time=0.1)
ALARM = Button(23,pull_up=False, bounce_time=None)
KLINGEL.when_pressed = es_klingelt
ALARM.when_pressed = ein_alarm
#funktionen deklarieren
def reply_loxone(udp_socket, address):
udp_socket.sendto(b"reply_loxone", address)
print("reply_loxone")
#heute_jetzt = datetime.datetime.now()
#f = open("reply_loxone.txt", "w")
#f.write(str(heute_jetzt) +"\n" )
#f.close()
def try_restart(udp_socket, address):
udp_socket.sendto(b"try_restart", address)
print("try_restart")
heute_jetzt = datetime.datetime.now()
f = open("try_restart.txt", "a")
f.write("try_restart" + str(heute_jetzt) +"\n" )
f.close()
os.system('sudo service door restart')
def try_reboot(udp_socket, address):
udp_socket.sendto(b"try_reboot", address)
print("try_reboot")
heute_jetzt = datetime.datetime.now()
f = open("rebootlog.txt", "a")
f.write("try_reboot" + str(heute_jetzt) +"\n" )
f.close()
os.system('sudo shutdown -r now')
#ausgänge
def taster_led_on(udp_socket, address):
TASTERLED.on()
udp_socket.sendto(b"taster_led_on", address)
print( "taster_led_on on")
def taster_led_off(udp_socket, address):
TASTERLED.off()
udp_socket.sendto(b"taster_led_off", address)
print( "taster_led_off" )
def taster_led_blink(udp_socket, address):
TASTERLED.blink(on_time=0.5, off_time=0.5, n=None, background=True)
udp_socket.sendto(b"taster_led_blink", address)
print("taster_led_blink")
def ir_led_on(udp_socket, address):
IRLED.on()
udp_socket.sendto(b"ir_led_on", address)
print("ir_led_on")
def ir_led_off(udp_socket, address):
IRLED.off()
udp_socket.sendto(b"ir_led_off", address)
print("ir_led_off")
def status_led_on(udp_socket, address):
STATUSLED.on()
udp_socket.sendto(b"status_led_on", address)
print("status_led_on")
def status_led_off(udp_socket, address):
STATUSLED.off()
udp_socket.sendto(b"status_led_off", address)
print("status_led_off")
def status_led_blink(udp_socket, address):
STATUSLED.blink(on_time=0.5, off_time=0.5, n=None, background=True)
udp_socket.sendto(b"status_led_blink", address)
print("status_led_blink")
def amplifier_on(udp_socket, address):
AMPLIFIER.on()
udp_socket.sendto(b"amplifier_on", address)
print("amplifier_on")
def amplifier_off(udp_socket, address):
AMPLIFIER.off()
udp_socket.sendto(b"amplifier_off", address)
print("amplifier_off")
def reserve_on(udp_socket, address):
RESERVE.on()
udp_socket.sendto(b"reserve_on", address)
print("reserve_on")
def reserve_off(udp_socket, address):
RESERVE.off()
udp_socket.sendto(b"reserve_off", address)
print("reserve_off")
options = {
b"reply_loxone": reply_loxone,
b"try_restart": try_restart,
b"try_reboot": try_reboot,
b"taster_led_on": taster_led_on,
b"taster_led_off": taster_led_off,
b"taster_led_blink": taster_led_blink,
b"ir_led_on": ir_led_on,
b"ir_led_off": ir_led_off,
b"status_led_on": status_led_on,
b"status_led_off": status_led_off,
b"status_led_blink": status_led_blink,
b"amplifier_on": amplifier_on,
b"amplifier_off": amplifier_off,
b"reserve_on": reserve_on,
b"reserve_off": reserve_off,
}
def func_1():
t2 = threading.Thread(target=func_2)
t2.start()
t3 = threading.Thread(target=func_3)
t3.start()
print("func1 aufgerufen")
def func_2():
while True:
try:
print( "func2 schleife läuft")
sleep(4)
except KeyError:
print( "KeyError")
udp_socket.sendto(b"KeyError", address)
def func_3():
time.sleep(6)
print("func3 läuft auch ")
def main():
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(SERVER)
func_1()
print("UDPServer Waiting for client on port", SERVER)
while True:
data, addr = udp_socket.recvfrom(BUFFER_SIZE)
print(data)
try:
options[data](udp_socket, addr)
except KeyError:
print("KeyError")
udp_socket.sendto(b"KeyError", addr)
if __name__ == '__main__':
main()
grüsse
Gerhard