Nach Umbau, weiterhin gleiches problematisches Verhalten!
Jetzt gibt es keine Timer mehr, dafür jede Menge asynchroner Funktionen, zwei davon ersetzen mit einer Endlosschleife und asynchronem sleep, die alten Timer Rückruffunktionen. Eine in 'main.py' liest alle 5 Sekunden Messungen aus dem I2C Sensor, die andere in der Klasse 'Logger' erechnet immer im gleichen Intervall je nach Initialisierung alle 1 bis 60 Minuten den Durchschnitt der Messungen und speichert diesen binär in einer Datai auf dem Flashspeicher.
In der ´russischen´ Klasse 'SCD4xSensirion' gibt es eine private Funktion '_send_command', die einen normalen sleep hatte, um nötige Pausen zwischen den Abfragen an den I2C Sensor gewährleisten zu können. Um diesen einen sleep, asynchron zu gestalten, musste ich die komplette Klasse asynchron schreiben. Zum einen musste ich alle Funktionen die '_send_command' aufrufen asynchron machen, dabei aber dennoch gewährleisten das '_send_command' seine Abfragepause an den I2C Sensor weiter einhält. Dazu wird am Anfang in der Funktion in einer Schleife solange gewartet bis '_self._waiting==False' ist, dann erst wird die Schleife verlassen und '_waiting' sofort, bis zum Ende der Funktion auf True gesetzt, damit die Funktion nicht gleichzeitig von mehreren Task Ablaufen kann.
Hier bin ich mir sehr unsicher, ob dieses Vorgehen, so richtig ist.
Die Klassen 'Sensor' und 'Logger' sind noch nicht ausgereift, hier müssen noch einige Vorkehrungen getroffen werden die ungülte Parameterangaben abfangen und eine Exeption auslösen. Zudem soll ab überschreiten einer bestimmten Größe der Logdatei, eine neue erstellt werden, damit wenn der Speicher knapp wird automatisch die ältesten gelöscht werden. Die Funktion 'get_log' liest die in den Dateien gespeicherten Messungen, für den vorgegebenen Zeitraum aus. Nachdem der Client Sie über 'Get' bekommen hat, sollen Sie via Plotly visualisiert werden.
So verhält es sich derzeit:
Entweder erstelle ich die beiden Tasks zum lesen und loggen der Messungen, dann aber reagiert der webserver auf keine Anfragen mehr. Oder Ich Kommentiere die beiden Zeilen zum erstellen der Tasks aus, dann arbeitet der Webserver sauber und z.B. Abfragen über get_log können gemacht und übermittelt werden, aber das loggen neuer Messungen geht dann nicht.
Wieder äußerst seltsam ist es das es nicht alleine ausreicht diese beiden Zeilen rauszunehmen, demit der Webserver wieder arbeitet:
Code: Alles auswählen
await init_scd4x()
#asyncio.create_task(periodic_sensor_reading())
#asyncio.create_task(logger.start_logger())
Zusätzlich muss die gesamte Funktion 'periodic_sensor_reading()' auskommentiert werden, erst dann reagiert der Webserver wieder auf Anfragen. Und damit komme ich überhaupt nicht zurecht, denn diese Funktion wird an keiner anderen Stelle aufgerufen, bzw. ein Task von Ihr erstellt, somit wäre sie sowiso ausen vor. Warum ist es dennoch notwendig Sie komplett rauszunehmen, das ergibt überhaupt keinen Sinn!
Im folgenden der gesamte Code, in dieser Reihenfolge:
1. main.py (so funktioniert der Webserver, 'async def periodic_sensor_reading()' ist komplett auskommentiert.
2. sensordata.py mit den Klassen Logger & Sensor
3. scd4x_sensirion.py mit der Klasse SCD4xSensirion
main.py:
Code: Alles auswählen
from microdot_asyncio import Microdot, Response, send_file
from microdot_utemplate import render_template
from scd4x_sensirion import SCD4xSensirion
from sensor_pack.bus_service import I2cAdapter
from sensordata import Sensor, Logger
from machine import I2C, Pin
import time, struct, os, asyncio
# Initialize SCD41 / I2C
#i2c = I2C(0, scl=Pin(22), sda=Pin(21), freq=400_000)
#adaptor = I2cAdapter(i2c)
scd4x = SCD4xSensirion(I2cAdapter(I2C(0, scl=Pin(22), sda=Pin(21), freq=400_000)))
# Init Sensoren
co2=Sensor('CO2', 'ppm', 'h')
temp=Sensor('Temperatur', '°C', 'e', 1)
humity=Sensor('rel. Luftfeuchte', '%', 'e', 1)
# Init Logger
logger=Logger((co2, temp, humity), 1)
# Initialize MicroDot
app = Microdot()
Response.default_content_type = 'text/html'
async def init_scd4x():
# Force return sensor in IDLE mode!
await scd4x.set_measurement(start=False, single_shot=False)
await scd4x.set_temperature_offset(4)
masl = 230 # Meter Above Sea Level
await scd4x.set_altitude(masl)
await scd4x.set_measurement(start=True, single_shot=False) # periodic start
await asyncio.sleep(5)
print('finish init_scd4x')
'''
async def periodic_sensor_reading():
while True:
scd4x_werte = await scd4x.get_meas_data()
co2.wert = scd4x_werte[0]
temp.wert = scd4x_werte[1]
humity.wert = scd4x_werte[2]
print('co2: {:>4d} | temp: {:>4.1f} | humity: {:>4.1f}'.format(co2.wert, temp.wert, humity.wert))
await asyncio.sleep(5)'''
# root route
@app.route('/')
async def index(request):
print('@app.route("/")')
return render_template('/index.html')
@app.get('/get_data')
async def read_sensor(request):
print('@app.get("/get_data")')
start,logs=logger.get_log(787604040,787604900)
#start,logs=(logger.get_log(788132700,788175900))
end=start+logger.logging_period_time*len(logs[0])
le=time.localtime(end)
ls=time.localtime(start)
print('start: {:<02d}.{:<02d}, {:<02d}:{:<02d} | end: {:<02d}.{:<02d}, {:<02d}:{:<02d}'.format(ls[2], ls[1], ls[3], ls[4], le[2], le[1], le[3], le[4]))
sensorlen=len(logger.sensor_attribute)
for index in range(0, len(logs[0])):
t=start+index*logger.logging_period_time
lt=time.localtime(t)
string='{:<09d} | {:<02d}:{:<02d} | '.format(t,lt[3],lt[4])
for sindex in range(0, sensorlen):
if logs[sindex][index] is not None:
if sindex==0:
string+='{:<04d} | '.format(logs[sindex][index])
else:
string+='{:<02.1f} | '.format(logs[sindex][index])
else:
string+=' | '
print(string)
return {'start': start, 'logging_period': logger.logging_period_time,'logs': logs}
# Static CSS/JSS
@app.route("/static/<path:path>")
def static(request, path):
print('@app.route("/static/<path:'+path+'>"')
if ".." in path:
# directory traversal is not allowed
return "Not found", 404
return send_file("static/" + path)
# shutdown
@app.get('/shutdown')
def shutdown(request):
print('@app.get("/shutdown")')
request.app.shutdown()
return 'The server is shutting down...'
async def main():
await init_scd4x()
#asyncio.create_task(periodic_sensor_reading())
#asyncio.create_task(logger.start_logger())
if __name__ == "__main__":
print('Starting microdot app')
try:
asyncio.create_task(main())
app.run()
except:
app.shutdown()
sensordata.py mit den Klassen Logger & Sensor:
Code: Alles auswählen
import time, struct, os, asyncio
class Logger(object):
def __init__(self, sensoren, logging_period_time):
self.__logging_period_time=logging_period_time*60 #Zeit in Minuten * 60 Sekunden
self.__sensoren=sensoren
self.__start_logging=None
for sensor in self.__sensoren:
sensor.__set_logger(self)
async def start_logger(self):
lnow=time.localtime(time.time())
periodstart = self.__logging_period_time - lnow[4]*60 % self.__logging_period_time-lnow[5]
if not periodstart==self.__logging_period_time:
await asyncio.sleep(periodstart)
now=time.time()
self.__log_start_time=now
print('start Logger at: ' + str(time.localtime(now)))
for sensor in self.__sensoren:
sensor.__period_counter = 0
sensor.__last_log_time=now
await asyncio.sleep(self.__logging_period_time)
asyncio.create_task(self.__period_schedule())
async def __period_schedule(self):
while True:
f=self.__open_write()
for sensor in self.__sensoren:
sensor.__new_period()
f.write(struct.pack(sensor.__format, sensor.__wert_period))
f.close()
now=time.time()
self.__log_start_time=now
lnow=time.localtime(now)
next_period = self.__logging_period_time - lnow[4]*60 % self.__logging_period_time-lnow[5]
#debugging:
sensoren=self.__sensoren
print('%09d | %02d:%02d | %04d | %02.1f | % 02.1f' %(now,lnow[3],lnow[4],sensoren[0].__wert_period,sensoren[1].__wert_period,sensoren[2].__wert_period))
#end_debbugging
await asyncio.sleep(next_period)
@property
def logging_period_time(self):
return self.__logging_period_time
@property
def sensor_attribute(self):
attr=[]
for sensor in self.__sensoren:
attr.append((sensor.__name, sensor.__einheit, sensor.__format, sensor.__kommastellen))
return attr
@property
def __chain_size(self):
chain_size=0
for sensor in self.__sensoren:
chain_size+=sensor.__size
return chain_Size
@property
def __chain_format(self):
chain_format=''
for sensor in self.__sensoren:
chain_format+=sensor.__format
return chain_format
def __open_write(self):
if self.__start_logging is not None:
return open('/logdata/' + str(self.__start_logging), 'a')
else:
now=time.time()
self.__start_logging = now
return open('/logdata/' + str(now), 'w')
async def get_log(self, start, end):
def append_none_logs(logging_periods_different):
for period in range(logging_periods_different,0):
for index, sensor in enumerate(self.__sensoren):
logs[index].append(None)
period_different=int((start%(self.__logging_period_time))*self.__logging_period_time)
if period_different>0:
start-=period_different-self.__logging_period_time
period_different=int((end%(self.__logging_period_time))*self.__logging_period_time)
if period_different>0:
end-=period_different#-self.__logging_period_time
logs=[]
for sensor in self.__sensoren:
logs.append([])
bevor_file_end=None
first_file_found=False
for file_name in os.listdir('/logdata'):
file_size=os.stat('/logdata/'+file_name)[6]
file_log_count=int(file_size/self.__chain_size)
file_start=int(file_name)
file_end=file_start+(file_log_count-1)*self.__logging_period_time
file_end=file_start+file_log_count*self.__logging_period_time
if bevor_file_end is not None:
if file_start>end:
range_end = end#-self.__logging_period_time
else:
range_end = file_start#-self.__logging_period_time
logging_periods_different=int((bevor_file_end-range_end)/self.__logging_period_time)
if logging_periods_different<0:
append_none_logs(logging_periods_different)
if file_start>end:
break
elif not first_file_found:
if start<file_end:
first_file_found=True
logging_periods_different=int((start-file_start)/self.__logging_period_time)
if logging_periods_different<0:
append_none_logs(logging_periods_different)
logging_periods_different=0
buf_start=logging_periods_different*self.__chain_size
else:
continue
else:
buf_start=0
bevor_file_end=file_end
if end<=file_end:
bytes_to_end=int((end-file_start)/self.__logging_period_time*self.__chain_size)+self.__chain_size
else:
bytes_to_end=file_size
bytes_to_end-=buf_start
with open('/logdata/'+file_name, 'rb') as file:
file.seek(buf_start, 0)
buf=file.read(bytes_to_end)
file.close()
buf_pos=0
buf_size=len(buf)
while buf_pos<buf_size:
werte=struct.unpack(self.__chain_format, buf[buf_pos:buf_pos+self.__chain_size+1])
buf_pos+=self.__chain_size
for index, sensor in enumerate(self.__sensoren):
logs[index].append(werte[index])
if end<=file_end:
break
if file_end is not None:
logging_periods_different=int((file_end-end)/self.__logging_period_time)-1
if logging_periods_different<0:
append_none_logs(logging_periods_different)
return start, logs
class Sensor(object):
def __convert(self, wert):
if (type(wert) is float) and ((self.__format=='h') or (self.__format=='b') or (self.__format=='i')):
wert=round(wert)
elif ((self.__format=='e') or (self.__format=='f')) and (self.__kommastellen != None):
wert=round(wert, self.__kommastellen)
return wert
def __init__ (self, name, einheit, forma, kommastellen=0):
self.__name = name
self.__einheit = einheit
self.__format = forma
self.__kommastellen=kommastellen
self.__period_counter = 0
self.__last_log_time = 0
self.__wert_period = None
self.__wert = None
def __set_logger(self, logger):
self.__logger=logger
@property
def wert(self):
return self.__wert
@wert.setter
def wert(self, val):
if self.__wert != val:
self.__wert=val
now=time.time()
self.__period_counter += (now-self.__last_log_time)*val
self.__last_log_time=now
@property
def wert_period(self):
return self.__wert_period
@property
def name(self):
return self.__name
@property
def einheit(self):
return self.__einheit
@property
def __size(self):
if self.__format == 'b':
return 1
elif self.__format == 'h':
return 2
elif self.__format == 'i':
return 4
elif self.__format == 'e':
return 2
elif self.__format == 'f':
return 4
def __new_period(self):
now=time.time()
self.__period_counter += (now-self.__last_log_time)*self.__wert
self.__last_log_time=now
log_time=now-self.__logger.__log_start_time
self.__wert_period = self.__convert(self.__period_counter/log_time)
self.__period_counter=0
scd4x_sensirion.py mit der Klasse SCD4xSensirion:
Code: Alles auswählen
"""SCD4x Sensirion module"""
from sensor_pack import bus_service
from sensor_pack.base_sensor import BaseSensor, Iterator
from sensor_pack import base_sensor
from sensor_pack.crc_mod import crc8
import micropython
import time, asyncio
def _calc_crc(sequence) -> int:
"""Обертка для короткого вызова.
Wrapper for a short call."""
return crc8(sequence, polynomial=0x31, init_value=0xFF)
class SCD4xSensirion(BaseSensor, Iterator):
"""Class for work with Sensirion SCD4x sensor"""
def __init__(self, adapter: bus_service.BusAdapter, address=0x62,
this_is_scd41: bool = True, check_crc: bool = True):
"""Если check_crc в Истина, то каждый, принятый от датчика пакет данных, проверяется на правильность путем
расчета контрольной суммы.
Если this_is_scd41 == True, то будут доступны методы для SCD41, иначе будут доступны методы ОБЩИЕ для SCD40/41!
If check_crs is True, then each data packet received from the sensor is checked for correctness by
calculating the checksum.
If this_is_scd41 == True then methods for SCD41 will be available,
otherwise GENERAL methods for SCD40/41 will be available!"""
super().__init__(adapter, address, True) # Big Endian
self._buf_3 = bytearray((0 for _ in range(3)))
self._buf_9 = bytearray((0 for _ in range(9)))
self.check_crc = check_crc
# power mode
self._low_power_mode = False
# measurement mode (single shot, continuous)
self._single_shot_mode = False
self._rht_only = False
self._isSCD41 = this_is_scd41
# сохраняю, чтобы не вызывать 125 раз
self.byte_order = self._get_byteorder_as_str()
def _get_local_buf(self, bytes_for_read: int) -> [None, bytearray]:
"""возвращает локальный буфер для операции чтения"""
if bytes_for_read not in (0, 3, 9):
raise ValueError(f"Invalid value for bytes_for_read: {bytes_for_read}")
if not bytes_for_read:
return None
if 3 == bytes_for_read:
return self._buf_3
return self._buf_9
def _to_bytes(self, value, length: int):
byteorder = self.byte_order[0]
return value.to_bytes(length, byteorder)
# def _read(self, n_bytes: int) -> bytes:
# return self.adapter.read(self.address, n_bytes)
def _write(self, buf: bytes) -> bytes:
return self.adapter.write(self.address, buf)
def _readfrom_into(self, buf):
"""Читает из устройства в буфер"""
return self.adapter.readfrom_into(self.address, buf)
async def _send_command(self, cmd: int, value: [bytes, None],
wait_time: int = 0, bytes_for_read: int = 0,
crc_index: range = None,
value_index: tuple = None) -> [bytes, None]:
"""Передает команду датчику по шине.
cmd - код команды.
value - последовательность, передаваемая после кода команды.
wait_time - время в мс. которое нужно подождать для обработки команды датчиком.
bytes_for_read - количество байт в ответе датчика, если не 0, то будет считан ответ,
проверена CRC (зависит от self.check_crc) и этот ответ будет возвращен, как результат.
crc_index_range - индексы crc в последовательности.
value_index_ranges- кортеж индексов (range) данных значений в
последовательности. (range(3), range(4,6), range(7,9))"""
# print(f"DBG: bytes_for_read: {bytes_for_read}")
raw_cmd = self._to_bytes(cmd, 2)
raw_out = raw_cmd
if value:
raw_out += value # добавляю value и его crc
raw_out += self._to_bytes(_calc_crc(value), 1) # crc считается только для данных!
self._write(raw_out) # выдача на шину
if wait_time:
await asyncio.sleep_ms(wait_time) # ожидание
if not bytes_for_read:
return None
# b = self._read(bytes_for_read) # читаю с шины с проверкой количества считанных байт
b = self._get_local_buf(bytes_for_read)
self._readfrom_into(b) # обновление
base_sensor.check_value(len(b), (bytes_for_read,),
f"Invalid buffer length for cmd: {cmd}. Received {len(b)} out of {bytes_for_read}")
if self.check_crc:
crc_from_buf = [b[i] for i in crc_index] # build list of CRC from buf
calculated_crc = [_calc_crc(b[rng.start:rng.stop]) for rng in value_index]
if crc_from_buf != calculated_crc:
raise ValueError(f"Invalid CRC! Calculated{calculated_crc}. From buffer {crc_from_buf}")
return b # возврат bytearray со считанными данными
# BaseSensor
# Advanced features
def save_config(self):
"""Настройки конфигурации, такие как смещение температуры, высота расположения датчика над уровнем моря
по умолчанию сохраняются только в энергозависимой памяти (ОЗУ) и будут потеряны после выключения и включения
питания. Метод сохраняет текущую конфигурацию в EEPROM SCD4x, сохраняя ее при отключении питания.
Чтобы избежать ненужного износа EEPROM, метод следует вызывать только в том случае, если это необходимо(!) и
если были внесены фактические изменения в конфигурацию. EEPROM гарантированно выдерживает не менее 2000
циклов записи до отказа(!).
Configuration settings such as temperature offset, sensor altitude are stored by default only in volatile memory
(RAM) and will be lost after a power cycle. The method saves the current configuration in the EEPROM of the
SCD4x, saving it when the power is turned off. To avoid unnecessary wear on the EEPROM, the method should only
be called if necessary(!) and if actual configuration changes have been made.
EEPROM is guaranteed to withstand at least 2000 write cycles to failure (!)"""
cmd = 0x3615
self._send_command(cmd, None, 800)
def get_id(self) -> tuple:
"""Return 3 words of unique serial number can be used to identify
the chip and to verify the presence of the sensor."""
# создатели датчика 'обрадовали'. вместо подсчета одного байта CRC на 6 байт (3 двухбайтных слова)
# они считают CRC для каждого из 3-х двухбайтных слов!
cmd = 0x3682
b = self._send_command(cmd, None, 0, bytes_for_read=9,
crc_index=range(2, 9, 3), value_index=(range(2), range(3, 5), range(6, 8)))
# return result
return tuple([(b[i] << 8) | b[i+1] for i in range(0, 9, 3)]) # Success
def soft_reset(self):
"""Я сознательно не стал использовать команду perfom_factory_reset, чтобы было невозможно испортить датчик
программным путем, так-как количество циклов записи во внутреннюю FLASH память датчика ограничено!
I deliberately did not use the perfom_factory_reset command, so that it would be impossible to spoil the
sensor programmatically, since the number of write cycles to the internal FLASH memory of the
sensor is limited!
09.09.2024. Добавил. Под вашу ответственность!"""
cmd = 0x3632
self._send_command(cmd, None, 1200)
def exec_self_test(self) -> bool:
""""Этот метод можно использовать в качестве конечного теста для проверки работоспособности датчика и
проверки подачи питания на датчик. Возвращает Истина, когда тест пройден успешно.
The feature can be used as an end-of-line test to check sensor functionality and the customer power
supply to the sensor. Returns True when the test is successful."""
cmd = 0x3639
length = 3
b = self._send_command(cmd, None, wait_time=10_000, # да, ждать 10 секунд! yes, wait 10 seconds!
bytes_for_read=length, crc_index=range(2, 3), value_index=(range(2),))
res = self.unpack("H", b)[0]
return 0 == res
def reinit(self) -> None:
"""Команда reinit повторно инициализирует датчик, загружая пользовательские настройки из EEPROM.
Перед отправкой команды reinit необходимо выполнить метод stop_measurement. Если команда reinit не вызывает
желаемой повторной инициализации, к SCD4x следует применить цикл включения и выключения питания.
The reinit command reinitializes the sensor by reloading user settings from EEPROM.
Before sending the reinit command, the stop_measurement method must be called.
If the reinit command does not trigger the desired re-initialization,
a power-cycle should be applied to the SCD4x."""
cmd = 0x3646
self._send_command(cmd, None, 20)
# On-chip output signal compensation
def set_temperature_offset(self, offset: float): # вызов нужно делать только в IDLE режиме датчика!
"""Смещение температуры не влияет на точность измерения CO2 . Правильная установка смещения температуры SCD4x
внутри пользовательского устройства позволяет пользователю использовать выходные сигналы RH и T. Обратите
внимание, что смещение температуры может зависеть от различных факторов, таких как режим измерения SCD4x,
самонагрев близких компонентов, температура окружающей среды и расход воздуха. Таким образом, смещение
температуры SCD4x должно определяться внутри пользовательского устройства в типичных условиях его работы
(включая режим работы, который будет использоваться в приложении) и при тепловом равновесии. По умолчанию
смещение температуры установлено в 4°C.
The temperature offset has no influence on the SCD4x CO 2 accuracy. Setting the temperature offset of the SCD4x
inside the customer device correctly allows the user to leverage the RH and T output signal. Note that the
temperature offset can depend on various factors such as the SCD4x measurement mode, self-heating of close
components, the ambient temperature and air flow.
Метод нужно вызывать только в IDLE режиме датчика!
The method should be called only in IDLE sensor mode!
𝑇 𝑜𝑓𝑓𝑠𝑒𝑡_𝑎𝑐𝑡𝑢𝑎𝑙 = 𝑇 𝑆𝐶𝐷40 − 𝑇 𝑅𝑒𝑓𝑒𝑟𝑒𝑛𝑐𝑒 + 𝑇 𝑜𝑓𝑓𝑠𝑒𝑡_ 𝑝𝑟𝑒𝑣𝑖𝑜𝑢𝑠"""
cmd = 0x241D
offset_raw = self._to_bytes(int(374.49142857 * offset), 2)
self._send_command(cmd, offset_raw, 1)
def get_temperature_offset(self) -> float:
"""Метод нужно вызывать только в IDLE режиме датчика!
The method should be called only in IDLE sensor mode!"""
cmd = 0x2318
b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
temp_offs = self.unpack("H", b)[0]
return 0.0026702880859375 * temp_offs
def set_altitude(self, masl: int): # вызов нужно делать только в IDLE режиме датчика!
"""Чтение и запись высоты датчика должны выполняться, когда SCD4x находится в режиме ожидания.
Как правило, высота датчика устанавливается один раз после установки устройства. Чтобы сохранить настройку
в EEPROM, необходимо выполнить метод save_config. По умолчанию высота датчика установлена в
0 метров над уровнем моря (masl).
Reading and writing sensor height must be done when the SCD4x is in standby mode. As a rule, the height of the
sensor is set once after the installation of the device. To save the configuration to EEPROM, you must execute
the save_config method. By default, the sensor height is set to 0 meters above sea level (masl).
Метод нужно вызывать только в IDLE режиме датчика!
The method should be called only in IDLE sensor mode!"""
cmd = 0x2427
masl_raw = self._to_bytes(masl, 2)
self._send_command(cmd, masl_raw, 1)
def get_altitude(self) -> int:
"""Метод нужно вызывать только в IDLE режиме датчика!
The method should be called only in IDLE sensor mode!"""
cmd = 0x2322
b = self._send_command(cmd, None, wait_time=1, bytes_for_read=3, crc_index=range(2, 3), value_index=(range(2),))
return self.unpack("H", b)[0]
def set_ambient_pressure(self, pressure: float):
"""Метод может быть вызван во время периодических измерений, чтобы включить непрерывную компенсацию давления.
Обратите внимание, что установка давления окружающей среды с помощью set_ambient_pressure отменяет любую
компенсацию давления, основанную на ранее установленной высоте датчика. Использование этой команды настоятельно
рекомендуется для приложений со значительными изменениями давления окружающей среды,
чтобы обеспечить точность датчика.
The method can be called during periodic measurements to enable continuous pressure compensation.
Note that setting the ambient pressure using set_ambient_pressure overrides any pressure compensation based
on the previously set sensor height. The use of this command is highly recommended for applications with
significant changes in ambient pressure to ensure sensor accuracy."""
cmd = 0xE000
press_raw = self._to_bytes(int(pressure // 100), 2) # Pascal // 100
self._send_command(cmd, press_raw, 1)
# Field calibration
def force_recalibration(self, target_co2_concentration: int) -> int:
"""Please read '3.7.1 perform_forced_recalibration'"""
base_sensor.check_value(target_co2_concentration, range(2**16),
f"Invalid target CO2 concentration: {target_co2_concentration} ppm")
cmd = 0x362F
target_raw = self._to_bytes(target_co2_concentration, 2)
b = self._send_command(cmd, target_raw, 400, 3, crc_index=range(2, 3), value_index=(range(2),))
return self.unpack("h", b)[0]
def is_auto_calibration(self) -> bool:
"""Please read '3.7.3 get_automatic_self_calibration_enabled'"""
cmd = 0x2313
b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
return 0 != self.unpack("H", b)[0]
def set_auto_calibration(self, value: bool):
"""Please read '3.7.2 set_automatic_self_calibration_enabled'"""
cmd = 0x2416
value_raw = self._to_bytes(value, 2)
self._send_command(cmd, value_raw, 1, 3)
def set_measurement(self, start: bool, single_shot: bool = False, rht_only: bool = False):
"""Используется для запуска или остановки периодических измерений.
single_shot = False. rht_only не используется!
А также для запуска ОДНОКРАТНОГО измерения. single_shot = True. rht_only используется!
Если rht_only == True то датчик не вычисляет CO2 и оно будет равно нулю! Смотри метод get_meas_data()
start используется только при False == single_shot (periodic mode)
Used to start or stop periodic measurements. single_shot = False. rht_only is not used!
And also to start a SINGLE measurement. single_shot = True. rht_only is used!
If rht_only == True then the sensor does not calculate CO2 and it will be zero! See get_meas_data() method
start is used only when False == single_shot (periodic mode)"""
if single_shot:
return self._single_shot_meas(rht_only)
return self._periodic_measurement(start)
# Basic Commands
def _periodic_measurement(self, start: bool):
"""Start periodic measurement. In low power mode, signal update interval is approximately 30 seconds.
In normal power mode, signal update interval is approximately 5 seconds.
If start == True then measurement started, else stopped.
Для чтения результатов используйте метод get_meas_data.
To read the results, use the get_meas_data method."""
wt = 0
if start:
cmd = 0x21AC if self._low_power_mode else 0x21B1
else: # stop periodic measurement
cmd = 0x3F86
wt = 500
self._send_command(cmd, None, wt)
self._single_shot_mode = False
self._rht_only = False
def get_meas_data(self) -> tuple:
"""Чтение выходных данных датчика. Данные измерения могут быть считаны только один раз за интервал
обновления сигнала, так как буфер очищается при считывании. Смотри get_conversion_cycle_time()!
Read sensor data output. The measurement data can only be read out once per signal update interval
as the buffer is emptied upon read-out. See get_conversion_cycle_time()!"""
cmd = 0xEC05
val_index = (range(2), range(3, 5), range(6, 8))
b = self._send_command(cmd, None, 1, bytes_for_read=9,
crc_index=range(2, 9, 3), value_index=val_index)
words = [self.unpack("H", b[val_rng.start:val_rng.stop])[0] for val_rng in val_index]
# CO2 [ppm] T, Celsius Relative Humidity, %
return words[0], -45 + 0.0026703288 * words[1], 0.0015259022 * words[2]
def is_data_ready(self) -> bool:
"""Return data ready status"""
cmd = 0xE4B8
b = self._send_command(cmd, None, 1, 3, crc_index=range(2, 3), value_index=(range(2),))
return bool(self.unpack("H", b)[0] & 0b0000_0111_1111_1111)
@micropython.native
def get_conversion_cycle_time(self) -> int:
"""Возвращает время преобразования данных датчиком в зависимости от его настроек. мс.
returns the data conversion time of the sensor, depending on its settings. ms."""
if self.is_single_shot_mode and self.is_rht_only:
return 50
return 5000
# SCD41 only
def set_power(self, value: bool):
if not self._isSCD41:
return
"""Please read '3.10.3 power_down' and '3.10.4 wake_up'"""
cmd = 0x36F6 if value else 0x36E0
wt = 20 if value else 1
self._send_command(cmd, None, wt)
def _single_shot_meas(self, rht_only: bool = False):
"""Only for SCD41. Single shot measurement!
Запускает измерение температуры и относительной влажности!
После вызова этого метода, результаты будут готовы примерно через 5 секунд!
Для чтения результатов используйте метод get_meas_data. Содержание CO2 будет равно нулю, если true == rht_only!
After calling this method, the results will be ready in about 5 seconds!
To read the results, use the get_meas_data method.
SCD41 features a single shot measurement mode, i.e. allows for on-demand measurements.
Please see '3.10 Low power single shot (SCD41)'"""
if not self._isSCD41:
return
cmd = 0x2196 if rht_only else 0x219D
self._send_command(cmd, None, 0)
self._single_shot_mode = True
self._rht_only = rht_only
@property
def is_single_shot_mode(self) -> bool:
return self._single_shot_mode
@property
def is_rht_only(self) -> bool:
return self._rht_only
# Iterator
def __iter__(self):
return self
def __next__(self) -> [tuple, None]:
if self._single_shot_mode:
return None
if self.is_data_ready():
return self.get_meas_data()
return None