[Code-Review]UART.IRQ Nextion Display

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Guten morgen zusammen,

ich bitte euch mal wieder um ein kurzes Code-Review.
Im Netz habe ich eine Bibliothek zur Ansteuerung eines Nextion-Displays gefunden. Da ist eigentlich gar nichts spannendes drin. Mir hat aber gefehlt, dass ich die aktuelle Seite, die das Display anzeigt, nicht abfragen kann. Das habe ich als optionale Option eingebaut. Da ich mit UART nicht 100% vertraut bin und ich nicht weiß was man alles beachten muss, damit alle notwendigen Bytes gelesen werden bzw. das nichts unter geht und ich alle Möglichkeiten zur Verifizierung der/des Bytes eingebaut habe, wollte ich euch mal drüber schauen lassen.

Das Display sendet beim wechsel der Seite #\x02P\XX. XX steht für die Seitenzahl. Seite 1 wäre #\x02P\01.

Das Display ist an einen ESP32 angeschlossen. So wie ich es jetzt habe, macht es einen funktionsfähigen Eindruck. Ich zeige euch den ganzen Code. Die `if`-Abfrage in der `__init__` und `_set_current_page()` ist von mir.

Code: Alles auswählen

from machine import UART
from time import sleep_ms


class Nextion:
    END_COMMAND = b"\xff\xff\xff"
    PAGE_NUMBER_VERIFICATION = b"#\x02P"

    def __init__(self, baudrate, tx=10, rx=9, get_page_event=False, start_page=0):
        self.uart = UART(1, baudrate, tx=tx, rx=rx)
        if get_page_event:
            self.uart.irq(handler=self._set_current_page, trigger=UART.IRQ_RXIDLE)
            self.current_page = start_page

    def change_page(self, page):
        self.cmd(f"page {page}")

    def _set_current_page(self, _):
        line = self.uart.readline()
        if len(line) == 4 and line[:3] == self.PAGE_NUMBER_VERIFICATION:
            self.current_page = int(line[3:].hex())

    def cmd(self, command, write_and_read=True):
        self.uart.write(command)
        self.uart.write(self.END_COMMAND)
        if write_and_read:
            sleep_ms(100)
            return self.uart.read()
        return None

    def sleep(self, delay):
        self.cmd(f"sleep={delay}")

    def reset(self):
        self.cmd("rest")

    def brightness(self, brightness):
        self.cmd(f"dim={brightness}")

    def read(self, raw=False):
        output = self.uart.read()
        if raw:
            return output
        try:
            return output.decode("ascii")
        except AttributeError:
            return None
Danke und einen schönen Sonntag
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
DeaD_EyE
User
Beiträge: 1324
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

Du könntest einen Empfangspuffer und uart.readinto nutzen. Ob das Display überhaupt ein Newline sendet, ist mir nicht bekannt. Beim Instruction Set habe ich nichts gefunden: https://nextion.tech/instruction-set
Der Vorteil des Puffers ist, dass der Speicher nichts so stark fragmentiert. Ansonsten wird jedes Mal neuer Speicher angefragt und erst nachdem Garbage Collector verschwindet das alte Objekt.
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Danke für den Hinweis. Das würde ich dann wie folgt umsetzen. Ich habe noch eine Funktion eingebaut, die es erlaubt, bestimmte Seiten zu sperren. Nun habe ich aber bedenken, dass die handler-Funktion zu groß ist?

Code: Alles auswählen

class Nextion:
    END_COMMAND = b"\xff\xff\xff"
    PAGE_NUMBER_VERIFICATION = b"#\x02P"

    def __init__(self, baudrate, tx=10, rx=9, get_page_event=False, start_page=0):
        self.uart = UART(1, baudrate, tx=tx, rx=rx)
        if get_page_event:
            self.uart.irq(handler=self._manage_next_page, trigger=UART.IRQ_RXIDLE)
            self.current_page = start_page
            self.buffer = bytearray()
            self.lock_pages = {}

    def _manage_next_page(self):
        self.uart.readinto(self.buffer, nbytes=4)
        if len(self.buffer) == 4 and self.buffer[:3] == self.PAGE_NUMBER_VERIFICATION:
            page = self.buffer[:3].hex()
            if page in self.lock_pages:
                self.change_page(self.current_page)
            else:
                self.current_page = int(self.buffer[3:].hex())
        self.buffer = bytearray()
Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
DeaD_EyE
User
Beiträge: 1324
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

