Fehler abhängig von der Menge des Codes inkl. Kommentare

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
ragnar
User
Beiträge: 5
Registriert: Mittwoch 25. Dezember 2024, 06:40

Hallo,

bei einer inzwischen sehr lang andauernden Fehlersuche stellte ich nun fest, das ein Fehlverhalten auftritt, sobald zuviel Code oder Kommentare in der "main.py" enthalten sind. Nun habe ich keinen weiteren Ansatz mehr, die Ursache ausfindig zu machen und hoffe auf Ideen von euch!

Ich habe kürzlich erst angefangen mit Python zu programmieren, konkret mit Micropython auf einem ESP32 WROOM 32. Ich habe ein Modul / Klasse geschrieben, die Sensormessungen verarbeitet, im nächsten Schritt sollte ein asyncroner Webserver mit "Microdot" hinzukommen, dabei tauchte das Problem dann auf, der Webserver reagierte zuerst nicht auf anfragen.
Nachdem ich viele Codezeilen, vor dem Start des Webservers auskommentiert und gekürzt (gelöscht) habe, reagierte der Webserver plötzlich auf eingehende Anfragen, er tat also was er tun sollte. Inzwischen kann ich sicher sagen das es nichts mit dem Code zu tun hat, der vor dem Start des Webservers ausgeführt wird, sondern mit der Menge an Code oder auch Kommentare, die enthalten sind!
Zu dieser Erkenntnis kam ich als ich an einem Punkt, wo der Webserver auf Anfragen reagiert, eine kleine asyncrone Funktion hinzufügte, die eine LED auf dem Board blinken lässt. Dies, um prüfen ob irgendwo ein Task blockiert und den Webserver lahmlegt. Die LED blinke fleißig, aber der Webserver reagierte wieder nicht auf Anfragen, dann kommerntierte ich die neu geschriebene Blinkfunktion aus, der Webserver reagierte aber trotzdem nicht. Erst als ich den auskommentierten Code rauslöschte, reagierte der Webserver wieder auf anfragen. Ähnliches Verhalten trat zuvor immer wieder auf, allerdings konzentierte ich mich dann immer auf den Code, den ich rausgenommen habe, da ich dort die Ursache vermutete. Nun habe ich Code an anderer Stelle entfernt und die asyncrone Blinkfunktion wieder hinzugefügt, der Webserver reagiert jetzt auf Anfragen und die LED blinkt. Somit kann ich entgültig die Ursache im Code ausschließen, der Fehler muss an anderer Stelle gesucht werden.

