[Code-Review]UART.IRQ Nextion Display

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
Benutzeravatar
Dennis89
User
Beiträge: 1690
Registriert: Freitag 11. Dezember 2020, 15:13

Guten morgen zusammen,

ich bitte euch mal wieder um ein kurzes Code-Review.
Im Netz habe ich eine Bibliothek zur Ansteuerung eines Nextion-Displays gefunden. Da ist eigentlich gar nichts spannendes drin. Mir hat aber gefehlt, dass ich die aktuelle Seite, die das Display anzeigt, nicht abfragen kann. Das habe ich als optionale Option eingebaut. Da ich mit UART nicht 100% vertraut bin und ich nicht weiß was man alles beachten muss, damit alle notwendigen Bytes gelesen werden bzw. das nichts unter geht und ich alle Möglichkeiten zur Verifizierung der/des Bytes eingebaut habe, wollte ich euch mal drüber schauen lassen.

Das Display sendet beim wechsel der Seite #\x02P\XX. XX steht für die Seitenzahl. Seite 1 wäre #\x02P\01.

Das Display ist an einen ESP32 angeschlossen. So wie ich es jetzt habe, macht es einen funktionsfähigen Eindruck. Ich zeige euch den ganzen Code. Die `if`-Abfrage in der `__init__` und `_set_current_page()` ist von mir.

Code: Alles auswählen

from machine import UART
from time import sleep_ms


class Nextion:
    END_COMMAND = b"\xff\xff\xff"
    PAGE_NUMBER_VERIFICATION = b"#\x02P"

    def __init__(self, baudrate, tx=10, rx=9, get_page_event=False, start_page=0):
        self.uart = UART(1, baudrate, tx=tx, rx=rx)
        if get_page_event:
            self.uart.irq(handler=self._set_current_page, trigger=UART.IRQ_RXIDLE)
            self.current_page = start_page

    def change_page(self, page):
        self.cmd(f"page {page}")

    def _set_current_page(self, _):
        line = self.uart.readline()
        if len(line) == 4 and line[:3] == self.PAGE_NUMBER_VERIFICATION:
            self.current_page = int(line[3:].hex())

    def cmd(self, command, write_and_read=True):
        self.uart.write(command)
        self.uart.write(self.END_COMMAND)
        if write_and_read:
            sleep_ms(100)
            return self.uart.read()
        return None

    def sleep(self, delay):
        self.cmd(f"sleep={delay}")

    def reset(self):
        self.cmd("rest")

    def brightness(self, brightness):
        self.cmd(f"dim={brightness}")

    def read(self, raw=False):
        output = self.uart.read()
        if raw:
            return output
        try:
            return output.decode("ascii")
        except AttributeError:
            return None
Danke und einen schönen Sonntag
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
DeaD_EyE
User
Beiträge: 1324
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

Du könntest einen Empfangspuffer und uart.readinto nutzen. Ob das Display überhaupt ein Newline sendet, ist mir nicht bekannt. Beim Instruction Set habe ich nichts gefunden: https://nextion.tech/instruction-set
Der Vorteil des Puffers ist, dass der Speicher nichts so stark fragmentiert. Ansonsten wird jedes Mal neuer Speicher angefragt und erst nachdem Garbage Collector verschwindet das alte Objekt.
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Benutzeravatar
Dennis89
User
Beiträge: 1690
Registriert: Freitag 11. Dezember 2020, 15:13

Danke für den Hinweis. Das würde ich dann wie folgt umsetzen. Ich habe noch eine Funktion eingebaut, die es erlaubt, bestimmte Seiten zu sperren. Nun habe ich aber bedenken, dass die handler-Funktion zu groß ist?

Code: Alles auswählen

class Nextion:
    END_COMMAND = b"\xff\xff\xff"
    PAGE_NUMBER_VERIFICATION = b"#\x02P"

    def __init__(self, baudrate, tx=10, rx=9, get_page_event=False, start_page=0):
        self.uart = UART(1, baudrate, tx=tx, rx=rx)
        if get_page_event:
            self.uart.irq(handler=self._manage_next_page, trigger=UART.IRQ_RXIDLE)
            self.current_page = start_page
            self.buffer = bytearray()
            self.lock_pages = {}

    def _manage_next_page(self):
        self.uart.readinto(self.buffer, nbytes=4)
        if len(self.buffer) == 4 and self.buffer[:3] == self.PAGE_NUMBER_VERIFICATION:
            page = self.buffer[:3].hex()
            if page in self.lock_pages:
                self.change_page(self.current_page)
            else:
                self.current_page = int(self.buffer[3:].hex())
        self.buffer = bytearray()
Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
DeaD_EyE
User
Beiträge: 1324
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

