Programm mittels Knopfdruck beenden

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
m4tthi45
User
Beiträge: 2
Registriert: Montag 3. August 2020, 11:36

Hallo liebe Community,



zurzeit habe ich ein Python Programm, dass automatisch beim hochfahren gestartet wird und auch per Knopfdruck beendet werden kann.

Mein Problem ist, dass es manchmal passiert, dass dieser Knopfdruck manchmal nicht funktioniert (Das erkenne ich durch eine Kontroll LED, die ausgeschaltet wird sobald das Programm beendet wird).

Ich weiß leider nicht ob der Knopfdruck nicht erkannt wird, oder
ob es einen anderen Grund gibt, dass sich das Programm dann nicht beenden lässt.



Mein erster Gedanke war, dass ich bei meiner Programmierung einfach "unsauber" war und es einen besseren Weg gibt, dass Programm durch Knopfdruck zu beenden.

Ich weiß, dass mein Programm nicht "sauber" ist und ich noch einiges verbessern kann(subprocess statt os; gpiozero verwenden; CSV-Modull verwenden; Variablen besser benennen, ...).
Jedoch bitte ich, dies zu vernachlässigen da ich dies in der Zwischenzeit schon versuche auszubessern und sich auf mein Hauptproblem (eine art "Not-Aus-Schalter") zu fokusieren.

Vielen Dank bereits im Vorraus
Mfg
Matthias

Hier mein Programm:

Code: Alles auswählen

import serial, sys, struct, fcntl, errno, traceback
import struct
import RPi.GPIO as GPIO
import csv                                                                              # Anpassung DP
from time import strftime, sleep
from os import system
from datetime import datetime
import referenz2
 
