LED Lauflicht parallel zu Skriptausführung

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
ds91
User
Beiträge: 5
Registriert: Dienstag 5. Mai 2020, 12:26

Hallo zusammen!

Ich habe folgendes Problem (ja ich weiß, dass es sowas ähnliches hier bereits gegeben hat, das konnte aber mein Problem leider nicht lösen.):
Es handelt sich bei meinem Projekt um ein Alarmsystem, welches eine SMS empfängt und jene auf einem 4x20 Zeichen LCD ausgibt. Parallel dazu soll ein Buzzer ertönen und ein Blaulicht (LED Lauflicht bestehend aus vier LEDs) aktiviert werden. Nun habe ich bzw. ein befreundeter Kollege (der sehr schwer zu erreichen ist) den Code schon so aufgesetzt, dass einzelne Funktionen als eigener Thread aufgerufen werden, damit sie sich nicht gegenseitig blockieren. Das funktioniert soweit auch sehr gut. Was ich aber noch nicht hinbekommen habe ist, das LED Lauflicht sowie der Buzzer parallel ablaufen zu lassen. Selbstverständlich soll das Blaulicht einige Zeit laufen, d.h. ich kann es nicht via einer Schleife in der aufgerufenen Funktion via threading.Timer(1, blaulicht).start() aufrufen, da kommt irgendwas dabei raus. Die Funktion blaulicht() sieht so aus:

Code: Alles auswählen

def blaulicht():
    blink_cntr = 140
    led_lcntr = 1   
    sleeptime = 1
    
    while led_lcntr < blink_cntr:

        GPIO.output(led_1, True)
        sleep(sleeptime)
        GPIO.output(led_1, False)
        sleep(sleeptime)
        GPIO.output(led_2, True)
        sleep(sleeptime)
        GPIO.output(led_2, False)
        sleep(sleeptime)
        GPIO.output(led_3, True)
        sleep(sleeptime)
        GPIO.output(led_3, False)
        sleep(sleeptime)
        GPIO.output(led_4, True)
        sleep(sleeptime)
        GPIO.output(led_4, False)
        sleep(sleeptime)

        led_lcntr += 1
Wie kann ich diese Funktion (bzw. die Buzzer Funktion, die grundsätzlich gleich aussieht) nun umbauen, sodass ich sie als weiteren Thread eröffnen kann, ohne dass ich das System mit einer While Schleife blockiere?

Hier ist der gesamte Code:

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8 -*-


# Description: 
# This script must run always in the background at startup of the pi.
# It looks for a new file called "processed_sms.txt" under /home/pi/processed_sms
# If file is found, the alarm gets triggered and the sms content gets displayed onto the LCD. 
# After finding the file, the textfile gets renamed so no additional alarm gets triggered.
#

from __future__ import print_function, division, absolute_import, unicode_literals
import os
import os.path, time
import glob
import sys
from time import sleep, strftime
from time import *
import time
import datetime
import threading
from math import ceil

from RPLCD.i2c import CharLCD
import RPi.GPIO as GPIO
import codecs
from itertools import islice
import textwrap


lcd = CharLCD(i2c_expander="PCF8574", address=0x27,port=1,cols=20,rows=4,
              dotsize=8,charmap="A00",auto_linebreaks=True,backlight_enabled=False)
              
gpio_button = 26
gpio_cancel_button = 5
led_1 = 16
led_2 = 23
led_3 = 24
led_4 = 25

GPIO.setwarnings(False)
GPIO.setmode(GPIO.BCM) # BCM = use logic numbering of pins (not physical)
GPIO.setup(led_1, GPIO.OUT)
GPIO.setup(led_2, GPIO.OUT)
GPIO.setup(led_3, GPIO.OUT)
GPIO.setup(led_4, GPIO.OUT)
GPIO.setup(gpio_cancel_button, GPIO.IN, pull_up_down = GPIO.PUD_DOWN)
GPIO.setup(gpio_button, GPIO.IN, pull_up_down = GPIO.PUD_DOWN)


