gpio bei Programmstart schon High (OPi.GPIO threading)

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Benutzeravatar
__blackjack__
User
Beiträge: 13100
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Ich hätte bei `read_pins()` Bitoperationen (``<<`` und ``|``) verwendet, damit klarer wird was da passiert. Aber das ist letztlich Geschmackssache.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
pumuckll
User
Beiträge: 56
Registriert: Donnerstag 30. August 2018, 17:45

Danke euch

Ich hab eine weile gebraucht, um die Dualzahl Rechnung zu verstehen, da bin ich etwas aus der Übung
Sirius3 hat geschrieben: Samstag 29. Februar 2020, 13:58 result = 0
for pin in pins:
result = result * 2 + (not GPIO.input(pin))
return result
nochlänger dafür:
__blackjack__ hat geschrieben: Samstag 29. Februar 2020, 15:43 Ich hätte bei `read_pins()` Bitoperationen (``<<`` und ``|``) verwendet, damit klarer wird was da passiert. Aber das ist letztlich Geschmackssache.

Code: Alles auswählen

result = (result <<1) | (not GPIO.input(pin)) #Dualzahl als Dezimalzahl darstellen

Im Arbeits Thread hab ich eine Zähl schleife erstellt, um gewisse Funktionen nicht in jeden Zyklus auszuführen.
Außerdem werden noch 1Wire Sensoren ausgelesen, falls welche vorhanden sind.


Ist das so OK?

Code: Alles auswählen

#!/usr/bin/env python3
from functools import partial
import socket
import threading
from time import sleep
from RPi import GPIO
from random import randrange

BUFFER_SIZE = 8192
SERVER = ("", 8008)
TARGET_HOST = ("10.10.10.4", 8007)

PIN_AUS = [29, 31, 33, 35, 37]
PIN_EIN = [16, 18]