Habs mal ein wenig angepasst. Den Puffer jedes Mal neu zu erzeugen, ist unvorteilhaft. Dadurch wird dann auch wieder neuer Speicher zugewiesen. Die Methode readinto liefert die Anzahl der gelesenen Bytes zurück.

Wenn man mit slices auf ein bytearray zugreift, wird auch wieder kopiert (neues bytearray wird erzeugt).
Ein memoryview verhindert das.

Code: Alles auswählen

class Nextion:
    END_COMMAND = b"\xff\xff\xff"
    PAGE_NUMBER_VERIFICATION = b"#\x02P"

    def __init__(self, baudrate, tx=10, rx=9, get_page_event=False, start_page=0):
        self.uart = UART(1, baudrate, tx=tx, rx=rx)
        if get_page_event:
            self.uart.irq(handler=self._manage_next_page, trigger=UART.IRQ_RXIDLE)
            self.current_page = start_page
            
            # festen Puffer verwenden
            self.buffer = bytearray(256)
            
            # memoryview auf den buffer, um das Kopieren von Daten zu verhindern
            self.view = memoryview(self.buffer)
            
            self.lock_pages = {}

    def _manage_next_page(self):
        size = self.uart.readinto(self.buffer, nbytes=4)
        # sollte immer 4 bytes liefern, es seiden ein Timeout ist
        # konfiguriert, dann wird ein None zurückgegeben ansonsten
        # die Menge der gelesen bytes
        
        # ggf. macht es aber doch sinn mehr Daten zu lesen
        # da andere Meldungen mehr als 4 bytes in Anspruch nehmen
        
        # wenn man nbytes nicht setzt, werden maximal soviel bytes glesen, wie in den
        # Puffer passt oder soviele bytes, wie gerade verfügbar sind.

        # self.buffer[:3] erzeugt soweit ich weiß neue bytes,
        # d.h. es wird wieder kopiert
        # ein memoryview soll das verhindern
        if self.view[:3] == self.PAGE_NUMBER_VERIFICATION:
            # die Methode hex erzeugt einen neuen str
            # solange der IRQ nicht 'hard' ist, sollte es keine Probleme geben
            page = self.view[:3].hex()
            if page in self.lock_pages:
                self.change_page(self.current_page)
            else:
                self.current_page = int(self.view[3:].hex())
        # nicht erneut erstellen, sondern den alten Puffer nutzen
        # self.buffer = bytearray()
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Benutzeravatar
Dennis89
User
Beiträge: 1690
Registriert: Freitag 11. Dezember 2020, 15:13

Danke für die Anpassung. Werde ich morgen testen.

Wenn ich den Code lese, dann frage ich mich, wie `readinto` genau funktioniert. `buffer` ist jetzt voll mit 0en, dann muss `readinto` "hinten" 4 Bytes raus nehmen, damit "vorne" die 4 gelesenen Platz haben? Ist das richtig? Wenn ja, wo steht das, außer im Quellcode? Wenn das nicht stimmt, dann funktioniert der Code nicht. Wieso macht man `buffer` eigentlich so groß? 4 Bytes würden doch reichen?
Und noch mehr Fragen, `size` sollte ich noch auf `None` prüfen?
`page = self.view[:3].hex()` könnte ich so ändern `page = int(self.view[:3.hex())` und dann könnte ich `page` im `else`-Zweig nutzen?

Und noch für das Verständnis, `memoryview` verhindert, dass wenn ich "slice" eine Kopie erstellt wird? Weil das direkten Zugriff auf den Speicher hat? Wenn ja, wie wird das kopieren verhindert? Aus der Doku bin ich nicht wirklich schlau geworden, was das macht.

Danke und GRüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Sirius3
User
Beiträge: 18362
Registriert: Sonntag 21. Oktober 2012, 17:20

@Dennis89: da wird nichts hinten raus verschoben, sondern einfach nur die ersten Bytes überschrieben. Der Buffer muß nur 4 Bytes groß sein, wenn da nur 4 Bytes gelesen werden.

Code: Alles auswählen

self.buffer = bytearray(4)
warum `self.buffer[:3].hex()`? Die ersten 3 Bytes sind doch immer identisch. Du meinst wohl self.buffer[3], oder?
Statt slicing sollte man startswith benutzen.

Code: Alles auswählen

size = self.uart.readinto(self.buffer)
if size == 4 and self.buffer.startswith(self.PAGE_NUMBER_VERIFICATION):
    page = self.buffer[3]
Antworten