# looks for "processed_sms.txt" file (generated by a seperate script) and displays the SMS on LCD
def displaySms(smsPath, displayCount):
    global g_displayCount
    global g_smsToDisplay
    global g_smsLineIndex
    
    filesize = os.path.getsize(smsPath)
    
    while filesize <= 0:
        time.sleep(0.1) # wait until file is not empty anymore
        filesize = os.path.getsize(smsPath)
        
    with codecs.open(smsPath, "r", "utf-8") as file:
        #g_smsToDisplay = [] 
        g_smsToDisplay = file.readlines()
        print(g_smsToDisplay)           
        time.sleep(1)
        g_displayCount = displayCount * ceil( len(g_smsToDisplay) / 4 ) # multiplication by the number of pages that will be displayed
        g_smsLineIndex = 0      
        displayLinesCallback() 

# Text Display function
def displayLinesCallback():
    global g_smsToDisplay
    global g_displayCount
    global g_smsLineIndex

    if g_displayCount > 0:
        linesToDisplay = g_smsToDisplay[g_smsLineIndex:g_smsLineIndex+4]
        lcd.backlight_enabled = True 
        lcd.clear()
        lcd.write_string("".join(linesToDisplay)  )

        g_smsLineIndex += 4
        if (g_smsLineIndex > len(g_smsToDisplay) ):
            g_smsLineIndex = 0
        
        g_displayCount -= 1
        threading.Timer(1, displayLinesCallback).start()
        threading.Timer(1, blaulicht).start()
    else:
        #print("Text not displayed or already displayed!")
        lcd.clear()
        lcd.backlight_enabled = False
        
 
# SMS-Recall function, to display sms again after the alarm, gets triggered by a Button via GPIO.add_event_detect function
def displaySmsWithTimestamp(smsPath, displayCount):
    from datetime import datetime
    global g_displayCount
    global g_smsToDisplay
    global g_smsLineIndex

    with codecs.open(smsPath, "r", "utf-8") as file:
        
        g_smsToDisplay = file.readlines()
        g_displayCount = displayCount * ceil( len(g_smsToDisplay) / 4 ) # multiplication by the number of pages that will be displayed
        g_smsLineIndex = 0
        while not g_smsToDisplay:
            time.sleep(0.01)
        
    timestamp = os.path.getmtime(smsPath)
    date_time = datetime.fromtimestamp(timestamp)
    date = date_time.strftime("%d.%m.%Y")
    time = date_time.strftime("%H:%M")        
   
    #insert 4 lines with following text before displaying actual SMS content 
    g_smsToDisplay.insert(0, "  Letzte Alarm SMS:   ")
    g_smsToDisplay.insert(1, "--------------------")
    g_smsToDisplay.insert(2, "Datum: " + date)
    g_smsToDisplay.insert(3, "   Zeit: " + time)   
    
    displayLinesCallbackWoBl() 
          


def displaySmsWithTimestampCallback(gpio_button):
    displaySmsWithTimestamp("/home/pi/processed_sms/processed_sms_lastReceived.txt", 2)
    

def stop_blaulicht(gpio_cancel_button):
    led_1 = 16
    led_2 = 23
    led_3 = 24
    led_4 = 25
    GPIO.output(led_1, False)
    GPIO.output(led_2, False)
    GPIO.output(led_3, False)
    GPIO.output(led_4, False)
   


def blaulicht():
    blink_cntr = 140
    led_lcntr = 1   
    sleeptime = 1
    
    while led_lcntr < blink_cntr:

        GPIO.output(led_1, True)
        sleep(sleeptime)
        GPIO.output(led_1, False)
        sleep(sleeptime)
        GPIO.output(led_2, True)
        sleep(sleeptime)
        GPIO.output(led_2, False)
        sleep(sleeptime)
        GPIO.output(led_3, True)
        sleep(sleeptime)
        GPIO.output(led_3, False)
        sleep(sleeptime)
        GPIO.output(led_4, True)
        sleep(sleeptime)
        GPIO.output(led_4, False)
        sleep(sleeptime)

        led_lcntr += 1
            