Habs mal ein wenig angepasst. Den Puffer jedes Mal neu zu erzeugen, ist unvorteilhaft. Dadurch wird dann auch wieder neuer Speicher zugewiesen. Die Methode readinto liefert die Anzahl der gelesenen Bytes zurück.

Wenn man mit slices auf ein bytearray zugreift, wird auch wieder kopiert (neues bytearray wird erzeugt).
Ein memoryview verhindert das.

Code: Alles auswählen

class Nextion:
    END_COMMAND = b"\xff\xff\xff"
    PAGE_NUMBER_VERIFICATION = b"#\x02P"

    def __init__(self, baudrate, tx=10, rx=9, get_page_event=False, start_page=0):
        self.uart = UART(1, baudrate, tx=tx, rx=rx)
        if get_page_event:
            self.uart.irq(handler=self._manage_next_page, trigger=UART.IRQ_RXIDLE)
            self.current_page = start_page
            
            # festen Puffer verwenden
            self.buffer = bytearray(256)
            
            # memoryview auf den buffer, um das Kopieren von Daten zu verhindern
            self.view = memoryview(self.buffer)
            
            self.lock_pages = {}

    def _manage_next_page(self):
        size = self.uart.readinto(self.buffer, nbytes=4)
        # sollte immer 4 bytes liefern, es seiden ein Timeout ist
        # konfiguriert, dann wird ein None zurückgegeben ansonsten
        # die Menge der gelesen bytes
        
        # ggf. macht es aber doch sinn mehr Daten zu lesen
        # da andere Meldungen mehr als 4 bytes in Anspruch nehmen
        
        # wenn man nbytes nicht setzt, werden maximal soviel bytes glesen, wie in den
        # Puffer passt oder soviele bytes, wie gerade verfügbar sind.

        # self.buffer[:3] erzeugt soweit ich weiß neue bytes,
        # d.h. es wird wieder kopiert
        # ein memoryview soll das verhindern
        if self.view[:3] == self.PAGE_NUMBER_VERIFICATION:
            # die Methode hex erzeugt einen neuen str
            # solange der IRQ nicht 'hard' ist, sollte es keine Probleme geben
            page = self.view[:3].hex()
            if page in self.lock_pages:
                self.change_page(self.current_page)
            else:
                self.current_page = int(self.view[3:].hex())
        # nicht erneut erstellen, sondern den alten Puffer nutzen
        # self.buffer = bytearray()
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Danke für die Anpassung. Werde ich morgen testen.

Wenn ich den Code lese, dann frage ich mich, wie `readinto` genau funktioniert. `buffer` ist jetzt voll mit 0en, dann muss `readinto` "hinten" 4 Bytes raus nehmen, damit "vorne" die 4 gelesenen Platz haben? Ist das richtig? Wenn ja, wo steht das, außer im Quellcode? Wenn das nicht stimmt, dann funktioniert der Code nicht. Wieso macht man `buffer` eigentlich so groß? 4 Bytes würden doch reichen?
Und noch mehr Fragen, `size` sollte ich noch auf `None` prüfen?
`page = self.view[:3].hex()` könnte ich so ändern `page = int(self.view[:3.hex())` und dann könnte ich `page` im `else`-Zweig nutzen?

Und noch für das Verständnis, `memoryview` verhindert, dass wenn ich "slice" eine Kopie erstellt wird? Weil das direkten Zugriff auf den Speicher hat? Wenn ja, wie wird das kopieren verhindert? Aus der Doku bin ich nicht wirklich schlau geworden, was das macht.

Danke und GRüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Sirius3
User
Beiträge: 18368
Registriert: Sonntag 21. Oktober 2012, 17:20

@Dennis89: da wird nichts hinten raus verschoben, sondern einfach nur die ersten Bytes überschrieben. Der Buffer muß nur 4 Bytes groß sein, wenn da nur 4 Bytes gelesen werden.

