gpio bei Programmstart schon High (OPi.GPIO threading)
- __blackjack__
- User
- Beiträge: 13100
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
Ich hätte bei `read_pins()` Bitoperationen (``<<`` und ``|``) verwendet, damit klarer wird was da passiert. Aber das ist letztlich Geschmackssache.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Danke euch
Ich hab eine weile gebraucht, um die Dualzahl Rechnung zu verstehen, da bin ich etwas aus der Übung
Im Arbeits Thread hab ich eine Zähl schleife erstellt, um gewisse Funktionen nicht in jeden Zyklus auszuführen.
Außerdem werden noch 1Wire Sensoren ausgelesen, falls welche vorhanden sind.
Ist das so OK?
Gruss
Ich hab eine weile gebraucht, um die Dualzahl Rechnung zu verstehen, da bin ich etwas aus der Übung
nochlänger dafür:
__blackjack__ hat geschrieben: ↑Samstag 29. Februar 2020, 15:43 Ich hätte bei `read_pins()` Bitoperationen (``<<`` und ``|``) verwendet, damit klarer wird was da passiert. Aber das ist letztlich Geschmackssache.
Code: Alles auswählen
result = (result <<1) | (not GPIO.input(pin)) #Dualzahl als Dezimalzahl darstellen
Im Arbeits Thread hab ich eine Zähl schleife erstellt, um gewisse Funktionen nicht in jeden Zyklus auszuführen.
Außerdem werden noch 1Wire Sensoren ausgelesen, falls welche vorhanden sind.
Ist das so OK?
Code: Alles auswählen
#!/usr/bin/env python3
from functools import partial
import socket
import threading
from time import sleep
from RPi import GPIO
from random import randrange
BUFFER_SIZE = 8192
SERVER = ("", 8008)
TARGET_HOST = ("10.10.10.4", 8007)
PIN_AUS = [29, 31, 33, 35, 37]
PIN_EIN = [16, 18]
# gpio definieren
def set_gpio():
GPIO.cleanup()#falls dei gpios schon aktive sind
GPIO.setmode(GPIO.BOARD)
GPIO.setup(PIN_AUS, GPIO.OUT, initial=GPIO.HIGH)
GPIO.setup(PIN_EIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
def read_pins(pins):
result = 0
for pin in pins:
result = (result <<1) | (not GPIO.input(pin)) #Dualzahl als Dezimalzahl darstellen
# alternative result = result * 2 + (not GPIO.input(pin))
return result
def onewire_devices():#1wire geräte auslesen
file = open('/sys/devices/w1_bus_master1/w1_master_slaves')
w1_slaves = file.readlines()
file.close()
return w1_slaves
def onewire(udp_socket,w1_slaves):#1wire sensoren einzeln auslesen
for line in w1_slaves:
try:
file = open('/sys/bus/w1/devices/' + line.split("\n")[0] + '/w1_slave')
filecontent = file.read()
file.close()
stringvalue = ((filecontent.split("\n")[1].split(" ")[9])[2:]).encode()#formatieren und in bytes konvertieren
temperatur = ("sensor " + line.split("\n")[0] +",").encode() + stringvalue
print(temperatur)
udp_socket.sendto(temperatur, TARGET_HOST)
except OSError:
print("OSError")
def reply_loxone(udp_socket, command):# Rückmeldung ob das Programm noch läuft
udp_socket.sendto(command, TARGET_HOST)
print("reply_loxone")
def ausgang_setzen(udp_socket, pin, zustand, command):
#setzen der ausgänge
GPIO.output(pin, zustand)
udp_socket.sendto(command, TARGET_HOST)
print("ausgang ", pin, zustand)
def tor_auf_zu(udp_socket, command):
#tor öffner wird "gedrückt"
GPIO.output(37, False)
sleep(0.5)
GPIO.output(37, True)
udp_socket.sendto(command, TARGET_HOST)
print("toraufzu")
#arbeits loop
def loop_einaus_abfragen(udp_socket,w1_slaves):
while True:
for counter in range(0, 150):
zustand_aus = read_pins(PIN_AUS)
zustand_ein = read_pins(PIN_EIN)
udp_socket.sendto(b"zustand_aus,%d" % zustand_aus, TARGET_HOST)
udp_socket.sendto(b"zustand_ein,%d" % zustand_ein, TARGET_HOST)
if counter == 1:
onewire(udp_socket, w1_slaves)
# 1wire sensoren senden
elif counter == 50 or counter == 75:
print("schleife bei", counter)
# platzhalter für andere funktionen
elif counter in [25,45,90]:
print("schleife bei", counter)
# platzhalter für andere funktionen
sleep(0.5)
def main():
set_gpio()
w1_slaves = onewire_devices()
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(SERVER)
udp_socket.sendto(b"tets", TARGET_HOST)
command_to_function = {
b"reply_loxone": partial(reply_loxone, udp_socket),
b"tor_auf_zu": partial(tor_auf_zu, udp_socket),
b"ausgang1_ein": partial(ausgang_setzen, udp_socket, 35, False),
b"ausgang1_aus": partial(ausgang_setzen, udp_socket, 35, True),
b"ausgang2_ein": partial(ausgang_setzen, udp_socket, 33, False),
b"ausgang2_aus": partial(ausgang_setzen, udp_socket, 33, True),
b"ausgang3_ein": partial(ausgang_setzen, udp_socket, 31, False),
b"ausgang3_aus": partial(ausgang_setzen, udp_socket, 31, True),
b"ausgang4_ein": partial(ausgang_setzen, udp_socket, 29, False),
b"ausgang4_aus": partial(ausgang_setzen, udp_socket, 29, True),
}
try:
udp_socket.sendto(b"test", TARGET_HOST)
loop_einaus_thread = threading.Thread(
target=loop_einaus_abfragen,
args=(udp_socket,w1_slaves),
daemon=True,
)
loop_einaus_thread.start()
print("UDPServer Waiting for client on port", SERVER)
while True:
command, _addr = udp_socket.recvfrom(BUFFER_SIZE)
print(command)
try:
function = command_to_function[command]
except KeyError:
print("KeyError")
udp_socket.sendto(f"Unknown command {command!a}".encode("ascii"), TARGET_HOST)
else:
function(command)
finally:
GPIO.cleanup()
if __name__ == "__main__":
main()
Dateien öffnet man immer in einem with-Block. Konstante Strings sollten als Konstanten am Anfang der Datei stehen. Man stückelt keine Pfade per + zusammen. Um das Zeile-Ende-Zeichen zu entfernen gibt es rsplit. Den Dateiinhalt von w1_slave sollte man noch etwas genauer analysieren. Die Zeilen zu stringvalue und temperatur sind zu lang und unleserlich, vor allem, weil hier zu viel mit Strings verumgefummelt wird. Man berechnet nicht zwei mal das selbe, sondern merkt sich das Ergebnis. Die Umwandlung von Slave-Nummern sollte aber schon in onewire_devices stattfinden.
Code: Alles auswählen
W1_MASTER_SLAVES = '/sys/devices/w1_bus_master1/w1_master_slaves'
DEVICE_PATH = '/sys/bus/w1/devices/{}/w1_slave'
def onewire_devices():
""" 1wire geräte auslesen """
with open(W1_MASTER_SLAVES) as lines:
return [l.strip() for l in lines]
def read_temperature(slave):
with open(DEVICE_PATH.format(slave)) as lines:
crc_ok = next(lines).split()[-1] == 'YES'
if not crc_ok:
raise OSError("crc not ok")
temperature = next(lines).split()[-1]
if not temperature.startswith('t='):
raise OSError("format currupt")
return temperature[2:]
def onewire(udp_socket, w1_slaves):
""" 1wire sensoren einzeln auslesen """
for slave in w1_slaves:
try:
temperature = read_temperature(slave)
text = f"sensor {slave},{temperature}"
udp_socket.sendto(text.encode(), TARGET_HOST)
except OSError:
print("OSError")
Ich hab versucht, hier gelerntes anzuwenden und das Programm umgeschrieben, damit ich es leichter für verschieden Anwendungen anpassen kann.
Erstaunlicherweise Funktioniert läuft es auch.
UDP Nachrichten hab ic auf ein Minimum reduziert und das Wörterbuch wird von einer Schleife erstellt.
Somit brauche ich nur die gpio^s definieren und die Eingangs Kommandos werden automatisch generiert.
Habe ich das so gut gelöst?
Ich konnte den Logger nicht in eine Funktion verschieben, obwohl ich viel probiert habe.
Return in einer Funktion hat nicht funktioniert.
Wie macht man das?
Warum wird das entweder als Kommentar erkannt oder als string?
Wie kommentiere ich den Code am besten?
Gruss
Erstaunlicherweise Funktioniert läuft es auch.
UDP Nachrichten hab ic auf ein Minimum reduziert und das Wörterbuch wird von einer Schleife erstellt.
Somit brauche ich nur die gpio^s definieren und die Eingangs Kommandos werden automatisch generiert.
Habe ich das so gut gelöst?
Ich konnte den Logger nicht in eine Funktion verschieben, obwohl ich viel probiert habe.
Return in einer Funktion hat nicht funktioniert.
Wie macht man das?
Warum wird das entweder als Kommentar erkannt oder als string?
Code: Alles auswählen
""" 1wire sensoren einzeln auslesen """
Gruss
Code: Alles auswählen
#!/usr/bin/env python3
"""Diese Script Sendet und empfängt UDP Nachrichten,
um Ausgänge zu schalten und um den Status von Eingängen, Ausgängen sowie von 1Wire Sensoren auszugeben.
Verwendbar für Raspberrypi 3 und 4, orangepi zero """
from functools import partial
import socket
import threading
import OPi.GPIO as GPIO #OrangePi
#from RPi import GPIO #RaspberryPi
import logging
from logging.handlers import TimedRotatingFileHandler
from time import sleep
# """ UDP einstellungen """
BUFFER_SIZE = 8192
SERVER = ("", 8006)
TARGET_HOST = ("10.10.10.4", 8005)
#gpio einstellungen
PIN_AUS = [11, 13, 15, 19]
INITIAL_AUS = [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH]
PIN_EIN = [16, 18]
INITIAL_EIN = [GPIO.PUD_DOWN, GPIO.PUD_DOWN]
#1wire einstellungen
ONEWIRE_VORHANDEN = True
W1_MASTER_SLAVES = '/sys/devices/w1_bus_master1/w1_master_slaves'
DEVICE_PATH = '/sys/bus/w1/devices/{}/w1_slave'
#logger
logger = logging.getLogger("Rotating Log")
logger.setLevel(logging.DEBUG)
handler = TimedRotatingFileHandler("in_out.log", when='D', interval=1, backupCount=7, encoding=None,
delay=False, utc=False, atTime=None)
fileformatter = logging.Formatter('%(asctime)s : %(levelname)s : %(name)s : %(message)s')
handler.setFormatter(fileformatter)
logger.addHandler(handler)
logger.debug("logger started")
# Wörterbuch erstellen
def dictionary_function(udp_socket):
command_to_func = {b"tor_auf_zu": partial(tor_auf_zu, udp_socket),
b"reply_loxone": partial(reply_loxone, udp_socket)}
for aus_nummer, aus_pin, initial_wert in zip(range(len(PIN_AUS)), PIN_AUS, INITIAL_AUS):
aus_nummer += 1
if initial_wert == True:
initial_wert_ein = False
command_to_func[str("ausgang_{}_ein".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
aus_pin, initial_wert_ein)
else:
initial_wert_ein = True
command_to_func[str("ausgang_{}_ein".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
aus_pin, initial_wert_ein)
if initial_wert == True:
initial_wert_aus = True
command_to_func[str("ausgang_{}_aus".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
aus_pin, initial_wert_aus)
else:
initial_wert_aus = False
command_to_func[str("ausgang_{}_aus".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
aus_pin, initial_wert_aus)
return command_to_func
# gpio definieren
def set_gpio():
try:
GPIO.cleanup()#falls die gpios schon aktive sind
GPIO.setmode(GPIO.BOARD)
for pin,initial in zip(PIN_AUS,INITIAL_AUS):
GPIO.setup(pin, GPIO.OUT, initial=initial)
for pin, initial in zip(PIN_EIN, INITIAL_EIN):
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
except OSError:
logger.exception("gpio set error")
def read_pins(pins):
result = 0
for pin in pins:
result = result * 2 + (not GPIO.input(pin))
return result
def onewire_devices():
""" 1wire geräte auslesen """
with open(W1_MASTER_SLAVES) as lines:
one_wire = [l.strip() for l in lines]
logger.debug("1Wire slaves {}".format(one_wire))
return one_wire
def read_temperature(slave):
with open(DEVICE_PATH.format(slave)) as lines:
crc_ok = next(lines).split()[-1] == 'YES'
if not crc_ok:
raise OSError("crc not ok")
logger.exception("crc not ok")
temperature = next(lines).split()[-1]
if not temperature.startswith('t='):
raise OSError("format currupt")
logger.exception("format currupt")
return temperature[2:]
def onewire(udp_socket, w1_slaves):
""" 1wire sensoren einzeln auslesen """
for slave in w1_slaves:
try:
temperature = read_temperature(slave)
text = f"sensor {slave},{temperature}"
udp_socket.sendto(text.encode(), TARGET_HOST)
except OSError:
logger.exception("OSError")
def reply_loxone(udp_socket, command):
udp_socket.sendto(command, TARGET_HOST)
def ausgang_setzen(udp_socket, pin, zustand, command):
try:
GPIO.output(pin, zustand)
logging.debug(" Asugang {} = {} ".format(pin, zustand))
except Exception:
logger.exception("gpio error")
#tor öffner wird "gedrückt"
def tor_auf_zu(udp_socket, command):
try:
GPIO.output(37, False)
sleep(0.5)
GPIO.output(37, True)
except Exception:
logger.exception("gpio tor_auf_zu error")
#arbeits loop
def loop_einaus_abfragen(udp_socket,w1_slaves):
try:
komparator_aus = 0
komparator_ein = 0
while True:
for counter in range(0, 150):
zustand_aus = read_pins(PIN_AUS)
zustand_ein = read_pins(PIN_EIN)
if zustand_aus != komparator_aus:
udp_socket.sendto(b"zustand_aus,%d" % zustand_aus, TARGET_HOST)
komparator_aus = zustand_aus
logger.debug(" zustand_aus {} ungleich".format(zustand_aus))
if zustand_ein != komparator_ein:
udp_socket.sendto(b"zustand_ein,%d" % zustand_ein, TARGET_HOST)
komparator_ein = zustand_ein
logger.debug(" zustand_ein {} ungleich".format(zustand_ein))
if counter == 1:
if ONEWIRE_VORHANDEN:
onewire(udp_socket, w1_slaves)
logger.debug("1Wire device {}".format(onewire))
#1wire sensoren senden
elif counter == 50:
logger.debug("schleife bei {}".format(counter))
#platzhalter für andere funktionen
elif counter in [0,30,60,120]:
udp_socket.sendto(b"zustand_aus,%d" % zustand_aus, TARGET_HOST)
udp_socket.sendto(b"zustand_ein,%d" % zustand_ein, TARGET_HOST)
logger.debug("zustand_ein {} ".format(zustand_aus))
logger.debug("zustand_aus {} ".format(zustand_aus))
sleep(0.5)
except Exception:
logger.exception("arbeits loop error")
def main():
set_gpio()
w1_slaves = onewire_devices()
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
udp_socket.bind(SERVER)
dictionary_function(udp_socket)
command_to_function =dictionary_function(udp_socket)
logger.debug("dictionary created : {} ".format(command_to_function))
#
try:
loop_einaus_thread = threading.Thread(
target=loop_einaus_abfragen,
args=(udp_socket,w1_slaves),
daemon=True,
)
loop_einaus_thread.start()
logger.debug("UDPServer Waiting for client on port : {} ".format(SERVER))
while True:
command, _addr = udp_socket.recvfrom(BUFFER_SIZE)
logger.debug("UDPServer command : {} ".format(command))
try:
function = command_to_function[command]
except KeyError:
udp_socket.sendto(f"Unknown command {command!a}".encode("ascii"), TARGET_HOST)
logger.exception("KeyError")
else:
function(command)
logger.debug(" function {} ".format(function))
finally:
GPIO.cleanup()
if __name__ == "__main__":
main()
Wenn der erste Ausdruck eines Modules, einer Klasse oder einer Funktion ein String ist, dann ist das ein sogenannter doc-String für die Dokumentation.
Kommentare # erklären, warum etwas in der nächsten Zeile gemacht wird.
Z.B. ›# Wörterbuch erstellen‹ erklärt nicht das warum, statt dessen sollte die Funktion besser soetwas wie ›create_command_dictionary‹ heißen.
Wenn man eine laufende Nummer braucht, nimmt man enumerate, das auch einen Startwert verträgt.
Einen String mit ‹str› nochmal in einen String umzuwandeln ist unsinnig. Explizit auf True zu prüfen ist unnötig und das was im else-Block steht ist fast identisch zum if-Block, und sollte daher zusammengefasst werden:
Kommentare # erklären, warum etwas in der nächsten Zeile gemacht wird.
Z.B. ›# Wörterbuch erstellen‹ erklärt nicht das warum, statt dessen sollte die Funktion besser soetwas wie ›create_command_dictionary‹ heißen.
Wenn man eine laufende Nummer braucht, nimmt man enumerate, das auch einen Startwert verträgt.
Einen String mit ‹str› nochmal in einen String umzuwandeln ist unsinnig. Explizit auf True zu prüfen ist unnötig und das was im else-Block steht ist fast identisch zum if-Block, und sollte daher zusammengefasst werden:
Code: Alles auswählen
def create_command_dictionary(udp_socket):
command_to_func = {
b"tor_auf_zu": partial(tor_auf_zu, udp_socket),
b"reply_loxone": partial(reply_loxone, udp_socket)
}
for aus_nummer, aus_pin, initial_wert in enumerate(zip(PIN_AUS, INITIAL_AUS), start=1):
command_to_func["ausgang_{}_ein".format(aus_nummer).encode()] = partial(
ausgang_setzen, udp_socket, aus_pin, not initial_wert)
command_to_func["ausgang_{}_aus".format(aus_nummer).encode()] = partial(
ausgang_setzen, udp_socket, aus_pin, initial_wert)
return command_to_func
- __blackjack__
- User
- Beiträge: 13100
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
Ergänzend zu den Docstrings: Neben denen die der Python-Compiler erkennt und entsprechend verarbeitet, erkennen einige Dokumentationswerkzeuge auch Zeichenketten an anderen Stellen als ”Docstrings”. Sphinx' `autodoc`-Erweiterung erkennt beispielsweise auch Zeichenketten als ”Docstrings” die auf Modulebene, Klassenebene, und in der `__init__()` nach Definitionen von Namen/Attributen stehen.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Danke für Eure Hilfe
hab das jetzt so gelöst:
not not habe ich verwedet das das Ergebnis "True/False" und nicht "0/1" ist.
Ich hätte noch weitere Fragen:
Ich hab aus einigen Konstaten dictionary´s ersetellt.
Sind das jetzt noch Konstanten?
Sollte ich jedes Dictionary in eine Funkion mit return stecken?
Kann man Funktion erstellen die mehrere Werte rückliefern?
google hilft mir da nicht wirklich
Mit dem Logging muss ich mich noch intensiver beschäftigen, hab das auf die schnelle so gelöst.
Das GPIO System Liefert Warnungen, wie kann ich diese in den log eintragen?
schönen Sonntag
hab das jetzt so gelöst:
Code: Alles auswählen
def create_command_dictionary(udp_socket):
command_to_func = {
b"tor_auf_zu": partial(tor_auf_zu, udp_socket),
b"reply_loxone": partial(reply_loxone, udp_socket)
}
for aus_nummer, aus_pin, initial_wert in zip(enumerate(PIN_AUS[DEVICE],start=1), PIN_AUS[DEVICE], INITIAL_AUS[DEVICE]):
command_to_func["ausgang_{}_ein".format(aus_nummer[0]).encode()] = partial(
ausgang_setzen, udp_socket, aus_pin, not initial_wert)
command_to_func["ausgang_{}_aus".format(aus_nummer[0]).encode()] = partial(
ausgang_setzen, udp_socket, aus_pin, not not initial_wert)
return command_to_func
Ich hätte noch weitere Fragen:
Ich hab aus einigen Konstaten dictionary´s ersetellt.
Sind das jetzt noch Konstanten?
Sollte ich jedes Dictionary in eine Funkion mit return stecken?
Kann man Funktion erstellen die mehrere Werte rückliefern?
google hilft mir da nicht wirklich
Code: Alles auswählen
DEVICE = "keller"
SERVER = {"keller" : ("", 8008) , "garage" : ("", 8008) , "door" : ("", 8010), "loxberry" : ("", 8012)}
TARGET_HOST = {"keller" : ("10.10.10.4", 8005) , "garage" : ("10.10.10.4", 8007) , "door" : ("10.10.10.4", 8009), "loxberry" : ("10.10.10.4", 8011)}
PIN_AUS = {"keller": [11, 13, 15, 19],
"garage": [29, 31, 33, 35, 37],
"door": [29, 33, 35],
"loxberry": [29, 31, 33, 35, 37]}
INITIAL_AUS = {"keller": [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH],
"garage": [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH],
"door": [GPIO.LOW, GPIO.LOW, GPIO.LOW],
"loxberry": [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH]}
Mit dem Logging muss ich mich noch intensiver beschäftigen, hab das auf die schnelle so gelöst.
Das GPIO System Liefert Warnungen, wie kann ich diese in den log eintragen?
Code: Alles auswählen
UserWarning: Pull up/down setting are not (yet) fully supported, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
- __blackjack__
- User
- Beiträge: 13100
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
Das ``not not`` ist unsinnig. Ob 1/0 oder True/False ist ja auch egal. Falls es das nicht sein sollte dann sollte man in *beiden* Fällen `bool()` verwenden, also ``not bool(initialwert)`` und ``bool(initialwert)``.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Danke, habe es nur wegen der Optik geändert.
Ich hätte noch eine Frage zum Logging, bzw zu RPI GPIO:
Wenn der Code nicht Richtig beendet wird habe ich beim nächsten Start solche Meldungen:
Dann muss ich den Code nochmal neustarten und die Meldungen sind weg.
Kann ich für:
eine Exeption anlegen , damit ich in diesen Fall die GPIO^s neu initialisiere?
bzw könnte ich den Code automatisch neustarten?
Grüsse
Ich hätte noch eine Frage zum Logging, bzw zu RPI GPIO:
Wenn der Code nicht Richtig beendet wird habe ich beim nächsten Start solche Meldungen:
Code: Alles auswählen
in_out.py:92: UserWarning: Channel 11 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 13 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 15 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 19 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:94: UserWarning: Pull up/down setting are not (yet) fully supported, c ontinuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
in_out.py:94: UserWarning: Channel 16 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
in_out.py:94: UserWarning: Channel 18 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
Dann muss ich den Code nochmal neustarten und die Meldungen sind weg.
Kann ich für:
Code: Alles auswählen
UserWarning: Channel "KANAL" is already in use, continuing anyway.
bzw könnte ich den Code automatisch neustarten?
Grüsse
- __blackjack__
- User
- Beiträge: 13100
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
@pumuckll: Was heisst wenn der Code nicht richtig beendet wird? Man sollte halt sicherstellen das `GPIO.cleanup()` aufgerufen wird, egal aus welchem Grund das Programm beendet wird. Das einzige was man so nicht erwischt sind harte Abstürze bei denen der Kernel das Programm beendet und wenn jemand das Programm mit einem SIGKILL beendet. Beim einen sollte man dringend die Ursache finden und beheben, und das andere sollte man einfach nicht machen bzw. nur wenn ein SIGTERM nicht macht was man erwartet. Der ``try``-Block in Deinem Code hat beispielsweise eine Lücke zwischen konfigurieren der GPIOs und dem Punkt ab dem dann sichergestellt wird das `cleanup()` aufgerufen wird. Da `cleanup()` in jedem Fall aufgerufen werden kann, sollte der ``try``-Block einfach komplett die `main()`-Funktion abdecken.
`cleanup()` am Anfang aufzurufen macht keinen Sinn. Das räumt die Pins auf die in dem aktuellen Prozess bis dahin konfiguriert wurden. Also am Anfang gar keine.
`cleanup()` am Anfang aufzurufen macht keinen Sinn. Das räumt die Pins auf die in dem aktuellen Prozess bis dahin konfiguriert wurden. Also am Anfang gar keine.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
$das Programm läuft als systemd dienst.
Wenn ich den dienst beende und manuell starte habe ich diesen Fehler.
Es wird nur eine Warnung ausgegeben , das Programm läuft weiter und Anscheinen Funktioniert es auch.
wenn ich das Service Start und Stoppe bekommen ich das nicht mit.
Ich hab das Cleanup am Anfang raus genommen, es ändert nichts also hat es auch nichts gebracht.
Deswegen würde ich diese Warnungen zumindest loggen.
Wenn ich den dienst beende und manuell starte habe ich diesen Fehler.
Code: Alles auswählen
root@keller:~# sudo service in_out start
root@keller:~# sudo service in_out stop
root@keller:~#
root@keller:~#
root@keller:~# python3 in_out.py
in_out.py:92: UserWarning: Channel 11 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 13 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 15 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 19 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:94: UserWarning: Pull up/down setting are not (yet) fully supported, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
in_out.py:94: UserWarning: Channel 16 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
in_out.py:94: UserWarning: Channel 18 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
wenn ich das Service Start und Stoppe bekommen ich das nicht mit.
Ich hab das Cleanup am Anfang raus genommen, es ändert nichts also hat es auch nichts gebracht.
Code: Alles auswählen
def set_gpio():
"""Gpio definieren
Ein und Ausgänge Einstellen
GPIO.cleanup()am Anfang, falls die Gpio^s schon aktive sind"""
try:
GPIO.setmode(GPIO.BOARD)
for pin, initial in zip(PIN_AUS[DEVICE], INITIAL_AUS[DEVICE]):
GPIO.setup(pin, GPIO.OUT, initial=initial)
for pin, initial in zip(PIN_EIN[DEVICE], INITIAL_EIN[DEVICE]):
GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
except Exception:
logger.exception("gpio set error")
Deswegen würde ich diese Warnungen zumindest loggen.
- __blackjack__
- User
- Beiträge: 13100
- Registriert: Samstag 2. Juni 2018, 10:21
- Wohnort: 127.0.0.1
- Kontaktdaten:
Was macht denn das Stoppen? Bei Python-Programmen würde ich das mit einem SIGINT machen.
Die Ausnahmebehandlung in `set_gpio()` ist fehlerhaft denn bei jeglicher Ausnahme wird einfach so weitergemacht als hätte es keine Ausnahme gegeben.
Die Ausnahmebehandlung in `set_gpio()` ist fehlerhaft denn bei jeglicher Ausnahme wird einfach so weitergemacht als hätte es keine Ausnahme gegeben.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Das ist das ystemd script:
das beenden ist nicht definiert.
man kann es definieren:
auf die schnelle hab ich das gefunden:
https://www.freedesktop.org/software/sy ... .kill.html
Vermutlich sollte ich SIGTERM in meinen Code integrieren.
Werde mich damit und mit den exceptions am Wochenende beschäftigen.
Code: Alles auswählen
[Unit]
Description=in_out
After=syslog.target
[Service]
Type=simple
User=root
Group=root
WorkingDirectory=/root/
ExecStartPre=/bin/sleep 10
#ExecStart=/usr/bin/python3 /root/in_out.py
ExecStart=/usr/bin/python3 /home/pi/in_out.py
SyslogIdentifier=in_out
StandardOutput=syslog
StandardError=syslog
Restart=always
RestartSec=2
[Install]
WantedBy=multi-user.target
man kann es definieren:
Code: Alles auswählen
ExecStop=/usr/bin/flume-ng agent stop
auf die schnelle hab ich das gefunden:
Code: Alles auswählen
As described in systemd documentation, processes will receive SIGTERM and then, immediately, SIGHUP. After some time, SIGKILL will be send.
All signals, as well as strategy of sending them, can be changed in relevant service file.
See another stackoverflow question to see how handle these signals in your Python program.
Vermutlich sollte ich SIGTERM in meinen Code integrieren.
Werde mich damit und mit den exceptions am Wochenende beschäftigen.