gpio_cancel_button = 5
GPIO.setup(gpio_cancel_button, GPIO.IN, pull_up_down=GPIO.PUD_UP)
# interrupt this code immediately (anytime) when button is pressed:
GPIO.add_event_detect(gpio_button, GPIO.RISING, callback=displaySmsWithTimestampCallback, bouncetime = 10000) # set bouncetime high in order to block repeated button press
sleep(0.1) # sleep to give CPU time to react on event
GPIO.add_event_detect(gpio_cancel_button, GPIO.RISING, callback=stop_blaulicht)
sleep(0.1) # sleep to give CPU time to react on event

#initial lcd screen @ Startup
lcd.clear()

while True:
    
    #new seg sms?
    processedFilePath = "/home/pi/processed_sms/processed_sms.txt"
    lastReadFilePath = "/home/pi/processed_sms/processed_sms_lastReceived.txt"
    if os.path.isfile(processedFilePath) :
        displaySms(processedFilePath, 5)
        os.rename(processedFilePath, lastReadFilePath) #rename sms textfile so alarm event doesen't get triggered again

    
Sirius3
User
Beiträge: 18270
Registriert: Sonntag 21. Oktober 2012, 17:20

Python2 sollte man nicht mehr benutzen.
An den Importen ist einiges durcheinander. ›os.path‹ muß man nicht explizit importieren. ›glob‹ und ›sys‹ werden nicht benutzt, genausowenig wie strftime, islice oder textwrap. ›time‹ wird vier mal in unterschiedlichen Varianten importiert. ›as‹ ist zum Umbenennen da, ›GPIO‹ wird aber gar nicht umbenannt.

Konstanten schreibt man komplett gross: GPIO_BUTTON, etc.
gpio_cancel_button und die lcds werden später nochmal definiert.
Dateinamen sollten als Konstanten definiert werden, statt sie mehrfach im Code stehen zu haben.
Statt Konstanten durchzunummerieren, definiere eine Liste.
Funktionen und Variablennamen schreibt man dagegen komplett klein.
Warnungen sind dazu da, dass man sie behebt, nicht dass man sie ignoriert. Dazu muß aber ›GPIO.cleanup‹ verlässlich aufgerufen werden, in einem finally-Block.

Vergiss gleich wieder, dass es global gibt, das macht (vor allem in Kombination mit Threads) den Programmfluß absolut undurchschaubar und führt zu schwer findenden Fehlern. Da hilft es auch nichts, den Variablen ein g_ voranzustellen.

In Python3 braucht man das codecs-Modul nicht mehr. Der with-Block sollte so klein wie möglich sein, nach dem readlines ist das Dateilesen eigentlich abgeschlossen.
Wenn vor der while-Schleife der gleiche Code steht, wie in der while-Schleife, ist das eigentlich ein Zeichen dafür, dass man eine while-True-Schleife schreiben wollte.
insert sollte man nicht verwenden. Da kannst Du einfach zwei Listen zusammensetzen.
`displayLinesCallbackWoBl` wird nirgends definiert.

Die sleep nach add_event_detect sind nutzlos. In den Callbacks sollte möglichst wenig passieren.
In `blaulicht` sollte man statt einer while-Schleife zwei for-Schleifen benutzen.
Benutze keine Abkürzungen. Was soll denn led_lcntr heißen?
Sirius3
User
Beiträge: 18270
Registriert: Sonntag 21. Oktober 2012, 17:20

Um mit Threads kommunizieren zu können, eignen sich Events und Queues.
So könnte das aussehen:

Code: Alles auswählen

#!/usr/bin/env python3
# Description: 
# This script must run always in the background at startup of the pi.
# It looks for a new file called "processed_sms.txt" under /home/pi/processed_sms
# If file is found, the alarm gets triggered and the sms content gets displayed onto the LCD. 
# After finding the file, the textfile gets renamed so no additional alarm gets triggered.
#
import os
from time import sleep
import datetime
import threading
from queue import Queue
from RPLCD.i2c import CharLCD
from RPi import GPIO

GPIO_BUTTON = 26
GPIO_CANCEL_BUTTON = 5
LEDS = [16, 23, 24, 25]