def f_MesswertEmpfangen():
    tupel_messwerte = ("leer",)
    flag = False
    # Auslesen des Praefix-Bytes
    praefix = verbindung.readline(1)
    praefix = praefix.encode("hex")
    
    #Auswertung der Messwerte
    if praefix == "aa" :
        identifier = verbindung.readline(1)
        identifier = identifier.encode("hex")
        if identifier == "15":
            rest = verbindung.readline(26)
            rest = rest.encode("hex")
            Antwort = praefix + identifier + rest
            if len(Antwort) == 56:  
                ch1 = byte_to_value(Antwort[6:14], "float")
                ch2 = byte_to_value(Antwort[14:22], "float")
                ch3 = byte_to_value(Antwort[22:30], "float")
                ch4 = byte_to_value(Antwort[30:38], "float")
                ch5 = byte_to_value(Antwort[38:46], "float")
                ch6 = byte_to_value(Antwort[46:54], "float")
                tupel_messwerte = ((datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'),ch1,ch2,ch3,ch4,ch5,ch6),) 
                flag = True
    return flag, tupel_messwerte        # return: Messwert-Tupel und bool, ob Messwert empfangen wurde
 
def antwort_empfangen(verbindung):
    var_routine = True
    while (var_routine == True) :
        praefix = verbindung.readline(1)                                # lese praefix
        praefix = praefix.encode("hex")                                    # string to hexstring
        if praefix == "aa":
            identifier = verbindung.readline(1)                         # lese identifier
            identifier = identifier.encode("hex")                       # identifier: string to hexstring
            if (identifier != 0x15):                                    # Identifier vergleichen
                rest = verbindung.readline(int(identifier)-0x50+2)      # lese Rest des strings
                rest = rest.encode("hex")                               # Rest: string to hexstring
                Antwort = praefix + identifier + rest                   # string zusammensetzen
                #print "Antwort: " + Antwort                             # print Antwortframe
                var_routine = False
    return Antwort
 
def byte_to_value(daten, typ):
    if (typ == "u8") or (typ == "i8"):          # uint 8 Byte
        Multiplikator = [16,1]
    if (typ == "u16") or (typ == "i16"):        # uint 16 Byte
        Multiplikator = [4096,256,16,1]
    if (typ == "u32") or (typ == "i32"):        # uint 32 Byte
        Multiplikator = [268435456,16777216,1048576,65536,4096,256,16,1]
    if typ == "float":                          # uint 32 Byte
        Multiplikator = [268435456,16777216,1048576,65536,4096,256,16,1]
    MW_list = []
    for i in range(len(Multiplikator)):
        MW_list.append(int(daten[i:(i+1)], 16))
    MW = 0
    for k in range(len(Multiplikator)):
        MW += ( MW_list[k] * Multiplikator[k] )
    if typ == "float":
        MW = struct.pack("I", MW)
        MW = struct.unpack("f", MW)[0]

    return MW
 

if __name__ == '__main__':
    GPIO.setmode(GPIO.BOARD)  
    GPIO.setup(11, GPIO.IN, pull_up_down = GPIO.PUD_DOWN) 
    GPIO.setup(12, GPIO.OUT)                                       # GSV-6 einschalten
    # Verbindung UART-Shield
    verbindung = serial.Serial('/dev/ttyAMA0', 230400, timeout=1)
    verbindung.isOpen()
    sleep(0.5)
    
    verbindung.write(chr(0xAA)+chr(0x90)+chr(0x23)+chr(0x85))   # stop transmission
    Antwort = antwort_empfangen(verbindung)
    print "serielle Verbindung aufgebaut"
    sleep(0.5)
    
    verbindung.write(referenz2.Befehl(verbindung,"WriteDataRate", [300]))
    Antwort = antwort_empfangen(verbindung)
    
    verbindung.write(chr(0xAA)+chr(0x90)+chr(0x78)+chr(0x85)) #Reset Device
    sleep(0.5)

    verbindung.write(chr(0xAA)+chr(0x90)+chr(0x23)+chr(0x85))
    sleep(0.5)
    Antwort = antwort_empfangen(verbindung)
    sleep(0.5)
    

    verbindung.write(chr(0xAA)+chr(0x90)+chr(0x8a)+chr(0x85))   # get DataRate
    print("DataRate Serial: ",byte_to_value(antwort_empfangen(verbindung)[6:-1],'float'))


    verbindung.write(chr(0xAA)+chr(0x90)+chr(0x24)+chr(0x85))   # start transmission 
    j = 0  
    print("Start der Messung:" + (datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))
    print ("Messung gestartet")  
    j = 0
    GPIO.output(12,1)  #Kontroll LED einschlten
    try:
        while True:
            dateiname = ("meas" + str(j) + ".csv")     
            system('touch /media/pi/INTENSO8GB/Messungen/'+ dateiname)
            system('rm /media/pi/INTENSO8GB/Messungen/' + dateiname)
            csv_file = open("/media/pi/INTENSO8GB/Messungen/"+dateiname,'a')
            k = 0
            while k < 500000:
                flag_f = False
                flag_f, tupel_MWe_f = f_MesswertEmpfangen()
                if flag_f == True:                          # wenn ein Messwert empfangen wurde...
                        for row in tupel_MWe_f[0]:
                           csv_file.write(str(row) + ',')
                        csv_file.write("\n")
                        k += 1
                        if GPIO.input(11)==GPIO.HIGH:
                            raise Exception("Knopfdruck")  #Bei Knopfdruck Exception auswerfen und so Program beenden
            j += 1
                
 
    except:
        verbindung.close()  
        GPIO.output(12,0)   #Kontroll LED ausschalten
        GPIO.cleanup()
             
    print "\nProgrammende"
    sleep(10.0)
    system('sudo shutdown now') #RPi herunterfahren
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wie das geht, wurde dir doch schon hier erklärt: https://forum-raspberrypi.de/forum/thre ... nopfdruck/

Warum willst du das hier nochmal diskutieren?
Sirius3
User
Beiträge: 18272
Registriert: Sonntag 21. Oktober 2012, 17:20

Da kann ich einfach nicht wegsehen:

Code: Alles auswählen

import serial
import struct
from datetime import datetime
from time import sleep
from RPi import GPIO
import subprocess
from itertools import count
import referenz2

def messwerte_empfangen(verbindung):
    praefix = verbindung.read(1)
    if praefix != b'\xaa':
        raise ValueError()
    identifier = verbindung.read(1)
    if identifier != b'\x15':
        raise ValueError()
    data = verbindung.read(26)
    if len(data) != 26:
        raise ValueError()
    werte = struct.unpack_from('>6f', data, 0)
    return werte
 
def antwort_empfangen(verbindung):
    while True:
        praefix = verbindung.read(1)
        if praefix == b'\xaa':
            identifier = verbindung.read(1)
            if identifier != b'\x15':
                break
    return verbindung.read(ord(identifier) - 0x50 + 2)


def main():
    GPIO.setmode(GPIO.BOARD)  
    GPIO.setup(11, GPIO.IN, pull_up_down = GPIO.PUD_DOWN) 
    GPIO.setup(12, GPIO.OUT)                                       # GSV-6 einschalten

    verbindung = serial.Serial('/dev/ttyAMA0', 230400, timeout=1)
    sleep(0.5)
    verbindung.write(b"\xAA\x90\x23\x85")   # stop transmission
    _ = antwort_empfangen(verbindung)
    print "serielle Verbindung aufgebaut"
    sleep(0.5)
    verbindung.write(referenz2.Befehl(verbindung, "WriteDataRate", [300]))
    _ = antwort_empfangen(verbindung)
    
    verbindung.write(b"\xAA\x90\x78\x85") #Reset Device
    sleep(0.5)
    verbindung.write(b"\xAA\x90\x23\x85")
    sleep(0.5)
    _ = antwort_empfangen(verbindung)
    sleep(0.5)
    
    verbindung.write(b"\xAA\x90\x8a\x85")   # get DataRate
    antwort = antwort_empfangen(verbindung)
    datarate = struct.unpack_from(">f", antwort, 4)[0]
    print "DataRate Serial: ", datarate

    verbindung.write(b"\xAA\x90\x24\x85")   # start transmission 
    print "Start der Messung:", datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')
    print "Messung gestartet"
    j = 0
    GPIO.output(12,1)  #Kontroll LED einschlten
    try:
        for j in count():
            dateiname = "/media/pi/INTENSO8GB/Messungen/meas{}.csv".format(j)
            with open(dateiname, "w") as csv_file:
                for k in range(500000):
                    try:
                        messwerte = messwerte_empfangen(verbindung)
                    except ValueError:
                        pass
                    else:
                        # wenn ein Messwert empfangen wurde...
                        csv_file.write("{},{}\n".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), ','.join(map(str, messwerte))))
                        if GPIO.input(11) == GPIO.HIGH:
                            # Bei Knopfdruck Exception auswerfen und so Program beenden
                            break
    finally:
        verbindung.close()  
        GPIO.output(12,0)   #Kontroll LED ausschalten
        GPIO.cleanup()
             
    print "\nProgrammende"
    sleep(10.0)
    subprocess.call(['sudo', 'shutdown, 'now']) #RPi herunterfahren

if __name__ == '__main__':
    main()
Antworten