# gpio definieren
def set_gpio():
    GPIO.cleanup()#falls dei gpios schon aktive sind
    GPIO.setmode(GPIO.BOARD)
    GPIO.setup(PIN_AUS, GPIO.OUT, initial=GPIO.HIGH)
    GPIO.setup(PIN_EIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

def read_pins(pins):
    result = 0
    for pin in pins:
        result = (result <<1) | (not GPIO.input(pin)) #Dualzahl als Dezimalzahl darstellen
        # alternative       result = result * 2 + (not GPIO.input(pin))
    return result

def onewire_devices():#1wire geräte auslesen
    file = open('/sys/devices/w1_bus_master1/w1_master_slaves')
    w1_slaves = file.readlines()
    file.close()
    return w1_slaves

def onewire(udp_socket,w1_slaves):#1wire sensoren einzeln auslesen
    for line in w1_slaves:
        try:
            file = open('/sys/bus/w1/devices/' + line.split("\n")[0] + '/w1_slave')
            filecontent = file.read()
            file.close()
            stringvalue = ((filecontent.split("\n")[1].split(" ")[9])[2:]).encode()#formatieren und in bytes konvertieren
            temperatur = ("sensor " + line.split("\n")[0] +",").encode() + stringvalue
            print(temperatur)
            udp_socket.sendto(temperatur, TARGET_HOST)
        except OSError:
            print("OSError")

def reply_loxone(udp_socket, command):# Rückmeldung ob das Programm noch läuft
    udp_socket.sendto(command, TARGET_HOST)
    print("reply_loxone")

def ausgang_setzen(udp_socket, pin, zustand, command):
    #setzen der ausgänge
    GPIO.output(pin, zustand)
    udp_socket.sendto(command, TARGET_HOST)
    print("ausgang ", pin, zustand)

def tor_auf_zu(udp_socket, command):
    #tor öffner wird "gedrückt"
    GPIO.output(37, False)
    sleep(0.5)
    GPIO.output(37, True)
    udp_socket.sendto(command, TARGET_HOST)
    print("toraufzu")


#arbeits loop
def loop_einaus_abfragen(udp_socket,w1_slaves):
    while True:
        for counter in range(0, 150):
            zustand_aus = read_pins(PIN_AUS)
            zustand_ein = read_pins(PIN_EIN)
            udp_socket.sendto(b"zustand_aus,%d" % zustand_aus, TARGET_HOST)
            udp_socket.sendto(b"zustand_ein,%d" % zustand_ein, TARGET_HOST)
            if counter == 1:
                onewire(udp_socket, w1_slaves)
                # 1wire sensoren senden
            elif counter == 50 or counter == 75:
                print("schleife bei", counter)
                # platzhalter für andere funktionen
            elif counter in [25,45,90]:
                print("schleife bei", counter)
                # platzhalter für andere funktionen
            sleep(0.5)

def main():
    set_gpio()
    w1_slaves = onewire_devices()
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(SERVER)
    udp_socket.sendto(b"tets", TARGET_HOST)
    command_to_function = {
        b"reply_loxone": partial(reply_loxone, udp_socket),
        b"tor_auf_zu": partial(tor_auf_zu, udp_socket),        
        b"ausgang1_ein": partial(ausgang_setzen, udp_socket, 35, False),
        b"ausgang1_aus": partial(ausgang_setzen, udp_socket, 35, True),
        b"ausgang2_ein": partial(ausgang_setzen, udp_socket, 33, False),
        b"ausgang2_aus": partial(ausgang_setzen, udp_socket, 33, True),
        b"ausgang3_ein": partial(ausgang_setzen, udp_socket, 31, False),
        b"ausgang3_aus": partial(ausgang_setzen, udp_socket, 31, True),
        b"ausgang4_ein": partial(ausgang_setzen, udp_socket, 29, False),
        b"ausgang4_aus": partial(ausgang_setzen, udp_socket, 29, True),
    }
    try:
        udp_socket.sendto(b"test", TARGET_HOST)	
        loop_einaus_thread = threading.Thread(
            target=loop_einaus_abfragen,
            args=(udp_socket,w1_slaves),
            daemon=True,
        )
        loop_einaus_thread.start()
        print("UDPServer Waiting for client on port", SERVER)
        while True:
            command, _addr = udp_socket.recvfrom(BUFFER_SIZE)
            print(command)
            try:
                function = command_to_function[command]
            except KeyError:
                print("KeyError")
                udp_socket.sendto(f"Unknown command {command!a}".encode("ascii"), TARGET_HOST)
            else:
                function(command)
    finally:
        GPIO.cleanup()


if __name__ == "__main__":
    main()
Gruss
Sirius3
User
Beiträge: 17747
Registriert: Sonntag 21. Oktober 2012, 17:20

Dateien öffnet man immer in einem with-Block. Konstante Strings sollten als Konstanten am Anfang der Datei stehen. Man stückelt keine Pfade per + zusammen. Um das Zeile-Ende-Zeichen zu entfernen gibt es rsplit. Den Dateiinhalt von w1_slave sollte man noch etwas genauer analysieren. Die Zeilen zu stringvalue und temperatur sind zu lang und unleserlich, vor allem, weil hier zu viel mit Strings verumgefummelt wird. Man berechnet nicht zwei mal das selbe, sondern merkt sich das Ergebnis. Die Umwandlung von Slave-Nummern sollte aber schon in onewire_devices stattfinden.

Code: Alles auswählen

W1_MASTER_SLAVES = '/sys/devices/w1_bus_master1/w1_master_slaves' 
DEVICE_PATH = '/sys/bus/w1/devices/{}/w1_slave'

def onewire_devices():
    """ 1wire geräte auslesen """
    with open(W1_MASTER_SLAVES) as lines:
        return [l.strip() for l in lines]

def read_temperature(slave):
    with open(DEVICE_PATH.format(slave)) as lines:
        crc_ok = next(lines).split()[-1] == 'YES'
        if not crc_ok:
            raise OSError("crc not ok")
        temperature = next(lines).split()[-1]
        if not temperature.startswith('t='):
            raise OSError("format currupt")
        return temperature[2:]

def onewire(udp_socket, w1_slaves):
    """ 1wire sensoren einzeln auslesen """
    for slave in w1_slaves:
        try:
            temperature = read_temperature(slave)
            text = f"sensor {slave},{temperature}"
            udp_socket.sendto(text.encode(), TARGET_HOST)
        except OSError:
            print("OSError")
pumuckll
User
Beiträge: 56
Registriert: Donnerstag 30. August 2018, 17:45

Danke
das ist viel übersichtlicher so.
pumuckll
User
Beiträge: 56
Registriert: Donnerstag 30. August 2018, 17:45

Ich hab versucht, hier gelerntes anzuwenden und das Programm umgeschrieben, damit ich es leichter für verschieden Anwendungen anpassen kann.

Erstaunlicherweise Funktioniert läuft es auch.

UDP Nachrichten hab ic auf ein Minimum reduziert und das Wörterbuch wird von einer Schleife erstellt.
Somit brauche ich nur die gpio^s definieren und die Eingangs Kommandos werden automatisch generiert.
Habe ich das so gut gelöst?

Ich konnte den Logger nicht in eine Funktion verschieben, obwohl ich viel probiert habe.
Return in einer Funktion hat nicht funktioniert.

Wie macht man das?

Warum wird das entweder als Kommentar erkannt oder als string?

Code: Alles auswählen

""" 1wire sensoren einzeln auslesen """
Wie kommentiere ich den Code am besten?



Gruss

Code: Alles auswählen

#!/usr/bin/env python3
"""Diese Script Sendet und empfängt UDP Nachrichten,
   um Ausgänge zu schalten und um  den Status von Eingängen, Ausgängen sowie von 1Wire Sensoren  auszugeben.
   Verwendbar für Raspberrypi 3 und 4, orangepi zero """

from functools import partial
import socket
import threading
import OPi.GPIO as GPIO   #OrangePi
#from RPi import GPIO    #RaspberryPi
import logging
from logging.handlers import TimedRotatingFileHandler
from time import sleep

# """ UDP einstellungen  """
BUFFER_SIZE = 8192
SERVER = ("", 8006)
TARGET_HOST = ("10.10.10.4", 8005)

#gpio einstellungen
PIN_AUS = [11, 13, 15, 19]
INITIAL_AUS = [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH]
PIN_EIN = [16, 18]
INITIAL_EIN = [GPIO.PUD_DOWN, GPIO.PUD_DOWN]

#1wire einstellungen
ONEWIRE_VORHANDEN = True
W1_MASTER_SLAVES = '/sys/devices/w1_bus_master1/w1_master_slaves'
DEVICE_PATH = '/sys/bus/w1/devices/{}/w1_slave'

#logger
logger = logging.getLogger("Rotating Log")
logger.setLevel(logging.DEBUG)
handler = TimedRotatingFileHandler("in_out.log", when='D', interval=1, backupCount=7, encoding=None,
                                   delay=False, utc=False, atTime=None)
fileformatter = logging.Formatter('%(asctime)s : %(levelname)s : %(name)s : %(message)s')
handler.setFormatter(fileformatter)
logger.addHandler(handler)

logger.debug("logger started")

# Wörterbuch erstellen
def dictionary_function(udp_socket):
    command_to_func = {b"tor_auf_zu": partial(tor_auf_zu, udp_socket),
                           b"reply_loxone": partial(reply_loxone, udp_socket)}
    for aus_nummer, aus_pin, initial_wert in zip(range(len(PIN_AUS)), PIN_AUS, INITIAL_AUS):
        aus_nummer += 1
        if initial_wert == True:
            initial_wert_ein = False
            command_to_func[str("ausgang_{}_ein".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
                                                                                             aus_pin, initial_wert_ein)
        else:
            initial_wert_ein = True
            command_to_func[str("ausgang_{}_ein".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
                                                                                             aus_pin, initial_wert_ein)
        if initial_wert == True:
            initial_wert_aus = True
            command_to_func[str("ausgang_{}_aus".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
                                                                                             aus_pin, initial_wert_aus)
        else:
            initial_wert_aus = False
            command_to_func[str("ausgang_{}_aus".format(aus_nummer)).encode()] = partial(ausgang_setzen, udp_socket,
                                                                                             aus_pin, initial_wert_aus)
    return command_to_func



# gpio definieren
def set_gpio():
    try:
        GPIO.cleanup()#falls die gpios schon aktive sind
        GPIO.setmode(GPIO.BOARD)
        for pin,initial  in zip(PIN_AUS,INITIAL_AUS):
            GPIO.setup(pin, GPIO.OUT, initial=initial)
        for pin, initial in zip(PIN_EIN, INITIAL_EIN):
            GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
    except OSError:
        logger.exception("gpio set error")

def read_pins(pins):
    result = 0
    for pin in pins:
        result = result * 2 + (not GPIO.input(pin))
    return result

def onewire_devices():
    """ 1wire geräte auslesen """
    with open(W1_MASTER_SLAVES) as lines:
        one_wire = [l.strip() for l in lines]
        logger.debug("1Wire slaves {}".format(one_wire))
        return one_wire

def read_temperature(slave):
    with open(DEVICE_PATH.format(slave)) as lines:
        crc_ok = next(lines).split()[-1] == 'YES'
        if not crc_ok:
            raise OSError("crc not ok")
            logger.exception("crc not ok")
        temperature = next(lines).split()[-1]
        if not temperature.startswith('t='):
            raise OSError("format currupt")
            logger.exception("format currupt")

        return temperature[2:]
def onewire(udp_socket, w1_slaves):
    """ 1wire sensoren einzeln auslesen """
    for slave in w1_slaves:
        try:
            temperature = read_temperature(slave)
            text = f"sensor {slave},{temperature}"
            udp_socket.sendto(text.encode(), TARGET_HOST)
        except OSError:
            logger.exception("OSError")

def reply_loxone(udp_socket, command):
    udp_socket.sendto(command, TARGET_HOST)

def ausgang_setzen(udp_socket, pin, zustand, command):
    try:
        GPIO.output(pin, zustand)
        logging.debug(" Asugang {} = {} ".format(pin, zustand))
    except Exception:
        logger.exception("gpio error")

#tor öffner wird "gedrückt"
def tor_auf_zu(udp_socket, command):
    try:
        GPIO.output(37, False)
        sleep(0.5)
        GPIO.output(37, True)
    except Exception:
        logger.exception("gpio tor_auf_zu error")

#arbeits loop
def loop_einaus_abfragen(udp_socket,w1_slaves):
    try:
        komparator_aus = 0
        komparator_ein = 0
        while True:
            for counter in range(0, 150):
                zustand_aus = read_pins(PIN_AUS)
                zustand_ein = read_pins(PIN_EIN)
                if zustand_aus != komparator_aus:
                    udp_socket.sendto(b"zustand_aus,%d" % zustand_aus, TARGET_HOST)
                    komparator_aus = zustand_aus
                    logger.debug(" zustand_aus {} ungleich".format(zustand_aus))
                if zustand_ein != komparator_ein:
                    udp_socket.sendto(b"zustand_ein,%d" % zustand_ein, TARGET_HOST)
                    komparator_ein = zustand_ein
                    logger.debug(" zustand_ein {} ungleich".format(zustand_ein))
                if counter == 1:
                    if ONEWIRE_VORHANDEN:
                        onewire(udp_socket, w1_slaves)
                        logger.debug("1Wire device {}".format(onewire))
                        #1wire sensoren senden
                elif counter == 50:
                    logger.debug("schleife bei {}".format(counter))
                    #platzhalter für andere funktionen
                elif counter in [0,30,60,120]:
                    udp_socket.sendto(b"zustand_aus,%d" % zustand_aus, TARGET_HOST)
                    udp_socket.sendto(b"zustand_ein,%d" % zustand_ein, TARGET_HOST)
                    logger.debug("zustand_ein {} ".format(zustand_aus))
                    logger.debug("zustand_aus {} ".format(zustand_aus))
                sleep(0.5)
    except Exception:
        logger.exception("arbeits loop error")

def main():
    set_gpio()
    w1_slaves = onewire_devices()
    udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_socket.bind(SERVER)
    dictionary_function(udp_socket)
    command_to_function =dictionary_function(udp_socket)
    logger.debug("dictionary created : {} ".format(command_to_function))
#
    try:
        loop_einaus_thread = threading.Thread(
            target=loop_einaus_abfragen,
            args=(udp_socket,w1_slaves),
            daemon=True,
        )
        loop_einaus_thread.start()

        logger.debug("UDPServer Waiting for client on port : {} ".format(SERVER))
        while True:
            command, _addr = udp_socket.recvfrom(BUFFER_SIZE)
            logger.debug("UDPServer command : {} ".format(command))
            try:
                function = command_to_function[command]
            except KeyError:
                udp_socket.sendto(f"Unknown command {command!a}".encode("ascii"), TARGET_HOST)
                logger.exception("KeyError")
            else:
                function(command)
                logger.debug(" function {} ".format(function))

    finally:
        GPIO.cleanup()

if __name__ == "__main__":
    main()
Sirius3
User
Beiträge: 17747
Registriert: Sonntag 21. Oktober 2012, 17:20

Wenn der erste Ausdruck eines Modules, einer Klasse oder einer Funktion ein String ist, dann ist das ein sogenannter doc-String für die Dokumentation.
Kommentare # erklären, warum etwas in der nächsten Zeile gemacht wird.
Z.B. ›# Wörterbuch erstellen‹ erklärt nicht das warum, statt dessen sollte die Funktion besser soetwas wie ›create_command_dictionary‹ heißen.
Wenn man eine laufende Nummer braucht, nimmt man enumerate, das auch einen Startwert verträgt.
Einen String mit ‹str› nochmal in einen String umzuwandeln ist unsinnig. Explizit auf True zu prüfen ist unnötig und das was im else-Block steht ist fast identisch zum if-Block, und sollte daher zusammengefasst werden:

Code: Alles auswählen

def create_command_dictionary(udp_socket):
    command_to_func = {
        b"tor_auf_zu": partial(tor_auf_zu, udp_socket),
        b"reply_loxone": partial(reply_loxone, udp_socket)
    }
    for aus_nummer, aus_pin, initial_wert in enumerate(zip(PIN_AUS, INITIAL_AUS), start=1):
        command_to_func["ausgang_{}_ein".format(aus_nummer).encode()] = partial(
            ausgang_setzen, udp_socket, aus_pin, not initial_wert)
        command_to_func["ausgang_{}_aus".format(aus_nummer).encode()] = partial(
            ausgang_setzen, udp_socket, aus_pin, initial_wert)
    return command_to_func
Benutzeravatar
__blackjack__
User
Beiträge: 13100
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Ergänzend zu den Docstrings: Neben denen die der Python-Compiler erkennt und entsprechend verarbeitet, erkennen einige Dokumentationswerkzeuge auch Zeichenketten an anderen Stellen als ”Docstrings”. Sphinx' `autodoc`-Erweiterung erkennt beispielsweise auch Zeichenketten als ”Docstrings” die auf Modulebene, Klassenebene, und in der `__init__()` nach Definitionen von Namen/Attributen stehen.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
pumuckll
User
Beiträge: 56
Registriert: Donnerstag 30. August 2018, 17:45

Danke für Eure Hilfe

hab das jetzt so gelöst:

Code: Alles auswählen

def create_command_dictionary(udp_socket):
    command_to_func = {
        b"tor_auf_zu": partial(tor_auf_zu, udp_socket),
        b"reply_loxone": partial(reply_loxone, udp_socket)
    }

    for aus_nummer, aus_pin, initial_wert in zip(enumerate(PIN_AUS[DEVICE],start=1), PIN_AUS[DEVICE], INITIAL_AUS[DEVICE]):
        command_to_func["ausgang_{}_ein".format(aus_nummer[0]).encode()] = partial(
            ausgang_setzen, udp_socket, aus_pin, not initial_wert)
        command_to_func["ausgang_{}_aus".format(aus_nummer[0]).encode()] = partial(
            ausgang_setzen, udp_socket, aus_pin, not not initial_wert)
    return command_to_func
not not habe ich verwedet das das Ergebnis "True/False" und nicht "0/1" ist.


Ich hätte noch weitere Fragen:

Ich hab aus einigen Konstaten dictionary´s ersetellt.
Sind das jetzt noch Konstanten?

Sollte ich jedes Dictionary in eine Funkion mit return stecken?

Kann man Funktion erstellen die mehrere Werte rückliefern?

google hilft mir da nicht wirklich

Code: Alles auswählen

DEVICE = "keller"

SERVER = {"keller" : ("", 8008) , "garage" : ("", 8008) , "door" : ("", 8010), "loxberry" : ("", 8012)}
TARGET_HOST = {"keller" : ("10.10.10.4", 8005) , "garage" : ("10.10.10.4", 8007) , "door" : ("10.10.10.4", 8009), "loxberry" : ("10.10.10.4", 8011)}

PIN_AUS = {"keller": [11, 13, 15, 19],
                  "garage": [29, 31, 33, 35, 37],
                  "door": [29, 33, 35],
                  "loxberry": [29, 31, 33, 35, 37]}
INITIAL_AUS = {"keller": [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH],
                      "garage": [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH],
                      "door": [GPIO.LOW, GPIO.LOW, GPIO.LOW],
                      "loxberry": [GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH, GPIO.HIGH]}

Mit dem Logging muss ich mich noch intensiver beschäftigen, hab das auf die schnelle so gelöst.

Das GPIO System Liefert Warnungen, wie kann ich diese in den log eintragen?

Code: Alles auswählen

UserWarning: Pull up/down setting are not (yet) fully supported, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
schönen Sonntag
Benutzeravatar
__blackjack__
User
Beiträge: 13100
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Das ``not not`` ist unsinnig. Ob 1/0 oder True/False ist ja auch egal. Falls es das nicht sein sollte dann sollte man in *beiden* Fällen `bool()` verwenden, also ``not bool(initialwert)`` und ``bool(initialwert)``.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
pumuckll
User
Beiträge: 56
Registriert: Donnerstag 30. August 2018, 17:45

Danke, habe es nur wegen der Optik geändert.

Ich hätte noch eine Frage zum Logging, bzw zu RPI GPIO:

Wenn der Code nicht Richtig beendet wird habe ich beim nächsten Start solche Meldungen:

Code: Alles auswählen

in_out.py:92: UserWarning: Channel 11 is already in use, continuing anyway. Use                                                                          GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 13 is already in use, continuing anyway. Use                                                                          GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 15 is already in use, continuing anyway. Use                                                                          GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 19 is already in use, continuing anyway. Use                                                                          GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:94: UserWarning: Pull up/down setting are not (yet) fully supported, c                                                                         ontinuing anyway. Use GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
in_out.py:94: UserWarning: Channel 16 is already in use, continuing anyway. Use                                                                          GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
in_out.py:94: UserWarning: Channel 18 is already in use, continuing anyway. Use                                                                          GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.IN, pull_up_down=initial)


Dann muss ich den Code nochmal neustarten und die Meldungen sind weg.


Kann ich für:

Code: Alles auswählen

UserWarning: Channel "KANAL"  is already in use, continuing anyway.
eine Exeption anlegen , damit ich in diesen Fall die GPIO^s neu initialisiere?

bzw könnte ich den Code automatisch neustarten?



Grüsse
Benutzeravatar
__blackjack__
User
Beiträge: 13100
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@pumuckll: Was heisst wenn der Code nicht richtig beendet wird? Man sollte halt sicherstellen das `GPIO.cleanup()` aufgerufen wird, egal aus welchem Grund das Programm beendet wird. Das einzige was man so nicht erwischt sind harte Abstürze bei denen der Kernel das Programm beendet und wenn jemand das Programm mit einem SIGKILL beendet. Beim einen sollte man dringend die Ursache finden und beheben, und das andere sollte man einfach nicht machen bzw. nur wenn ein SIGTERM nicht macht was man erwartet. Der ``try``-Block in Deinem Code hat beispielsweise eine Lücke zwischen konfigurieren der GPIOs und dem Punkt ab dem dann sichergestellt wird das `cleanup()` aufgerufen wird. Da `cleanup()` in jedem Fall aufgerufen werden kann, sollte der ``try``-Block einfach komplett die `main()`-Funktion abdecken.

`cleanup()` am Anfang aufzurufen macht keinen Sinn. Das räumt die Pins auf die in dem aktuellen Prozess bis dahin konfiguriert wurden. Also am Anfang gar keine.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
pumuckll
User
Beiträge: 56
Registriert: Donnerstag 30. August 2018, 17:45

$das Programm läuft als systemd dienst.

Wenn ich den dienst beende und manuell starte habe ich diesen Fehler.

Code: Alles auswählen

 root@keller:~# sudo service in_out start
root@keller:~# sudo service in_out stop
root@keller:~#
root@keller:~#
root@keller:~# python3 in_out.py
in_out.py:92: UserWarning: Channel 11 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 13 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 15 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:92: UserWarning: Channel 19 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.OUT, initial=initial)
in_out.py:94: UserWarning: Pull up/down setting are not (yet) fully supported, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
in_out.py:94: UserWarning: Channel 16 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
in_out.py:94: UserWarning: Channel 18 is already in use, continuing anyway. Use GPIO.setwarnings(False) to disable warnings.
  GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
Es wird nur eine Warnung ausgegeben , das Programm läuft weiter und Anscheinen Funktioniert es auch.

wenn ich das Service Start und Stoppe bekommen ich das nicht mit.


Ich hab das Cleanup am Anfang raus genommen, es ändert nichts also hat es auch nichts gebracht.

Code: Alles auswählen

def set_gpio():
    """Gpio definieren
       Ein und Ausgänge Einstellen
       GPIO.cleanup()am Anfang, falls die Gpio^s schon aktive sind"""
    try:
        GPIO.setmode(GPIO.BOARD)
        for pin, initial in zip(PIN_AUS[DEVICE], INITIAL_AUS[DEVICE]):
            GPIO.setup(pin, GPIO.OUT, initial=initial)
        for pin, initial in zip(PIN_EIN[DEVICE], INITIAL_EIN[DEVICE]):
            GPIO.setup(pin, GPIO.IN, pull_up_down=initial)
    except Exception:
        logger.exception("gpio set error")

Deswegen würde ich diese Warnungen zumindest loggen.
Benutzeravatar
__blackjack__
User
Beiträge: 13100
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Was macht denn das Stoppen? Bei Python-Programmen würde ich das mit einem SIGINT machen.

Die Ausnahmebehandlung in `set_gpio()` ist fehlerhaft denn bei jeglicher Ausnahme wird einfach so weitergemacht als hätte es keine Ausnahme gegeben.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Sirius3
User
Beiträge: 17747
Registriert: Sonntag 21. Oktober 2012, 17:20

Das sind ja auch nur Warnungen, und da hilft kein Exception-Handling.
pumuckll
User
Beiträge: 56
Registriert: Donnerstag 30. August 2018, 17:45

Das ist das ystemd script:

Code: Alles auswählen

[Unit]
Description=in_out
After=syslog.target

[Service]
Type=simple
User=root
Group=root
WorkingDirectory=/root/
ExecStartPre=/bin/sleep 10
#ExecStart=/usr/bin/python3 /root/in_out.py
ExecStart=/usr/bin/python3 /home/pi/in_out.py
SyslogIdentifier=in_out
StandardOutput=syslog
StandardError=syslog
Restart=always
RestartSec=2

[Install]
WantedBy=multi-user.target
das beenden ist nicht definiert.

man kann es definieren:

Code: Alles auswählen

ExecStop=/usr/bin/flume-ng agent stop

auf die schnelle hab ich das gefunden:

Code: Alles auswählen

As described in systemd documentation, processes will receive SIGTERM and then, immediately, SIGHUP. After some time, SIGKILL will be send.

All signals, as well as strategy of sending them, can be changed in relevant service file.

See another stackoverflow question to see how handle these signals in your Python program.
https://www.freedesktop.org/software/sy ... .kill.html

Vermutlich sollte ich SIGTERM in meinen Code integrieren.

Werde mich damit und mit den exceptions am Wochenende beschäftigen.
Antworten