NUMBER_OF_BLINKS = 140
BLINK_DELAY = 1

PROCESSED_FILENAME = "/home/pi/processed_sms/processed_sms.txt"
LAST_PROCESSED_FILENAME = "/home/pi/processed_sms/processed_sms_lastReceived.txt"

def initialize():
    lcd = CharLCD(i2c_expander="PCF8574", address=0x27,port=1,cols=20,rows=4,
                  dotsize=8,charmap="A00",auto_linebreaks=True,backlight_enabled=False)
    lcd.clear()
    GPIO.setmode(GPIO.BCM) # BCM = use logic numbering of pins (not physical)
    GPIO.setup(LEDS, GPIO.OUT)
    GPIO.setup(GPIO_CANCEL_BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    GPIO.setup(GPIO_BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    return lcd

def blaulicht():
    for _ in range(NUMBER_OF_BLINKS):
        for led in LEDS:
            GPIO.output(led, True)
            if cancel_event.wait(BLINK_DELAY):
                return
            GPIO.output(led, False)
            if cancel_event.wait(BLINK_DELAY):
                return

def blaulicht_loop(start_event, cancel_event):
    while True:
        start_event.wait()
        start_event.clear()
        blaulicht()
        cancel_event.clean()
        GPIO.output(LEDS, False)

def display_lines(lines_queue):
    while True:
        lines =lines_queue.get()
        lcd.backlight_enabled = True 
        lcd.write_string("".join(lines))
        sleep(2)
        lcd.clear()
        if lines_queue.empty():
            lcd.backlight_enabled = False
        
def display_sms(sms_path, lines_queue, with_timestamp=False):
    if with_timestamp:
        timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(sms_path))
        date = timestamp.strftime("%d.%m.%Y")
        time = timestamp.strftime("%H:%M")
        # insert 4 lines with following text before displaying actual SMS content 
        lines_queue.put([
            "  Letzte Alarm SMS:   ",
            "--------------------",
            "Datum: {timestamp:%d.%m.%Y}",
            "   Zeit: {timestamp:%H:%M}",
        ])

    with open(sms_path, encoding="utf-8") as sms:
        lines = list(sms)
    while lines:
        lines_queue.put(lines[:4])
        lines = lines[4:]
    
def wait_for_new_sms(replay_event):
    while True:
        if os.path.isfile(PROCESSED_FILENAME) and os.path.getsize(PROCESSED_FILENAME):
            break
        if replay_event.wait(0.2):
            return "replay"
    # rename sms textfile so alarm event doesen't get triggered again
    os.rename(PROCESSED_FILENAME, LAST_PROCESSED_FILENAME)
    return "new"

def main():
    try:
        start_event = threading.Event()
        cancel_event = threading.Event()
        replay_event = threading.Event()
        lines_queue = Queue()
        lcd = initialize()
        GPIO.setup(GPIO_CANCEL_BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        # interrupt this code immediately (anytime) when button is pressed:
        GPIO.add_event_detect(GPIO_BUTTON, GPIO.RISING, callback=replay_event, bouncetime=10000) # set bouncetime high in order to block repeated button press
        GPIO.add_event_detect(GPIO_CANCEL_BUTTON, GPIO.RISING, callback=cancel_event.set)
        threading.Thread(target=blaulicht_loop, args=(start_event, cancel_event), daemon=True).start()
        threading.Thread(target=display_lines, args=(lines_queue, ), daemon=True).start()

        while True:
            what = wait_for_new_sms()
            display_sms(LAST_PROCESSED_FILENAME, lines_queue, with_timestamp=what == "replay")
    finally:
        GPIO.cleanup()

if __name__ == '__main__':
    main()
ds91
User
Beiträge: 5
Registriert: Dienstag 5. Mai 2020, 12:26

Sirius3 hat geschrieben: Donnerstag 1. Oktober 2020, 18:10 Python2 sollte man nicht mehr benutzen.
An den Importen ist einiges durcheinander. ›os.path‹ muß man nicht explizit importieren. ›glob‹ und ›sys‹ werden nicht benutzt, genausowenig wie strftime, islice oder textwrap. ›time‹ wird vier mal in unterschiedlichen Varianten importiert. ›as‹ ist zum Umbenennen da, ›GPIO‹ wird aber gar nicht umbenannt.

Konstanten schreibt man komplett gross: GPIO_BUTTON, etc.
gpio_cancel_button und die lcds werden später nochmal definiert.
Dateinamen sollten als Konstanten definiert werden, statt sie mehrfach im Code stehen zu haben.
Statt Konstanten durchzunummerieren, definiere eine Liste.
Funktionen und Variablennamen schreibt man dagegen komplett klein.
Warnungen sind dazu da, dass man sie behebt, nicht dass man sie ignoriert. Dazu muß aber ›GPIO.cleanup‹ verlässlich aufgerufen werden, in einem finally-Block.

Vergiss gleich wieder, dass es global gibt, das macht (vor allem in Kombination mit Threads) den Programmfluß absolut undurchschaubar und führt zu schwer findenden Fehlern. Da hilft es auch nichts, den Variablen ein g_ voranzustellen.

In Python3 braucht man das codecs-Modul nicht mehr. Der with-Block sollte so klein wie möglich sein, nach dem readlines ist das Dateilesen eigentlich abgeschlossen.
Wenn vor der while-Schleife der gleiche Code steht, wie in der while-Schleife, ist das eigentlich ein Zeichen dafür, dass man eine while-True-Schleife schreiben wollte.
insert sollte man nicht verwenden. Da kannst Du einfach zwei Listen zusammensetzen.
`displayLinesCallbackWoBl` wird nirgends definiert.

Die sleep nach add_event_detect sind nutzlos. In den Callbacks sollte möglichst wenig passieren.
In `blaulicht` sollte man statt einer while-Schleife zwei for-Schleifen benutzen.
Benutze keine Abkürzungen. Was soll denn led_lcntr heißen?
Danke für die vielen Tips! Ich bin (wie man vielleicht erkennen kann) ein sehr schlampiger "Programmierer", denn so würde ich mich nicht mal nennen ;) Ich nehme mit die Ratschläge aber zu Herzen!
Sirius3 hat geschrieben: Donnerstag 1. Oktober 2020, 19:08 Um mit Threads kommunizieren zu können, eignen sich Events und Queues.
So könnte das aussehen:

Code: Alles auswählen

#!/usr/bin/env python3
# Description: 
# This script must run always in the background at startup of the pi.
# It looks for a new file called "processed_sms.txt" under /home/pi/processed_sms
# If file is found, the alarm gets triggered and the sms content gets displayed onto the LCD. 
# After finding the file, the textfile gets renamed so no additional alarm gets triggered.
#
import os
from time import sleep
import datetime
import threading
from queue import Queue
from RPLCD.i2c import CharLCD
from RPi import GPIO

GPIO_BUTTON = 26
GPIO_CANCEL_BUTTON = 5
LEDS = [16, 23, 24, 25]

NUMBER_OF_BLINKS = 140
BLINK_DELAY = 1

PROCESSED_FILENAME = "/home/pi/processed_sms/processed_sms.txt"
LAST_PROCESSED_FILENAME = "/home/pi/processed_sms/processed_sms_lastReceived.txt"

def initialize():
    lcd = CharLCD(i2c_expander="PCF8574", address=0x27,port=1,cols=20,rows=4,
                  dotsize=8,charmap="A00",auto_linebreaks=True,backlight_enabled=False)
    lcd.clear()
    GPIO.setmode(GPIO.BCM) # BCM = use logic numbering of pins (not physical)
    GPIO.setup(LEDS, GPIO.OUT)
    GPIO.setup(GPIO_CANCEL_BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    GPIO.setup(GPIO_BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
    return lcd

def blaulicht():
    for _ in range(NUMBER_OF_BLINKS):
        for led in LEDS:
            GPIO.output(led, True)
            if cancel_event.wait(BLINK_DELAY):
                return
            GPIO.output(led, False)
            if cancel_event.wait(BLINK_DELAY):
                return

def blaulicht_loop(start_event, cancel_event):
    while True:
        start_event.wait()
        start_event.clear()
        blaulicht()
        cancel_event.clean()
        GPIO.output(LEDS, False)

def display_lines(lines_queue):
    while True:
        lines =lines_queue.get()
        lcd.backlight_enabled = True 
        lcd.write_string("".join(lines))
        sleep(2)
        lcd.clear()
        if lines_queue.empty():
            lcd.backlight_enabled = False
        
def display_sms(sms_path, lines_queue, with_timestamp=False):
    if with_timestamp:
        timestamp = datetime.datetime.fromtimestamp(os.path.getmtime(sms_path))
        date = timestamp.strftime("%d.%m.%Y")
        time = timestamp.strftime("%H:%M")
        # insert 4 lines with following text before displaying actual SMS content 
        lines_queue.put([
            "  Letzte Alarm SMS:   ",
            "--------------------",
            "Datum: {timestamp:%d.%m.%Y}",
            "   Zeit: {timestamp:%H:%M}",
        ])

    with open(sms_path, encoding="utf-8") as sms:
        lines = list(sms)
    while lines:
        lines_queue.put(lines[:4])
        lines = lines[4:]
    
def wait_for_new_sms(replay_event):
    while True:
        if os.path.isfile(PROCESSED_FILENAME) and os.path.getsize(PROCESSED_FILENAME):
            break
        if replay_event.wait(0.2):
            return "replay"
    # rename sms textfile so alarm event doesen't get triggered again
    os.rename(PROCESSED_FILENAME, LAST_PROCESSED_FILENAME)
    return "new"

def main():
    try:
        start_event = threading.Event()
        cancel_event = threading.Event()
        replay_event = threading.Event()
        lines_queue = Queue()
        lcd = initialize()
        GPIO.setup(GPIO_CANCEL_BUTTON, GPIO.IN, pull_up_down=GPIO.PUD_UP)
        # interrupt this code immediately (anytime) when button is pressed:
        GPIO.add_event_detect(GPIO_BUTTON, GPIO.RISING, callback=replay_event, bouncetime=10000) # set bouncetime high in order to block repeated button press
        GPIO.add_event_detect(GPIO_CANCEL_BUTTON, GPIO.RISING, callback=cancel_event.set)
        threading.Thread(target=blaulicht_loop, args=(start_event, cancel_event), daemon=True).start()
        threading.Thread(target=display_lines, args=(lines_queue, ), daemon=True).start()

        while True:
            what = wait_for_new_sms()
            display_sms(LAST_PROCESSED_FILENAME, lines_queue, with_timestamp=what == "replay")
    finally:
        GPIO.cleanup()

if __name__ == '__main__':
    main()
Vielen Dank dafür, das werde ich am Wochenende ausprobieren und mich dann wieder hier melden!
ds91
User
Beiträge: 5
Registriert: Dienstag 5. Mai 2020, 12:26

Ich kann leider irgendwie meinen eigenen Beitrag nicht editieren. Deshalb jetzt so: ich tue mir etwas schwer, deinem Konstrukt zu folgen Sirius3. Ich hab mal testweise deinen Code einfach versucht auszuführen. Als ersten Fehler erhalte ich in der Zeile "GPIO.add_event_detect(GPIO_BUTTON, GPIO.RISING, callback=replay_event, bouncetime=10000)" --> TypeError: Parameter must be callable
Dann ist mir aufgefallen, dass "replay_event, start und cancel_event ja nicht definiert sind bzw. wie sind das Übergabeparameter für die Funktionen? In der GPIO.add_event_detect Funktion muss ja eine andere Funktion aufgerufen werden, sobald der Knopf gedrückt wird. Sry für mein Unwissen, aber ich arbeite schon ewig an dem Zeug und komm irgendwie zu keinem Ende, weil ich einfach kein Programmierer bin ;-)
Sirius3
User
Beiträge: 18270
Registriert: Sonntag 21. Oktober 2012, 17:20

Eine Zeile tiefer ist es ja richtig, hättest ja einfach weiterzulesen können: callback=replay_event.set
Antworten