Weitere Informationen:
- Der Fehler tritt nicht bei überschreiten einer genauen Anzahl von Zeilen oder Zeichen in der main.py auf.
- Das Entfernen eines imports, der nirgenswo anders importiert wird ändert was am Verhalten
- Importierte Module, die um ein vielfaches größer sind wie meine main.py, funktionieren (z.B. microdot.py)
- Vor dem Start habe ich den belegten & freien Speicher des Ram, sowie vom Flashspeicher auf der Konsole ausgegeben, dort deutet nichts auf ein Platzproblem hin.
- Ich habe andere Betriebssysteme geflasht, ohne Änderung des Verhalten. Installiert ist Micropython 1.24 für den ESP32 WROOM 32.
- Zuvor, bei der Programmierung meiner "Sensorlogger" Klasse stellte ich fest, das die größe von selbst erzeugten Dateien mit Binärdaten, beim hinzufügen weiterer Daten keine Änderung Ihrer größe aufweisen (os.statvfs('pfad_der_log_datei'). Könnte es sein die Partiotion nicht korrekt ist?

Ich habe keinen Ansatz mehr, wo ich noch suchen soll und bin kurz davor das Handtuch zu schmeisen. Solltest du bis hierher gelesen haben, möchte ich dir dafür schon mal recht herzlichsten danken, für deine Aufmerksamkeit. Solltest du eine Idee haben, woran es liegen könnte, wäre ich dir weiterhin dankbar, wenn du diese mit mir teilst.

Hier der Code von 'main.py':

Code: Alles auswählen

from microdot_asyncio import Microdot, Response, send_file
from microdot_utemplate import render_template
from scd4x_sensirion import SCD4xSensirion
from sensor_pack.bus_service import I2cAdapter
from sensordata import Sensorlogger
from machine import I2C, Pin, Timer
import time, struct, os, asyncio

scd4x = SCD4xSensirion(I2cAdapter(I2C(0, scl=Pin(22), sda=Pin(21), freq=400_000)))
scd4x.set_measurement(start=False, single_shot=False)
scd4x.set_temperature_offset(7)
masl = 230  # Meter Above Sea Level
scd4x.set_altitude(masl)
scd4x.set_measurement(start=True, single_shot=False)      # periodic start
co2=Sensorlogger('CO2', 'ppm', 'h')
temp=Sensorlogger('Temperatur', '°C', 'e', 1)
humity=Sensorlogger('rel. Luftfeuchte', '%', 'e', 1)

def periodic_sensor_reading(timer):
    scd4x_werte = scd4x.get_meas_data()
    co2.wert = scd4x_werte[0]
    temp.wert = scd4x_werte[1]
    humity.wert = scd4x_werte[2]
    print("co2: " + str(co2.wert) + " | temp: " + str(temp.wert) + " | humity: " + str(humity.wert))

Sensorlogger.start_logging(periodic_sensor_reading, 5000, 1)

# Initialize MicroDot
app = Microdot()
Response.default_content_type = 'text/html'  

# root route
@app.route('/')
async def index(request):
    print('/ | index()')
    return render_template('/index.html')

@app.route('/getData')
async def read_sensor(request):
    print('/getData | read_sensor()')
    start,logs=(Sensorlogger.get_log(787604040,787604900))
    return {'start': start, 'logging_period': Sensorlogger.logging_period_time()*60,'logs': logs}

# Static CSS/JSS
@app.route("/static/<path:path>")
def static(request, path):
    print('/static/<path:'+path+'> | static()')
    if ".." in path:
        # directory traversal is not allowed
        return "Not found", 404
    return send_file("static/" + path)

# shutdown
@app.get('/shutdown')
def shutdown(request):
    request.app.shutdown()
    return 'The server is shutting down...'

#define LED
led=Pin(2, Pin.OUT)

async def toggle_led():
    while True:
        led.value(not led.value())
        await asyncio.sleep_ms(500)

async def main():
    server=asyncio.create_task(app.start_server())
    asyncio.create_task(toggle_led())
    print('Microdot start')
    await server    

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print('Microdot trow exeption')
        pass
Viele Grüße und Frohe Weihnachten
Andi
Benutzeravatar
__blackjack__
User
Beiträge: 13919
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@ragnar: Das ist asynchroner Code, da kann man nicht so einfach ausschliessen, dass es nicht am Code liegt, denn da können die ganzen Fehler auftreten die man bei nebenläufiger Programmierung halt so hat. Und eben auch mit der unangenehmen Eigenschaft das Fehler mal auftreten und mal nicht, und der Auslöser ob zum Beispiel race conditions oder deadlocks auftreten, an ganz komischen Sachen hängen können.

Ich würde da als erstes mal alle Microdot-bezogenen Funktionen wo man sich aussuchen kann ob man die normal oder ``async`` definiert, die ``async`` definieten, damit die nichts blockieren können.

Diese Sensorgeschichte sieht auch so aus als würde sie etwas asynchron machen, das heisst da können auch Fehler drin sein die blockieren.

`Sensorlogger` sieht auch ein bisschen schräg in der Handhabung aus, mit der Klassenmethodie die eine Rückruffunktion erhält, die dann auf global definierten `Sensorlogger`-Objekten von aussen den `wert` setzen. Diese ganzen globalen Variablen sollten da nicht sein.
“I am Dyslexic of Borg, Your Ass will be Laminated” — unknown
Benutzeravatar
DeaD_EyE
User
Beiträge: 1205
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

Die Funktion periodic_sensor_reading soll offenbar durch einen Timer regelmäßig aufgerufen werden. Hier ist das Problem, dass genau dort eine Race-Condition besteht.

Hier gibt es mehr Informationen: https://github.com/peterhinch/micropyth ... th-asyncio

Man könnte die Funktion aber auch einfach asynchron als Task laufen lassen, wie du es schon mit der LED machst.
Der Hardware-Zugriff ist aber immer synchron und blockiert, bis das Ergebnis geliefert wird oder ein Timeout ausgelöst wird. Ein Timeout löst dann eine Exception aus, meist einen OSError.
Größtenteils sind die Zugriffe auf die Hardware so kurz, dass man das nicht merkt. Das merkt man nur, wenn die Hardware nicht funktioniert und z.B. ein an I2C angeschlossener Sensor nicht reagiert.
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
ragnar
User
Beiträge: 5
Registriert: Mittwoch 25. Dezember 2024, 06:40

Vielen Danke blackjack und dead eye für eure Antworten

Stimmt genau dead eye, ein Timer ruft die Funktion periodic_sensor_reading auf, diese kann problemlos auch als asynchrone Funktion laufen ohne von einem Timer Interrupt sofort ausgeführt werden zu müssen, das muss ich auch zwingend ändern, aber aus anderen Gründen. In der Sensorlogger Klasse gibt es einen weiteren Timer, bei dem eine Zeitgenaue Ausführung jedoch wichtig ist, zudem besteht der Sinn dieses Projektes in erster Linie darin den Umgang mit asyncronem Tasks scheduling und brutal dazwischendrängenden Interrupts zu testen, um den Umgang zu lernen.
Mein Plan war es, in der vom Timer aufgerufenen Funktion ausschließlich eine kurze Berechnung durchzuführen und das Ergebniss zwischenzuspeichern, dann wird noch ein Task erstellt, der wenn er vom scheduler aufgerufen wird, die weitere Verarbeitung (binäre Daten in einer Datei speichern) durchführt.
Aber auch dabei muss ich daran denken das der Intterupt genau in dem Moment zuschlagen kann, wenn ein Task gerade dabei ist die Variablen zu verändern, mit denen die Interrupt Funktion rechnet. Ist das mit "race conditions" gemeint?
Ich habe mal deinen Link aufgerufen, aber zunächst nur kurz überflogen, das ist genau das Thema was mich Interessiert, ich werde mir das ganau durchlesen / studieren!

@blackjack: Die Klasse Sensorlogger besteht überwiegend aus statischen Attributen und Methoden, dort werden verschiedene sensoren gemeinsam und auf gleicher weise bearbeitet. Lediglich die einzelnen Sensoren sind Instanzen der Klasse. Ich habe das so noch nie gemacht, für mich ist es ein Weg den ich ausprobiere, mit dem Wissen das es vielleicht nur ein Holzweg ist, aber bisher macht sie bestens, das was sie soll.

Das Problem was jetzt auftritt hat aber, so denke ich relativ sicher, nichts mit den Tücken der asyncroner Programmierung zu tun. In den letzten Tagen der Fehlersuche habe ich in allen Funktionen die am Code beteiligt sind, sämliche asynchrone Operationen (es gibt noch keine weiteren), oder Blockaden durch sleep und wie erwähnt die beiden Timer deaktiviert, um das auschließen zu können, dabei habe ich mir auch sämtlichen Code angeschaut, der durch import verwendet wird und nicht von mir geschrieben ist.
Das Problem, das keinerlei Anfragen vom Webserver bedient werden, tritt entweder immer auf oder garnicht, aber nie sporadisch. Heute morgen war es so das nur durch entfernen von ein paar Zeilen Kommentar des Problem verschwand, der Webserver reagierte danach immer. Dann habe ich die Kommentar Zeilen wieder eingefügt und erneut mehrfach getestet, hier reagierte der Webserver nie. In den Tagen davor waren es Codezeilen die ich auskommentiert oder ganz rausgenommen habe, wenn danach das Problem verschwand, suchte ich immer nach einem Zusammenhang, mit der entfernten Codezeile, aber wie kann ein Kommentar das Verhalten des Codes verändern? Das ergibt doch überhaupt keinen Sinn, aus diesem Grund denke inzwischen der Fehler ist nicht im Code finden.
Ich gestehe aber gerade konnte es ich es nur mit Kommentarzeilen nicht erneut reproduzieren konnte, erst nach wieder hinzufügen von Code zu Debug-Zwecken konnte ich das Problem wieder hervorrufen (im oben geposteten Code, tritt das Problem nicht auf!).

Hier mal eine Änderung zum Code von oben, die das Problem wieder erscheinen lässt:
"sensorlen=len(Sensorlogger.sensor_attribute())" lässt den Webserver wieder einschlafen(?), aber die LED blinkt weiter! Wenn die asyncrone Funktion "toggle_led" läuft, zeigt das doch, das nichts blockiert und Tasks vom scheduler ausgeführt werden, bis auf die von Microdot dekorierten nicht, das ist merkwürdeg.
Auch sehe ich keinen Hinweis daraus, warum diese Code Zeile und die aufgerufene Funktion für das veränderte Fehlhalten verantworlich sein kann. Es waren viele andere Codezeilen zuvor, die für das erscheinen oder nicht erscheinen des Problems sorgten und in denen ich den Fehler suchte.

Code: Alles auswählen

@app.route('/getData')
async def read_sensor(request):
    print('/getData | read_sensor()')
    start,logs=(Sensorlogger.get_log(787604040,787604900))
    print(start)
    print(logs)
    le=time.localtime(start+Sensorlogger.logging_period_time()*60*len(logs[0]))
    ls=time.localtime(start)
    print('start: %02d.%02d, %02d:%02d | end: %02d.%02d, %02d:%02d' %(ls[2], ls[1], ls[3], ls[4], le[2], le[1], le[3], le[4]))
    sensorlen=len(Sensorlogger.sensor_attribute())
    '''
    for index in range(0, len(logs[0])):
        t=start+index*Sensorlogger.logging_period_time()*60
        lt=time.localtime(t)
        string='%09d | %02d:%02d | ' %(t,lt[3],lt[4])
        for sindex in range(0, sensorlen):
            if logs[sindex][index] is not None:
                if sindex==0:
                    string+='%04d | ' %(logs[sindex][index])
                else:
                    string+='%02.1f | ' %(logs[sindex][index])
            else:
                string+='     | '
        print(string)'''
    return {'start': start, 'logging_period': Sensorlogger.logging_period_time()*60,'logs': logs}

Code: Alles auswählen

    @staticmethod
    def sensor_attribute():
        attr=[]
        for sensor in Sensorlogger.__sensoren:
            attr.append((sensor.__name, sensor.__einheit, sensor.__format, sensor.__kommastellen))
        return attr
Benutzeravatar
__blackjack__
User
Beiträge: 13919
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@ragnar: In den Rückruffunktionen von Timern darf kein Speicher angefordert werden.
Das ist in der gezeigten `periodic_sensor_reading()` definitiv verletzt. So etwas muss nicht zum Absturz führen, kann also durchaus scheinbar funktionieren, kann aber auch beliebige Effekte haben. Und es ist auch nicht definiert woran es liegt ob das nun funktioniert oder auch nicht. Undefiniertes Verhalten halt.

Die Klasse `Sensorlogger` ist also keine Klasse, oder zumindest ein frankenstein'sches Monster. Das wäre dann einfach falsch. Klar kann Code der Sprachmittel falsch verwendet auch laufen, das macht den aber nicht besser.

Das zeigt auch der neue Code wo Attribute mit doppelten führenden Unterstrichen zu sehen sind, und erkennbar ist, dass diese Klasse eine globale Liste von Sensoren, also eine weitere globale Variable beinhaltet. Und anscheinend auch das Log selbst wird so angesammelt‽

Der Code sieht auch insgesamt nicht nach Python aus. Lauter magische Indexzugriffe, eine Zeichenkette die in einer Schleife durch wiederholtes ``+=`` aufgebaut wird, und das iterieren über einen Index von 0 bis zur Längte (minus 1) um damit dann an die einzelnen Elemente zu gelangen, was in Python ein „anti pattern“ ist. Man kann direkt über die Elemente von Sequenzen iterieren, ohne den Umweg über einen Index.

Den ``%``-Operator würde ich auch nicht ohne eine gute Begründung verwenden. Den hat man in Python 2 schon nicht mehr verwendet, auch da gab es schon die `format()`-Methode und jetzt mit f-Zeichenketten gibt es dafür noch weniger Gründe.

Wie hast Du denn alle asynchronen Operationen entfernt und dann festgestellt das der Webserver Anfragen nicht beantwortet? Das geht doch gar nicht, denn der *Webserver* ist doch asynchron. Und den *muss* man auch asynchron programmieren, also immer brav ``async``-Funktionen verwenden, denn wenn man das nicht tut ist der ja trotzdem noch nebenläufig und nimmt dann Threads, was auf Mikrokontrollern gar nicht oder nur sehr eingeschränkt geht.
“I am Dyslexic of Borg, Your Ass will be Laminated” — unknown
ragnar
User
Beiträge: 5
Registriert: Mittwoch 25. Dezember 2024, 06:40

@blackjack: Nun denn, da ich aufgrund des Problemes nicht weiterkomme, werde ich deine Kritikpunkte erst nehmen und erst mal alles neu schreiben, mit dem Ziel dem Python Standart näher zu kommen, um fehlerunanfälligen und besser lesbaren Code zu schreiben. Es macht keinen Sinn mit komplexen Dingen zu beginnen, ohne die Basics zu beherschen, dennoch möchte ich gerne paralell an der asynchronen Programmierung weiterarbeiten. Zwar habe ich vor den ersten geschrieben Zeilen in Python sehr viel gelesen, wohl aber grundlegendes nur beiläufig üerflogen und den Basics insgsamt kaum Beachtung geschenkt.
Ich schreibe Sensorlogger einaml komplett mal neu, 2 eigenständige Klassen, 'Sensor' & 'Logger' sorgen dann dafür das es keine globalen Variablen und Methoden mehr gibt und nebenbei ein wesentlich flexibleren Einsatz imöglich wird..
Auch werde ich mich bemühen andere Wege zu gehen wie die gewohnten aus javascript (z.B. Iterieren ohne Index zu nutzen). Einige Dinge wollte ich nach Python Standart umsetzen, jedoch war dies in Micropython im ersten Moment so nicht umsetzbar, dabei habe ich schnell den Weg des geringsten Wiederstand eingeschlagen, indem ich rumprobierte, bis es irgendwie funktionierte, ohne jegliche Beachtung ob das der richtige Weg ist. Ich war der Meinung, ich müsse erst mal was schreiben, da ich zuvor schon genug gelesen habe. Das bringt aber nix, man eignet sich dann schnell und viel unsauberen Mist an, der nur Probleme schafft. Somit heist es bei mir erst mal "restart" und zurück auf Anfang!
Auch lasse ich zunächst mal die Finger von Rückruffunktionen aus einem Interrupts heraus, anstatt dem Timer kann das auch mit asynchronen Tasks umgesetzt werden. Wenn die Zeit des letzten Durchlaufs der Funktion gespeichert wird, kann durch Berücksichtigung der Tatsächlich verstrichenen Zeit zwischen den Durchläufen ebenfalls ganau gerechnet werden, eine Zeitgenaue Ausführung ist also nicht zwingend erforderlich.
Aber dennoch würde ich gerne Verstehen wie es mit einer Timer-Rückruffunktion gehen könnte. Wäre es möglich Speicher für eine Variable bei der Initalisierung in der benötigten Größe anzufordern und diesen lediglich neu zu beschreiben, in der Interrupt Rückruffunktion?
Ich werde wohl ein paar Tage zu tun haben, wenn ich einmal alles neu geschrieben habe würde ich das gerne hier posten, denn sicherlich gibt es dann immernoch einiges Verbesserungsbedürftiges. Zudem werde ich Rückmeldung geben, ob das eigentliche Problem behoben wurde oder es immer noch auftaucht.
Zunächst mal Danke an euch beiden!
ragnar
User
Beiträge: 5
Registriert: Mittwoch 25. Dezember 2024, 06:40

Nach Umbau, weiterhin gleiches problematisches Verhalten!

Jetzt gibt es keine Timer mehr, dafür jede Menge asynchroner Funktionen, zwei davon ersetzen mit einer Endlosschleife und asynchronem sleep, die alten Timer Rückruffunktionen. Eine in 'main.py' liest alle 5 Sekunden Messungen aus dem I2C Sensor, die andere in der Klasse 'Logger' erechnet immer im gleichen Intervall je nach Initialisierung alle 1 bis 60 Minuten den Durchschnitt der Messungen und speichert diesen binär in einer Datai auf dem Flashspeicher.

In der ´russischen´ Klasse 'SCD4xSensirion' gibt es eine private Funktion '_send_command', die einen normalen sleep hatte, um nötige Pausen zwischen den Abfragen an den I2C Sensor gewährleisten zu können. Um diesen einen sleep, asynchron zu gestalten, musste ich die komplette Klasse asynchron schreiben. Zum einen musste ich alle Funktionen die '_send_command' aufrufen asynchron machen, dabei aber dennoch gewährleisten das '_send_command' seine Abfragepause an den I2C Sensor weiter einhält. Dazu wird am Anfang in der Funktion in einer Schleife solange gewartet bis '_self._waiting==False' ist, dann erst wird die Schleife verlassen und '_waiting' sofort, bis zum Ende der Funktion auf True gesetzt, damit die Funktion nicht gleichzeitig von mehreren Task Ablaufen kann.
Hier bin ich mir sehr unsicher, ob dieses Vorgehen, so richtig ist.

Die Klassen 'Sensor' und 'Logger' sind noch nicht ausgereift, hier müssen noch einige Vorkehrungen getroffen werden die ungülte Parameterangaben abfangen und eine Exeption auslösen. Zudem soll ab überschreiten einer bestimmten Größe der Logdatei, eine neue erstellt werden, damit wenn der Speicher knapp wird automatisch die ältesten gelöscht werden. Die Funktion 'get_log' liest die in den Dateien gespeicherten Messungen, für den vorgegebenen Zeitraum aus. Nachdem der Client Sie über 'Get' bekommen hat, sollen Sie via Plotly visualisiert werden.

So verhält es sich derzeit:
Entweder erstelle ich die beiden Tasks zum lesen und loggen der Messungen, dann aber reagiert der webserver auf keine Anfragen mehr. Oder Ich Kommentiere die beiden Zeilen zum erstellen der Tasks aus, dann arbeitet der Webserver sauber und z.B. Abfragen über get_log können gemacht und übermittelt werden, aber das loggen neuer Messungen geht dann nicht.
Wieder äußerst seltsam ist es das es nicht alleine ausreicht diese beiden Zeilen rauszunehmen, demit der Webserver wieder arbeitet:

Code: Alles auswählen

    
await init_scd4x()
    #asyncio.create_task(periodic_sensor_reading())
    #asyncio.create_task(logger.start_logger())
Zusätzlich muss die gesamte Funktion 'periodic_sensor_reading()' auskommentiert werden, erst dann reagiert der Webserver wieder auf Anfragen. Und damit komme ich überhaupt nicht zurecht, denn diese Funktion wird an keiner anderen Stelle aufgerufen, bzw. ein Task von Ihr erstellt, somit wäre sie sowiso ausen vor. Warum ist es dennoch notwendig Sie komplett rauszunehmen, das ergibt überhaupt keinen Sinn!

Im folgenden der gesamte Code, in dieser Reihenfolge:
1. main.py (so funktioniert der Webserver, 'async def periodic_sensor_reading()' ist komplett auskommentiert.
2. sensordata.py mit den Klassen Logger & Sensor
3. scd4x_sensirion.py mit der Klasse SCD4xSensirion

main.py:

Code: Alles auswählen

from microdot_asyncio import Microdot, Response, send_file
from microdot_utemplate import render_template
from scd4x_sensirion import SCD4xSensirion
from sensor_pack.bus_service import I2cAdapter
from sensordata import Sensor, Logger
from machine import I2C, Pin
import time, struct, os, asyncio

# Initialize SCD41 / I2C
#i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400_000)
#adaptor = I2cAdapter(i2c)
scd4x = SCD4xSensirion(I2cAdapter(I2C(0, scl=Pin(22), sda=Pin(21), freq=400_000)))

# Init Sensoren
co2=Sensor('CO2', 'ppm', 'h')
temp=Sensor('Temperatur', '°C', 'e', 1)
humity=Sensor('rel. Luftfeuchte', '%', 'e', 1)

# Init Logger
logger=Logger((co2, temp, humity), 1)

# Initialize MicroDot
app = Microdot()
Response.default_content_type = 'text/html'

async def init_scd4x():
    # Force return sensor in IDLE mode!
    await scd4x.set_measurement(start=False, single_shot=False)
    await scd4x.set_temperature_offset(4)
    masl = 230  # Meter Above Sea Level
    await scd4x.set_altitude(masl)
    await scd4x.set_measurement(start=True, single_shot=False)      # periodic start
    await asyncio.sleep(5)
    print('finish init_scd4x')
'''
async def periodic_sensor_reading():
    while True:
        scd4x_werte = await scd4x.get_meas_data()
        co2.wert = scd4x_werte[0]
        temp.wert = scd4x_werte[1]
        humity.wert = scd4x_werte[2]
        print('co2: {:>4d} | temp: {:>4.1f} | humity: {:>4.1f}'.format(co2.wert, temp.wert, humity.wert))
        await asyncio.sleep(5)'''

# root route
@app.route('/')
async def index(request):
    print('@app.route("/")')
    return render_template('/index.html')


@app.get('/get_data')
async def read_sensor(request):
    print('@app.get("/get_data")')
    start,logs=logger.get_log(787604040,787604900)
    #start,logs=(logger.get_log(788132700,788175900))
    end=start+logger.logging_period_time*len(logs[0])
    le=time.localtime(end)
    ls=time.localtime(start)
    print('start: {:<02d}.{:<02d}, {:<02d}:{:<02d} | end: {:<02d}.{:<02d}, {:<02d}:{:<02d}'.format(ls[2], ls[1], ls[3], ls[4], le[2], le[1], le[3], le[4]))
    sensorlen=len(logger.sensor_attribute)
    for index in range(0, len(logs[0])):
        t=start+index*logger.logging_period_time
        lt=time.localtime(t)
        string='{:<09d} | {:<02d}:{:<02d} | '.format(t,lt[3],lt[4])
        for sindex in range(0, sensorlen):
            if logs[sindex][index] is not None:
                if sindex==0:
                    string+='{:<04d} | '.format(logs[sindex][index])
                else:
                    string+='{:<02.1f} | '.format(logs[sindex][index])
            else:
                string+='     | '
        print(string)
    return {'start': start, 'logging_period': logger.logging_period_time,'logs': logs}

# Static CSS/JSS
@app.route("/static/<path:path>")
def static(request, path):
    print('@app.route("/static/<path:'+path+'>"')
    if ".." in path:
        # directory traversal is not allowed
        return "Not found", 404
    return send_file("static/" + path)

# shutdown
@app.get('/shutdown')
def shutdown(request):
    print('@app.get("/shutdown")')
    request.app.shutdown()
    return 'The server is shutting down...'

async def main():
    await init_scd4x()
    #asyncio.create_task(periodic_sensor_reading())
    #asyncio.create_task(logger.start_logger())

if __name__ == "__main__":
    print('Starting microdot app')
    try:
        asyncio.create_task(main())
        app.run()
    except:
        app.shutdown()
sensordata.py mit den Klassen Logger & Sensor:

Code: Alles auswählen

import time, struct, os, asyncio

class Logger(object):
    
    def __init__(self, sensoren, logging_period_time):
        self.__logging_period_time=logging_period_time*60 #Zeit in Minuten * 60 Sekunden
        self.__sensoren=sensoren
        self.__start_logging=None
        for sensor in self.__sensoren:
            sensor.__set_logger(self)
        
    async def start_logger(self):
        lnow=time.localtime(time.time())
        periodstart = self.__logging_period_time - lnow[4]*60 % self.__logging_period_time-lnow[5]
        if not periodstart==self.__logging_period_time:
            await asyncio.sleep(periodstart)
        now=time.time()
        self.__log_start_time=now
        print('start Logger at: ' + str(time.localtime(now)))
        for sensor in self.__sensoren:
            sensor.__period_counter = 0
            sensor.__last_log_time=now
        await asyncio.sleep(self.__logging_period_time)
        asyncio.create_task(self.__period_schedule())
            
    async def __period_schedule(self):
        while True:
            f=self.__open_write()
            for sensor in self.__sensoren:
                sensor.__new_period()
                f.write(struct.pack(sensor.__format, sensor.__wert_period))  
            f.close()
            now=time.time()
            self.__log_start_time=now
            lnow=time.localtime(now)
            next_period = self.__logging_period_time - lnow[4]*60 % self.__logging_period_time-lnow[5]
            #debugging:
            sensoren=self.__sensoren
            print('%09d | %02d:%02d | %04d | %02.1f | % 02.1f' %(now,lnow[3],lnow[4],sensoren[0].__wert_period,sensoren[1].__wert_period,sensoren[2].__wert_period))
            #end_debbugging
            await asyncio.sleep(next_period)
                     
    @property  
    def logging_period_time(self):
        return self.__logging_period_time
    
    @property
    def sensor_attribute(self):
        attr=[]
        for sensor in self.__sensoren:
            attr.append((sensor.__name, sensor.__einheit, sensor.__format, sensor.__kommastellen))
        return attr
    
    @property
    def __chain_size(self):
        chain_size=0
        for sensor in self.__sensoren:
            chain_size+=sensor.__size
        return chain_Size
    
    @property
    def __chain_format(self):
        chain_format=''
        for sensor in self.__sensoren:
            chain_format+=sensor.__format
        return chain_format
    
    def __open_write(self):
        if self.__start_logging is not None:
            return open('/logdata/' + str(self.__start_logging), 'a')
        else:
            now=time.time()
            self.__start_logging = now
            return open('/logdata/' + str(now), 'w')
    
    async def get_log(self, start, end):
        
        def append_none_logs(logging_periods_different):
            for period in range(logging_periods_different,0):
                for index, sensor in enumerate(self.__sensoren):
                    logs[index].append(None)
        
        period_different=int((start%(self.__logging_period_time))*self.__logging_period_time)
        if period_different>0:
            start-=period_different-self.__logging_period_time
        period_different=int((end%(self.__logging_period_time))*self.__logging_period_time)
        if period_different>0:
            end-=period_different#-self.__logging_period_time
        logs=[]        
        for sensor in self.__sensoren:
            logs.append([])
        bevor_file_end=None
        first_file_found=False
        for file_name in os.listdir('/logdata'):
            file_size=os.stat('/logdata/'+file_name)[6]
            file_log_count=int(file_size/self.__chain_size)
            file_start=int(file_name)
            file_end=file_start+(file_log_count-1)*self.__logging_period_time
            file_end=file_start+file_log_count*self.__logging_period_time
            if bevor_file_end is not None:
                if file_start>end:
                    range_end = end#-self.__logging_period_time
                else:
                    range_end = file_start#-self.__logging_period_time
                logging_periods_different=int((bevor_file_end-range_end)/self.__logging_period_time)
                if logging_periods_different<0:    
                    append_none_logs(logging_periods_different)
            if file_start>end:
                break
            elif not first_file_found:               
                if start<file_end:
                    first_file_found=True
                    logging_periods_different=int((start-file_start)/self.__logging_period_time)
                    if logging_periods_different<0:
                        append_none_logs(logging_periods_different)
                        logging_periods_different=0
                    buf_start=logging_periods_different*self.__chain_size
                else:
                    continue
            else:
                buf_start=0
            bevor_file_end=file_end
            if end<=file_end:   
                bytes_to_end=int((end-file_start)/self.__logging_period_time*self.__chain_size)+self.__chain_size
            else:
                bytes_to_end=file_size
            bytes_to_end-=buf_start
            with open('/logdata/'+file_name, 'rb') as file:
                file.seek(buf_start, 0)
                buf=file.read(bytes_to_end)
                file.close()
            buf_pos=0
            buf_size=len(buf)
            while buf_pos<buf_size:
                werte=struct.unpack(self.__chain_format, buf[buf_pos:buf_pos+self.__chain_size+1])
                buf_pos+=self.__chain_size
                for index, sensor in enumerate(self.__sensoren):
                    logs[index].append(werte[index])
            if end<=file_end:                  
                break
        if file_end is not None:
            logging_periods_different=int((file_end-end)/self.__logging_period_time)-1
            if logging_periods_different<0:
                append_none_logs(logging_periods_different)                
        return start, logs
    
class Sensor(object):
        
    def __convert(self, wert):
        if (type(wert) is float) and ((self.__format=='h') or (self.__format=='b') or (self.__format=='i')):
            wert=round(wert)
        elif ((self.__format=='e') or (self.__format=='f')) and (self.__kommastellen != None):
            wert=round(wert, self.__kommastellen)
        return wert
        
    def __init__ (self, name, einheit, forma, kommastellen=0):
        self.__name = name
        self.__einheit = einheit
        self.__format = forma
        self.__kommastellen=kommastellen
        self.__period_counter = 0
        self.__last_log_time = 0
        self.__wert_period = None
        self.__wert = None
        
    def __set_logger(self, logger):
        self.__logger=logger
            
    @property
    def wert(self):
        return self.__wert
    
    @wert.setter
    def wert(self, val):
        if self.__wert != val:
            self.__wert=val
            now=time.time()
            self.__period_counter += (now-self.__last_log_time)*val
            self.__last_log_time=now
            
    @property
    def wert_period(self):
        return self.__wert_period
    
    @property
    def name(self):
        return self.__name
    
    @property
    def einheit(self):
        return self.__einheit
    
    @property
    def __size(self):        
        if self.__format == 'b':
            return 1
        elif self.__format == 'h':
            return 2
        elif self.__format == 'i':
            return 4
        elif self.__format == 'e':
            return 2
        elif self.__format == 'f':
            return 4
        
    def __new_period(self):
        now=time.time()
        self.__period_counter += (now-self.__last_log_time)*self.__wert
        self.__last_log_time=now
        log_time=now-self.__logger.__log_start_time
        self.__wert_period = self.__convert(self.__period_counter/log_time)
        self.__period_counter=0        
scd4x_sensirion.py mit der Klasse SCD4xSensirion:

Code: Alles auswählen

"""SCD4x Sensirion module"""

from sensor_pack import bus_service
from sensor_pack.base_sensor import BaseSensor, Iterator
from sensor_pack import base_sensor
from sensor_pack.crc_mod import crc8
import micropython
import time, asyncio


def _calc_crc(sequence) -> int:
    """Обертка для короткого вызова.
    Wrapper for a short call."""
    return crc8(sequence, polynomial=0x31, init_value=0xFF)


class SCD4xSensirion(BaseSensor, Iterator):
    """Class for work with Sensirion SCD4x sensor"""
    def __init__(self, adapter: bus_service.BusAdapter, address=0x62,
                 this_is_scd41: bool = True, check_crc: bool = True):
        """Если check_crc в Истина, то каждый, принятый от датчика пакет данных, проверяется на правильность путем
        расчета контрольной суммы.
        Если this_is_scd41 == True, то будут доступны методы для SCD41, иначе будут доступны методы ОБЩИЕ для SCD40/41!
        If check_crs is True, then each data packet received from the sensor is checked for correctness by
        calculating the checksum.
        If this_is_scd41 == True then methods for SCD41 will be available,
        otherwise GENERAL methods for SCD40/41 will be available!"""
        super().__init__(adapter, address, True)    # Big Endian
        self._buf_3 = bytearray((0 for _ in range(3)))
        self._buf_9 = bytearray((0 for _ in range(9)))
        self.check_crc = check_crc
        # power mode
        self._low_power_mode = False
        # measurement mode (single shot, continuous)
        self._single_shot_mode = False
        self._rht_only = False
        self._isSCD41 = this_is_scd41
        # сохраняю, чтобы не вызывать 125 раз
        self.byte_order = self._get_byteorder_as_str()

    def _get_local_buf(self, bytes_for_read: int) -> [None, bytearray]:
        """возвращает локальный буфер для операции чтения"""
        if bytes_for_read not in (0, 3, 9):
            raise ValueError(f"Invalid value for bytes_for_read: {bytes_for_read}")
        if not bytes_for_read:
            return None
        if 3 == bytes_for_read:
            return self._buf_3
        return self._buf_9

    def _to_bytes(self, value, length: int):
        byteorder = self.byte_order[0]
        return value.to_bytes(length, byteorder)

    # def _read(self, n_bytes: int) -> bytes:
    #    return self.adapter.read(self.address, n_bytes)

    def _write(self, buf: bytes) -> bytes:
        return self.adapter.write(self.address, buf)

    def _readfrom_into(self, buf):
        """Читает из устройства в буфер"""
        return self.adapter.readfrom_into(self.address, buf)

    async def _send_command(self, cmd: int, value: [bytes, None],
                      wait_time: int = 0, bytes_for_read: int = 0,
                      crc_index: range = None,
                      value_index: tuple = None) -> [bytes, None]:
        """Передает команду датчику по шине.
        cmd - код команды.
        value - последовательность, передаваемая после кода команды.
        wait_time - время в мс. которое нужно подождать для обработки команды датчиком.
        bytes_for_read - количество байт в ответе датчика, если не 0, то будет считан ответ,
        проверена CRC (зависит от self.check_crc) и этот ответ будет возвращен, как результат.
        crc_index_range - индексы crc в последовательности.
        value_index_ranges- кортеж индексов (range) данных значений в
        последовательности. (range(3), range(4,6), range(7,9))"""
        # print(f"DBG: bytes_for_read: {bytes_for_read}")
        raw_cmd = self._to_bytes(cmd, 2)
        raw_out = raw_cmd
        if value:
            raw_out += value    # добавляю value и его crc
            raw_out += self._to_bytes(_calc_crc(value), 1)     # crc считается только для данных!
        self._write(raw_out)    # выдача на шину
        if wait_time:
            await asyncio.sleep_ms(wait_time)   # ожидание
        if not bytes_for_read:
            return None
        # b = self._read(bytes_for_read)  # читаю с шины с проверкой количества считанных байт
        b = self._get_local_buf(bytes_for_read)
        self._readfrom_into(b)      # обновление
        base_sensor.check_value(len(b), (bytes_for_read,),
                                f"Invalid buffer length for cmd: {cmd}. Received {len(b)} out of {bytes_for_read}")
        if self.check_crc:
            crc_from_buf = [b[i] for i in crc_index]  # build list of CRC from buf
            calculated_crc = [_calc_crc(b[rng.start:rng.stop]) for rng in value_index]
            if crc_from_buf != calculated_crc:
                raise ValueError(f"Invalid CRC! Calculated{calculated_crc}. From buffer {crc_from_buf}")
        return b    # возврат bytearray со считанными данными

    # BaseSensor
    # Advanced features
    def save_config(self):
        """Настройки конфигурации, такие как смещение температуры, высота расположения датчика над уровнем моря
        по умолчанию сохраняются только в энергозависимой памяти (ОЗУ) и будут потеряны после выключения и включения
        питания. Метод сохраняет текущую конфигурацию в EEPROM SCD4x, сохраняя ее при отключении питания.
        Чтобы избежать ненужного износа EEPROM, метод следует вызывать только в том случае, если это необходимо(!) и
        если были внесены фактические изменения в конфигурацию. EEPROM гарантированно выдерживает не менее 2000
        циклов записи до отказа(!).
        Configuration settings such as temperature offset, sensor altitude are stored by default only in volatile memory
        (RAM) and will be lost after a power cycle. The method saves the current configuration in the EEPROM of the
        SCD4x, saving it when the power is turned off. To avoid unnecessary wear on the EEPROM, the method should only
        be called if necessary(!) and if actual configuration changes have been made.
        EEPROM is guaranteed to withstand at least 2000 write cycles to failure (!)"""
        cmd = 0x3615
        self._send_command(cmd, None, 800)

    def get_id(self) -> tuple:
        """Return 3 words of unique serial number can be used to identify
        the chip and to verify the presence of the sensor."""
        # создатели датчика 'обрадовали'. вместо подсчета одного байта CRC на 6 байт (3 двухбайтных слова)
        # они считают CRC для каждого из 3-х двухбайтных слов!
        cmd = 0x3682
        b = self._send_command(cmd, None, 0, bytes_for_read=9,
                               crc_index=range(2, 9, 3), value_index=(range(2), range(3, 5), range(6, 8)))
        # return result
        return tuple([(b[i] << 8) | b[i+1] for i in range(0, 9, 3)])    # Success

    def soft_reset(self):
        """Я сознательно не стал использовать команду perfom_factory_reset, чтобы было невозможно испортить датчик
        программным путем, так-как количество циклов записи во внутреннюю FLASH память датчика ограничено!
        I deliberately did not use the perfom_factory_reset command, so that it would be impossible to spoil the
        sensor programmatically, since the number of write cycles to the internal FLASH memory of the
        sensor is limited!
        09.09.2024. Добавил. Под вашу ответственность!"""
        cmd = 0x3632
        self._send_command(cmd, None, 1200)

    def exec_self_test(self) -> bool:
        """"Этот метод можно использовать в качестве конечного теста для проверки работоспособности датчика и
        проверки подачи питания на датчик. Возвращает Истина, когда тест пройден успешно.
        The feature can be used as an end-of-line test to check sensor functionality and the customer power
        supply to the sensor. Returns True when the test is successful."""
        cmd = 0x3639
        length = 3
        b = self._send_command(cmd, None, wait_time=10_000,     # да, ждать 10 секунд! yes, wait 10 seconds!
                               bytes_for_read=length, crc_index=range(2, 3), value_index=(range(2),))
        res = self.unpack("H", b)[0]
        return 0 == res

    def reinit(self) -> None:
        """Команда reinit повторно инициализирует датчик, загружая пользовательские настройки из EEPROM.
        Перед отправкой команды reinit необходимо выполнить метод stop_measurement. Если команда reinit не вызывает
        желаемой повторной инициализации, к SCD4x следует применить цикл включения и выключения питания.
        The reinit command reinitializes the sensor by reloading user settings from EEPROM.
        Before sending the reinit command, the stop_measurement method must be called.
        If the reinit command does not trigger the desired re-initialization,
        a power-cycle should be applied to the SCD4x."""
        cmd = 0x3646
        self._send_command(cmd, None, 20)

    # On-chip output signal compensation
    def set_temperature_offset(self, offset: float):    # вызов нужно делать только в IDLE режиме датчика!
        """Смещение температуры не влияет на точность измерения CO2 . Правильная установка смещения температуры SCD4x
        внутри пользовательского устройства позволяет пользователю использовать выходные сигналы RH и T. Обратите
        внимание, что смещение температуры может зависеть от различных факторов, таких как режим измерения SCD4x,
        самонагрев близких компонентов, температура окружающей среды и расход воздуха. Таким образом, смещение
        температуры SCD4x должно определяться внутри пользовательского устройства в типичных условиях его работы
        (включая режим работы, который будет использоваться в приложении) и при тепловом равновесии. По умолчанию
        смещение температуры установлено в 4°C.
        The temperature offset has no influence on the SCD4x CO 2 accuracy. Setting the temperature offset of the SCD4x
        inside the customer device correctly allows the user to leverage the RH and T output signal. Note that the
        temperature offset can depend on various factors such as the SCD4x measurement mode, self-heating of close
        components, the ambient temperature and air flow.
        Метод нужно вызывать только в IDLE режиме датчика!
        The method should be called only in IDLE sensor mode!

        𝑇 𝑜𝑓𝑓𝑠𝑒𝑡_𝑎𝑐𝑡𝑢𝑎𝑙 = 𝑇 𝑆𝐶𝐷40 − 𝑇 𝑅𝑒𝑓𝑒𝑟𝑒𝑛𝑐𝑒 + 𝑇 𝑜𝑓𝑓𝑠𝑒𝑡_ 𝑝𝑟𝑒𝑣𝑖𝑜𝑢𝑠"""
        cmd = 0x241D
        offset_raw = self._to_bytes(int(374.49142857 * offset), 2)
        self._send_command(cmd, offset_raw, 1)

    def get_temperature_offset(self) -> float:
        """Метод нужно вызывать только в IDLE режиме датчика!
        The method should be called only in IDLE sensor mode!"""
        cmd = 0x2318
        b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
        temp_offs = self.unpack("H", b)[0]
        return 0.0026702880859375 * temp_offs

    def set_altitude(self, masl: int):  # вызов нужно делать только в IDLE режиме датчика!
        """Чтение и запись высоты датчика должны выполняться, когда SCD4x находится в режиме ожидания.
        Как правило, высота датчика устанавливается один раз после установки устройства. Чтобы сохранить настройку
        в EEPROM, необходимо выполнить метод save_config. По умолчанию высота датчика установлена в
        0 метров над уровнем моря (masl).
        Reading and writing sensor height must be done when the SCD4x is in standby mode. As a rule, the height of the
        sensor is set once after the installation of the device. To save the configuration to EEPROM, you must execute
        the save_config method. By default, the sensor height is set to 0 meters above sea level (masl).
        Метод нужно вызывать только в IDLE режиме датчика!
        The method should be called only in IDLE sensor mode!"""
        cmd = 0x2427
        masl_raw = self._to_bytes(masl, 2)
        self._send_command(cmd, masl_raw, 1)

    def get_altitude(self) -> int:
        """Метод нужно вызывать только в IDLE режиме датчика!
        The method should be called only in IDLE sensor mode!"""
        cmd = 0x2322
        b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
        return self.unpack("H", b)[0]

    def set_ambient_pressure(self, pressure: float):
        """Метод может быть вызван во время периодических измерений, чтобы включить непрерывную компенсацию давления.
        Обратите внимание, что установка давления окружающей среды с помощью set_ambient_pressure отменяет любую
        компенсацию давления, основанную на ранее установленной высоте датчика. Использование этой команды настоятельно
        рекомендуется для приложений со значительными изменениями давления окружающей среды,
        чтобы обеспечить точность датчика.
        The method can be called during periodic measurements to enable continuous pressure compensation.
        Note that setting the ambient pressure using set_ambient_pressure overrides any pressure compensation based
        on the previously set sensor height. The use of this command is highly recommended for applications with
        significant changes in ambient pressure to ensure sensor accuracy."""
        cmd = 0xE000
        press_raw = self._to_bytes(int(pressure // 100), 2)     # Pascal // 100
        self._send_command(cmd, press_raw, 1)

    # Field calibration
    def force_recalibration(self, target_co2_concentration: int) -> int:
        """Please read '3.7.1 perform_forced_recalibration'"""
        base_sensor.check_value(target_co2_concentration, range(2**16),
                                f"Invalid target CO2 concentration: {target_co2_concentration} ppm")
        cmd = 0x362F
        target_raw = self._to_bytes(target_co2_concentration, 2)
        b = self._send_command(cmd, target_raw, 400, 3, crc_index=range(2, 3), value_index=(range(2),))
        return self.unpack("h", b)[0]

    def is_auto_calibration(self) -> bool:
        """Please read '3.7.3 get_automatic_self_calibration_enabled'"""
        cmd = 0x2313
        b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
        return 0 != self.unpack("H", b)[0]

    def set_auto_calibration(self, value: bool):
        """Please read '3.7.2 set_automatic_self_calibration_enabled'"""
        cmd = 0x2416
        value_raw = self._to_bytes(value, 2)
        self._send_command(cmd, value_raw, 1, 3)

    def set_measurement(self, start: bool, single_shot: bool = False, rht_only: bool = False):
        """Используется для запуска или остановки периодических измерений.
        single_shot = False. rht_only не используется!
        А также для запуска ОДНОКРАТНОГО измерения. single_shot = True. rht_only используется!
        Если rht_only == True то датчик не вычисляет CO2 и оно будет равно нулю! Смотри метод get_meas_data()
        start используется только при False == single_shot (periodic mode)

        Used to start or stop periodic measurements. single_shot = False. rht_only is not used!
        And also to start a SINGLE measurement. single_shot = True. rht_only is used!
        If rht_only == True then the sensor does not calculate CO2 and it will be zero! See get_meas_data() method
        start is used only when False == single_shot (periodic mode)"""
        if single_shot:
            return self._single_shot_meas(rht_only)
        return self._periodic_measurement(start)

    # Basic Commands
    def _periodic_measurement(self, start: bool):
        """Start periodic measurement. In low power mode, signal update interval is approximately 30 seconds.
        In normal power mode, signal update interval is approximately 5 seconds.
        If start == True then measurement started, else stopped.
        Для чтения результатов используйте метод get_meas_data.
        To read the results, use the get_meas_data method."""
        wt = 0
        if start:
            cmd = 0x21AC if self._low_power_mode else 0x21B1
        else:   # stop periodic measurement
            cmd = 0x3F86
            wt = 500
        self._send_command(cmd, None, wt)
        self._single_shot_mode = False
        self._rht_only = False

    def get_meas_data(self) -> tuple:
        """Чтение выходных данных датчика. Данные измерения могут быть считаны только один раз за интервал
        обновления сигнала, так как буфер очищается при считывании. Смотри get_conversion_cycle_time()!
        Read sensor data output. The measurement data can only be read out once per signal update interval
        as the buffer is emptied upon read-out. See get_conversion_cycle_time()!"""
        cmd = 0xEC05
        val_index = (range(2), range(3, 5), range(6, 8))
        b = self._send_command(cmd, None, 1, bytes_for_read=9,
                               crc_index=range(2, 9, 3), value_index=val_index)
        words = [self.unpack("H", b[val_rng.start:val_rng.stop])[0] for val_rng in val_index]
        #       CO2 [ppm]           T, Celsius              Relative Humidity, %
        return words[0], -45 + 0.0026703288 * words[1], 0.0015259022 * words[2]

    def is_data_ready(self) -> bool:
        """Return data ready status"""
        cmd = 0xE4B8
        b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
        return bool(self.unpack("H", b)[0] & 0b0000_0111_1111_1111)

    @micropython.native
    def get_conversion_cycle_time(self) -> int:
        """Возвращает время преобразования данных датчиком в зависимости от его настроек. мс.
        returns the data conversion time of the sensor, depending on its settings. ms."""
        if self.is_single_shot_mode and self.is_rht_only:
            return 50
        return 5000

    # SCD41 only
    def set_power(self, value: bool):
        if not self._isSCD41:
            return
        """Please read '3.10.3 power_down' and '3.10.4 wake_up'"""
        cmd = 0x36F6 if value else 0x36E0
        wt = 20 if value else 1
        self._send_command(cmd, None, wt)

    def _single_shot_meas(self, rht_only: bool = False):
        """Only for SCD41. Single shot measurement!
        Запускает измерение температуры и относительной влажности!
        После вызова этого метода, результаты будут готовы примерно через 5 секунд!
        Для чтения результатов используйте метод get_meas_data. Содержание CO2 будет равно нулю, если true == rht_only!
        After calling this method, the results will be ready in about 5 seconds!
        To read the results, use the get_meas_data method.
        SCD41 features a single shot measurement mode, i.e. allows for on-demand measurements.
        Please see '3.10 Low power single shot (SCD41)'"""
        if not self._isSCD41:
            return
        cmd = 0x2196 if rht_only else 0x219D
        self._send_command(cmd, None, 0)
        self._single_shot_mode = True
        self._rht_only = rht_only

    @property
    def is_single_shot_mode(self) -> bool:
        return self._single_shot_mode

    @property
    def is_rht_only(self) -> bool:
        return self._rht_only

    # Iterator
    def __iter__(self):
        return self

    def __next__(self) -> [tuple, None]:
        if self._single_shot_mode:
            return None
        if self.is_data_ready():
            return self.get_meas_data()
        return None

ragnar
User
Beiträge: 5
Registriert: Mittwoch 25. Dezember 2024, 06:40

Problem gelöst!
Nachdem ich alle infrage kommende Firmware auf dem ESP32 WROOM 32 geflasht habe und dies zu keiner Änderung führte, habe ich noch ein anderes Board gekauft.
Auf dem neuen DEV KIT R8N8 Board mit ESP32 S3 WROOM 1 läuft das Programm!!! Auf den beiden ersten Boards ist kein Espressif aufgedruckt, ich nehme mal an das es billige nachbauten sind, die entweder fehlerhaft sind oder spezille Firmeware benötigt wird, die ich aber nicht finden konnte.

Mit dem neuen Board hatte ich anfänglich auch Probleme, es stellte sich aber schnell heraus das eine Stromversorgung über eine Steckdose (in der Wand) mit 2 zusätzlichen USB Ports zum laden von Geräten, keine gute Versorgung für das Board ist! Via Log-Einträge in einer Datei, mit Zeitstempel konnte ich jede Menge Neustarts feststellen, am Rechner oder Power Bank angeschlossen läuft das Programm sauber durch ohne Neustarts.

Mein Resume dazu lautet:
  • Keinen billigen-Nachbau kaufen, genau hinschauen was angeboten wird, lieber ein paar Euro mehr ausgeben, dafür aber unnötige Zeitverluste einsparen!
  • Nicht jedes Netzteil mit dem man ein Handy laden kann, eignet sich zur Stromversorgung eines Microkontroller, nur ein Spannungsregler auf dem Board reicht nicht aus zur stabilen Stromversorgung!
Sollte jemand auf die Idee kommen sich etwas vom Code oben als Vorlage zu verwenden, dem sei gesagt das dort noch einige Fehler enthalten sind und keinerlei Exceptions abgefangen werden. Auch sollte man sich den Stil nicht abschauen, der ist sicherlich stark Javascript geprägt.
Antworten