Code: Alles auswählen

self.buffer = bytearray(4)
warum `self.buffer[:3].hex()`? Die ersten 3 Bytes sind doch immer identisch. Du meinst wohl self.buffer[3], oder?
Statt slicing sollte man startswith benutzen.

Code: Alles auswählen

size = self.uart.readinto(self.buffer)
if size == 4 and self.buffer.startswith(self.PAGE_NUMBER_VERIFICATION):
    page = self.buffer[3]
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Danke für die Antwort.

Eigentlich meinte ich `self.buffer[3:].hex()`schreiben, aber mit dem frischen Kopf von heute natürlich `self.buffer[3]`:)
`startswith`weil da nichts kopiert wird uns es performanter ist?
Benötige ich dann noch `memoryview`?

Stand jetzt macht der Code, mit den Anpassungen, was er soll.

Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Hallo,

mittlerweile habe ich herausgefunden, dass mit `sendme` die aktuelle Seite des Displays abgefragt werden kann.
In der Dokue steht:

Code: Alles auswählen

Current Page Number
Return Length: Fixed 5 bytes

0x66 0x01 0xFF 0xFF 0xFF

Returned when the sendme command is used.
0x01 is current page number
data: page 1
Not effected by the bkcmd setting
Mein absoluter, reduzierter lauffähiger Code:

Code: Alles auswählen

from machine import UART
from time import sleep_ms

RX_PIN = 32
TX_PIN = 16


class Nextion:
    END_COMMAND = b"\xff\xff\xff"
    NUMBER_VERIFICATION = b"f"

    def __init__(self, baudrate, tx=10, rx=9):
        self.uart = UART(1, baudrate, tx=tx, rx=rx)
        self.buffer = bytearray(5)

    def get_current_page(self):
        self.cmd("sendme")
        sleep_ms(100)
        size = self.uart.readinto(self.buffer)
        print(f"Buffer: {self.buffer}")
        if size == 5 and self.buffer.startswith(self.NUMBER_VERIFICATION):
            return self.buffer[1]

    def cmd(self, command):
        self.uart.write(command)
        self.uart.write(self.END_COMMAND)


def main():
    hmi = Nextion(9600, tx=TX_PIN, rx=RX_PIN)
    sleep_ms(100)
    while True:
        print(f"Page: {hmi.get_current_page()}")
        sleep_ms(1000)


if __name__ == "__main__":
    main()
Es funktioniert kurz und dann "rutscht" das `f` an das Ende des `bytearray`. Die Ausgabe bei dem Wechsel von Seite 3 auf Seite 1 und zurück auf Seite 3:

Code: Alles auswählen

Terminal ready

MPY: soft reboot
Buffer: bytearray(b'\x1a\xff\xff\xff\x00')
Page: None
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'f\x03\xff\xff\xff')
Page: 3
Buffer: bytearray(b'\x04\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x01\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x03\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x03\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x03\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x03\xff\xff\xfff')
Page: None
Buffer: bytearray(b'\x03\xff\xff\xfff')
Page: None
Liegt das an meinem Code? Könnt ihr euch das erklären? Das Display reagiert auf die Seitenänderung und das `x03` oder `x01` befindet sich auch zum richtigen Zeitpunkt in der Ausgabe, nur startet das Ganze, aus mir unerklärlichen Gründen, nicht mehr mit `f`. Die Stromversorgung vom ESP und dem Display ist über ein externes Netzteil sichergestellt und ich würde da eigentlich auch ein anders Verhalten erwarten.

Bitte gebt mir auch Rückmeldung, wenn ihr der Meinung seid, dass das nicht am Code liegt.

Vielen Dank und Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Nachtrag: Das ganze passiert natürlich auch so, wenn *nur* das Display und sonst gar nichts an dem ESP32 angeschlossen ist. Damit schließe ich Störungen oder schlechte Verbindungen auch aus.
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
__blackjack__
User
Beiträge: 14324
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Dennis89: Da scheint ein Byte beim empfangen verloren gegangen zu sein. Wenn so etwas passieren kann, dann muss man das lesen so implementieren, dass man sich nicht an der Byteanzahl orientiert, sondern an dem Nachrichten-End-Marker also den drei 0xFF, um einzelne Nachrichten zu trennen, und dann die verwerfen, mit denen man nichts anfangen kann. Die sind ja zum synchronisieren mit dem Datenstrom gedacht.
„Debugging is twice as hard as writing the code in the first place. Therefore, if you write the code as cleverly as possible, you are, by definition, not smart enough to debug it.“ — Brian W. Kernighan
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Danke für die Antwort.

Mir ist gerade noch etwas aufgefallen. Ich habe das `bytearry` auf 7 vergrößert und wenn ich von Seite 3 auf 1 wechsle, dann erhalte ich eine nicht valide Antwort und die macht mir meinen buffer "kaputt".

Code: Alles auswählen

Buffer: bytearray(b'f\x03\xff\xff\xff\x00\x00')
Page: 3
Buffer: bytearray(b'\x04\xff\xff\xfff\x01\xff')
Page: None
Buffer: bytearray(b'\xff\xfff\x01\xff\xff\xff')
Page: None
Buffer: bytearray(b'f\x01\xff\xff\xff\xff\xff')
Page: 1
Dadurch, dass das Arry jetzt größer ist, wird der Effekt überspielt, aber wenn das Array nur Platz für 5 hat, weil ich auch nicht mehr erwarte, dann startet mein Buffer nicht mehr mit `f` obwohl es die gesendete Antwort macht. Eine Antwort, die mit `0x40` beginnt ist in der Doku nicht beschrieben.

Wenn ich nun auf das erste und die drei letzten Bytes prüfe, dann mache ich den Buffer einfach um 2 größer, weil das halt passt?

Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Augenscheinlich funktioniert es mit folgendem Code:

Code: Alles auswählen

class Nextion:
    END_COMMAND = b"\xff\xff\xff"
    NUMBER_VERIFICATION = b"f"
    BUTTON_VERIFICATION = b"#\x01P"
    PAGE_PATTERN = r"66\d\dfffff"

    def __init__(self, baudrate, tx=10, rx=9):

        self.uart = UART(1, baudrate, tx=tx, rx=rx)
        self.buffer = bytearray(8)

    def get_current_page(self):
        self.cmd("sendme", write_and_read=False)
        sleep_ms(100)
        _ = self.uart.readinto(self.buffer)
        if match := search(self.PAGE_PATTERN, self.buffer.hex()):
            return int(match.group(0)[2:4])
Das mit `slice` ist wieder nicht so schön und ich fand keinen anderen Weg, als `hex()` anzuwenden um etwas, für mich, durchsuchbares zu haben.


Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Sirius3
User
Beiträge: 18368
Registriert: Sonntag 21. Oktober 2012, 17:20

Aus einem Hex-Wert eine Dezimalzahl zu machen, ist wohl nicht gewollt.
Außerdem fehlt bei Deiner Suche ein f und auch dann könnte alles um 4 bit verschoben sein.
Wenn Du unbedingt reguläre Ausdrücke verwenden willst, dann benutz doch gleich die Bytes:

Code: Alles auswählen

match = re.search(b"f(.)\xff\xff\xff", self.buffer, re.DOTALL)
return match.group(0)[0] if match else None
Benutzeravatar
Dennis89
User
Beiträge: 1698
Registriert: Freitag 11. Dezember 2020, 15:13

Danke, das sieht schöner aus. Funktioniert unter Python auch (mit dem Index 1 anstatt 0), allerdings unter MicroPython nicht:

Code: Alles auswählen

Connecting..........Traceback (most recent call last):
  File "main.py", line 335, in <module>
  File "main.py", line 311, in main
  File "hmi.py", line 69, in start_up
  File "Nextion.py", line 34, in get_current_page
TypeError: can't convert 'bytearray' object to str implicitly
Es muss auch nicht unbedingt ein regulärer Ausdruck sein. Das war nur der schönste Code, der mir eingefallen ist.

Grüße
Dennis
"When I got the music, I got a place to go" [Rancid, 1993]
Antworten