Easymeter auslesen - Problem

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
seecosmea
User
Beiträge: 5
Registriert: Freitag 5. Januar 2018, 11:08

Guten Tag,

Ich habe im Net das ein Programm gefunden welches mein Strom Zähler Easymeter Q3D auslesen und die werte in Python ausgeben soll. Bekomme immer folgende Meldung wenn Ich das folgende Script starte:
---------------------------------------------------------------------------------------------------------------------------
Python 2.7.9 (default, Sep 17 2016, 20:26:04)
[GCC 4.9.2] on linux2
Type "copyright", "credits" or "license()" for more information.
>>> ================================ RESTART ================================
>>>

Traceback (most recent call last):
File "/home/pi/rrd12.py", line 16, in <module>
if sys.argv[1] == 'A+': # Meter reading import
IndexError: list index out of range
>>>
---------------------------------------------------------------------------------------------------------------------------
Den code den ich benutze ist folgender:

import serial, sys
ser = serial.Serial(port='/dev/ttyUSB0', baudrate=9600, bytesize=7, parity='E', timeout=1)
reading = ser.read(300).decode("utf-8")
ser.flushInput()

# Try again if reading was unsuccessful
if reading.find('/',0) <> 0:
reading = ser.read(300).decode("utf-8")
ser.flushInput()
ser.close()

if len(sys.argv) > 2:
print ("Aborting: To many arguments"), sys.exit()
else:
if sys.argv[1] == 'A+': # Meter reading import
start = reading.find('1-0:1.8.0')
s_id = 1
elif sys.argv[1] == 'A-': # Meter reading export
start = reading.find('1-0:2.8.0')
s_id = 2
elif sys.argv[1] == 'L1': # Power L1
start = reading.find('1-0:21.7.255')
s_id = 3
elif sys.argv[1] == 'L2': # Power L2
start = reading.find('1-0:41.7.255')
s_id = 4
elif sys.argv[1] == 'L3': # Power L3
start = reading.find('1-0:61.7.255')
s_id = 5
elif sys.argv[1] == 'In': # Power total in
start = reading.find('1-0:1.7.255')
s_id = 1
elif sys.argv[1] == 'Out': # Power total out
start = reading.find('1-0:1.7.255')
s_id = 2
else:
print ("Aborting:", sys.argv[1] ,"is not a valid argument"),sys.exit()


idx_StartValue = reading.find('(', start) # Start index of value string
idx_Seperator = reading.find('*', idx_StartValue) # Index of seperator character
idx_EndUnit = reading.find(')', idx_Seperator) # End index of unit string
s_value = reading[idx_StartValue+1:idx_Seperator] # Value as string
s_unit = reading[idx_Seperator+1:idx_EndUnit] # Unit as string
s_prefix = reading[idx_Seperator+1:idx_Seperator+2] # Assume 1st character as prefix

if s_prefix == 'k': # Allocate value and prefix
r_value = float(s_value)*1000
s_unit = s_unit[1:]
else:
r_value = float(s_value)

# Adapt acutal values
if sys.argv[1] == 'In' and r_value <= 0 : # Set value to 0 if negative
r_value = 0
elif sys.argv[1] == 'Out' : # Invert value if negative, set to 0 if positive
if r_value <= 0:
r_value = r_value * -1
else:
r_value = 0

result = str(s_id) + '(' + str(r_value) + '*' + s_unit + ')' # create P1 string

print result.encode('utf-8')

quit()

#/ESY5Q3DA2024 V3.03
#
#1-0:0.0.0*255(184293)
#1-0:1.8.0*255(00004209.8821908*kWh)
#1-0:2.8.0*255(00007505.1469667*kWh)
#1-0:21.7.255*255(000107.34*W)
#1-0:41.7.255*255(000192.22*W)
#1-0:61.7.255*255(000226.02*W)
#1-0:1.7.255*255(000525.58*W)
#1-0:96.5.5*255(82)
#0-0:96.1.255*255(1ESY1213007893)
#!


Ich würde mich sehr über Lösungsvorschläge freuen, vielen Dank im Voraus !

Seecosea
__deets__
User
Beiträge: 14541
Registriert: Mittwoch 14. Oktober 2015, 14:29

Bitte benutz die Code Tags. Sonst ist der Code nicht lesbar.

Und du sollst das Programm nicht in die interaktive Shell kloppen. Sondern in einer Datei Speichern & aufrufen mit zb „python script.py argument“. Das Argument ist notwendig für dein Programm, warum und was es sein muss kann ich nicht sagen - siehe oben.
Sirius3
User
Beiträge: 17750
Registriert: Sonntag 21. Oktober 2012, 17:20

@seecosmea: das Programm ist wie sehr vieles, das man so findet, furchtbar schlecht geschrieben.

So sieht's etwas besser aus:

Code: Alles auswählen

import sys
import re
import serial

KEYWORDS = {
    'A+': '1-0:1.8.0', # Meter reading import
	'A-': '1-0:2.8.0', # Meter reading export
	'L1': '1-0:21.7.255', # Power L1
	'L2': '1-0:41.7.255', # Power L2
	'L3': '1-0:61.7.255', # Power L3
	'In': '1-0:1.7.255', # Power total in
	'Out': '1-0:1.7.255', # Power total out
}

if len(sys.argv) != 2:
    print "wrong arguments."
    print "Usage: %s [KEYWORD]" % sys.argv[0]
    sys.exit()

keyword = sys.argv[1]

with serial.Serial(port='/dev/ttyUSB0', baudrate=9600, bytesize=7, parity='E', timeout=1) as ser:
    while True:
        reading = ser.read(300).decode("utf-8")
        ser.flushInput()
        if reading.startswith('/'):
            break
        # Try again if reading was unsuccessful

try:
    pattern = KEYWORDS[keyword]
except KeyError:
    print"Aborting: %s is not a valid argument" % keyword
    sys.exit()

match = re.search(r"%s.*?\((.*?)(?:\*(.*?))?\)" % pattern, reading)
value, unit = match.groups()
value = float(value)

# Adapt acutal value
if keyword == 'In':
    value = max(value, 0)
elif keyword == 'Out':
    value = max(-value, 0)

print value, unit
Man muß das Programm mit einem Argument aufrufen.
seecosmea
User
Beiträge: 5
Registriert: Freitag 5. Januar 2018, 11:08

Hallo Sirius3,

vielen Dank, es funktioniert super. jetzt kann es für mich weitergehen.

Seecosmea :D
arneman
User
Beiträge: 1
Registriert: Sonntag 12. Januar 2020, 01:07

Ich habe (auch mit den Infos aus diesem Thread - vielen Danke dafür) ein Python Script zum Auslesen des EasyMeter, MQTT publishing, sqlite Speicherung und Systemd Service unter https://github.com/arneman/easymeter-py erstellt.

Vielleicht ist es ja für den ein oder anderen auch interessant. Läuft extrem stabil seit ca. 1 Monat bei mir.

Gruß

Arne
Benutzeravatar
__blackjack__
User
Beiträge: 13110
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@arneman: `sqlite3` ist ein Modul aus der Standardbibliothek, ich kann mir nicht vorstellen das ein Debian einen dazu zwingt Python selbst zu kompilieren wenn man das nutzen möchte.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Sirius3
User
Beiträge: 17750
Registriert: Sonntag 21. Oktober 2012, 17:20

In `read` verwendest Du ein Flag, das anzeigt, ob das Lesen geklappt hat, so etwas macht man aber über Exceptions. Das `reset_input_buffer` sieht überflüssig aus.
Das Wörterbuch KEYWORDS sollte umgedreht werden, dass man nicht den selben Text etliche male durchsuchen muß. In `worker_publish_mqtt` ist denke ich nur einmal ein connect nötig.
Dieses Abfragen ob die Queue leer ist und dann nicht zu machen, ist unsinnig. Dann kannst Du gleich blockierend Warten.
In `worker_sqlite` verstehe ich diese Größenbehandlung der Queue nicht. Und daher auch nicht dieses komplizierte Sammeln mehrerer Datensätze.
In `run` machst Du dann seltsame Sachen mit Listen, und mußt diesen Fehler dann in `worker_read_meter` wieder korrigieren. Mach es am besten gleich richtig.
Wenn das Hauptprogramm nur schläft, kann man sich auch einen Prozess sparen.

Das alles gibt ungefähr das:

Code: Alles auswählen

import re
import serial
import logging
import multiprocessing
import json
import datetime

from config import CONFIG

KEYWORDS = {
    '1-0:1.8.0': {'name': 'A+', 'type': float}, # Meter reading import
    #'A-': '1-0:2.8.0', # Meter reading export
    '1-0:21.7.255': {'name': 'L1', 'type': float}, # Power L1
    '1-0:41.7.255': {'name': 'L2', 'type': float}, # Power L1
    '1-0:61.7.255': {'name': 'L3', 'type': float}, # Power L1
    '1-0:1.7.255': {'name': 'In', 'type': float}, # Power L1
    '0-0:96.1.255': {'name': 'SERIAL', 'type': str}, # Serial number
    #'Out': '1-0:1.7.255', # Power total out
}
extract_all = re.compile(r"(\d-\d:\d+\.\d+\.\d+).*?\((.*?)(?:\*(.*?))?\)").findall

TS_FORMAT = '%Y-%m-%d %H:%M:%S'

SQLITE_CREATE = """CREATE TABLE IF NOT EXISTS meter_data
                   ( meter TEXT
                    ,l1 NUMERIC
                    ,l2 NUMERIC
                    ,l3 NUMERIC
                    ,load NUMERIC
                    ,kwh NUMERIC
                    ,TS TEXT DEFAULT CURRENT_TIMESTAMP);"""

def read():
    with serial.Serial(port=CONFIG['dev'], baudrate=9600, bytesize=7, parity='E', timeout=2.5, exclusive=True) as ser:
        reading = ser.read_until(b'!').decode("utf-8")
        if not reading.startswith('/'):
            raise ValueError(reading)
        return reading

def read_meter(task_queues):
    logger = multiprocessing.get_logger()
    while True:
        try:
            reading = read()
            logger.debug(f'reading: {reading}, len: {len(reading)}')

            if CONFIG['utc']:
                ts = datetime.datetime.utcnow()
            else:
                ts = datetime.datetime.now()
            data = {'ts': ts.strftime(TS_FORMAT)}
            for keyword, value, unit in extract_all(reading):
                if keyword in KEYWORDS:
                    info = KEYWORDS[keyword]
                    data[info['name']] = info['type'](value)
                logger.debug((keyword,value,unit))
            #put the reading_dict into all publishing queues
            for queue in task_queues:
                queue.put(data)
        except:
            logger.exception('Error in worker_read_meter')

def worker_publish_mqtt(task_queue):
    import paho.mqtt.client as mqtt
    logger = multiprocessing.get_logger()
    client = mqtt.Client()
    if CONFIG['mqtt']['auth']['enabled']:
          client.username_pw_set(CONFIG['mqtt']['auth']['username'],
                                 CONFIG['mqtt']['auth']['password'])

    client.connect(host=CONFIG['mqtt']['host'], 
                   port=CONFIG['mqtt']['port'],
                   keepalive=CONFIG['mqtt']['keepalive'],
                   bind_address="")
 
    while True:
        try:
            reading = task_queue.get()
            client.publish(topic=CONFIG['mqtt']['topic'], 
                           payload=json.dumps(reading),
                           qos=CONFIG['mqtt']['qos'],
                           retain=CONFIG['mqtt']['retain'])
            logger.debug('worker_publish_mqtt' + json.dumps(reading))
        except:
            logger.exception('Error in worker_publish_mqtt')

def worker_sqlite(task_queue):
    import sqlite3
    logger = multiprocessing.get_logger()

    previous_filename = None
    db_connection = None
    while True:
        try:
            reading = task_queue.get()
            logger.debug(reading)
            #build sqlite filename with timestamp
            ts = datetime.datetime.strptime(reading['ts'], TS_FORMAT)
                    reading['ts_datetime'] = ts
            db_filename = ts.strftime(CONFIG['sqlite']['fname'])
            logger.debug(db_filename)
            if db_filename != previous_filename:
                previous_filename = db_filename
                if db_connection is not None:
                    db_connection.close()
                db_connection = sqlite3.connect(db_filename)
                logger.debug(f'connected to {db_filename}')
                logger.debug('setting up new db...')
                cursor = db_connection.cursor()
                cursor.execute(SQLITE_CREATE)
                    
            #build insert stmnt
            sql = """INSERT INTO meter_data
                     (meter, l1, l2, l3, kwh, ts) 
                     VALUES (?,?,?,?,?,?);"""
            params = [reading['SERIAL'], reading['L1'],
               reading['L2'], reading['L3'], reading['A+'], 
               reading['ts']]

            cursor = db_connection.cursor()
            cursor.execute(sql, params)
            cursor.close()
            db_connection.commit()
        except:
            logger.exception('Error in worker_sqlite')

def worker_logfile(task_queue):
    raise NotImplementedError

def run():
    multiprocessing.log_to_stderr(CONFIG['loglevel'])
    multiprocessing.get_logger().setLevel(CONFIG['loglevel'])

    #target functions for publishing services
    targets ={'mqtt': worker_publish_mqtt,
              'logfile': worker_logfile,
              'sqlite': worker_sqlite} 

    #prepare workers (create queues, link target functions)
    task_queues = []
    for name, function in targets.items():
        if CONFIG[name]['enabled']:
            queue = multiprocessing.Queue()
            task_queues.append(queue)
            p = multiprocessing.Process(target=function,
                                        args=(queue,))
            p.daemon = True #main process kills children before it will be terminated
            p.start()

    read_meter(task_queues)

if __name__ == '__main__':
    run()


""" 
/ESY5Q3DA1004 V3.02
1-0:0.0.0*255(0273011003684)
1-0:1.8.0*255(00026107.7034231*kWh)
1-0:21.7.255*255(000200.13*W)
1-0:41.7.255*255(000122.31*W)
1-0:61.7.255*255(000014.01*W)
1-0:1.7.255*255(000336.45*W)
1-0:96.5.5*255(82)
0-0:96.1.255*255(1ESY1011003684) """
Benutzeravatar
__blackjack__
User
Beiträge: 13110
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

Anmerkungen zum Quellext: `sys` und `logging` werden importiert, aber nicht verwendet.

Zeichenkettenliterale sind nicht dazu da um als Kommentare missbraucht zu werden.

Das ``for i in range(len(worker_targets)):`` in Python ein „anti pattern“ ist, heisst nicht das man das dann komplizierter als ``for idx, _ in enumerate(worker_targets):`` umschreiben sollte. Das ist entweder ein Fall für `zip()` oder noch besser das man da einfach nur *eine* Liste verwendet, statt zwei bei denen die Elemente paarweise zusammengehören.

Ich verstehe nicht so ganz den Sinn eines extra Prozesses für das auslesen der Daten, den man ja auf jeden Fall braucht, und dann den Hauptprozess einfach sinnlos nichts machen zu lassen in einer ``while``/`time.sleep()`-Schleife. Die Aufgabe kann doch einfach der Hauptprozess machen.

Importe sollten am Anfang des Moduls stehen, damit man die Abhängigkeiten leichter sieht. Insbesondere bei Modulen aus der Standardbibliothek macht es keinen Sinn die irgendwo tief im Modul zu verstecken.

`readings` in `worker_sqlite` sieht nach einem Fall für `collections.defaultdict` aus.

Das mehrfache hin- und herkonvertieren des Zeitstempels zwischen `datetime`-Objekten und Zeichenketten ist verwirred und fehleranfällig.

Namen sollten nicht kryptisch abgekürzt werden oder kryptische Abkürzungen enthalten oder gar nur aus einzelnen Buchstaben bestehen. `filename` statt `fname`, `connection` statt `conn` und `cursor` statt `c`.

In der Schleife wo die Ergebnisse in SQLite-DBs geschrieben werden, braucht man in der Schleife sowohl Schlüssel als auch Werte von dem Wörterbuch, also ist das eine Schleife über `items()` und nicht nur über die Schlüssel.

Der Ermittlung von `create_new` ist eigentlich ein ganz simpler Ausdruck:

Code: Alles auswählen

                    create_new = False
                    if not os.path.exists(filename):
                        create_new = True

                # =>

                    create_new = not os.path.exists(filename)
Datenbank und Cusor sollten genau wie Dateien *sicher* wieder geschlossen werden, also entweder mit ``try``/``finally`` oder mit ``with``.

Logging ist eine der wenigen Stellen wo man nocht ``%``-Platzhalter statt `format()` oder f-Zeichenkettenliterale verwenden sollte, denn die Variablen übergibt man dort in der Regel als weitere Argumente damit das formatieren nicht gemacht werden muss wenn da sowieso nichts ausgegeben würde.

`datetime`-Objekte sollte man nicht in Zeichenketten umwandeln wenn man sie in eine Datenbank einfügt. Der Datentyp in der Datenbank dafür ist auch nicht TEXT sondern TIMESTAMP.

Die lokalen Funktion in `worker_publish_mqtt()` gehören da nicht hin. Die werden nicht sinnvoll als Closures verwendet und man muss zweimal hinschauen um zu sehen, dass das trotz Fehler im Code doch funktioniert: Die eine Funktion bekommt ein Argument das nicht verwendet wird und an der Stelle wo man es verwenden müsste wird zufälligerweise der gleiche Wert aus dem umgebenden Namensraum verwendet der beim Aufruf übergeben wird.

Die ”busy waiting”-Schleife macht keinen Sinn. `get()` blockiert – das sollte man hier einfach ausnutzen.

Ich kann mir nur sehr schwer vorstellen das es das richtige vorgehen ist für *jedes einzelne Messergebnis* eine *neue* Verbindung aufzummachen‽

In `worker_read_meter()` wird als erstes eine Queue entfernt die niemals irgendwo verwendet wird – warum wird die dann überhaupt erst erzeugt?

Ausnahmen wurden erfunden damit man keine zusätzlichen Fehlerflags braucht.

`reading` in fast dem ganzen Programm für ein Wörterbuch mit den Messergebnissen zu verwenden und das dann in einer Funktion `reading_dict` zu nennen und dafür `reading` für die ungeparsten Daten zu verwenden ist verwirrend.

Die ausgelesenen Daten für fast jede Zeile erneut mit einem regulären Ausdruck zu durchsuchen macht IMHO nur Sinn wenn man nur einen Bruchteil der Daten braucht. Wenn man sowieso am Ende fast jeden Wert da raus pickt, macht es mehr Sinn die ausgelesenen Daten nur *einmal* per regulärem Ausdruck zu verarbeiten und dabei alle Schlüssel/Wert-Paare auszulesen.

In der `extract()`-Funktion ist ein Fehler: `pattern` enthält Punkte die auch tatsächlich Punkte sein sollen und nicht ein beliebiges Zeichen. Das fehlt also ein `re.escape()`.

Ungetesterer Zwischenstand:

Code: Alles auswählen

#!/usr/bin/env python3
import datetime
import json
import multiprocessing
import os
import re
import sqlite3
import time
from collections import defaultdict
from contextlib import closing

import paho.mqtt.client as mqtt
import serial
from config import CONFIG

KEYWORDS = {
    "A+": {"keyword": "1-0:1.8.0", "dtype": float},  # Meter reading import
    # "A-": "1-0:2.8.0",  # Meter reading export
    "L1": {"keyword": "1-0:21.7.255", "dtype": float},  # Power L1
    "L2": {"keyword": "1-0:41.7.255", "dtype": float},  # Power L2
    "L3": {"keyword": "1-0:61.7.255", "dtype": float},  # Power L3
    "In": {"keyword": "1-0:1.7.255", "dtype": float},  # Power total in
    "SERIAL": {"keyword": "0-0:96.1.255", "dtype": str},  # Serial number
    # "Out": "1-0:1.7.255",  # Power total out
}

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"

DB_SCHEMA = """CREATE TABLE IF NOT EXISTS meter_data
                   (meter TEXT,
                    l1 NUMERIC,
                    l2 NUMERIC,
                    l3 NUMERIC,
                    load NUMERIC,
                    kwh NUMERIC,
                    TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP);"""


def read():
    with serial.Serial(
        port=CONFIG["dev"],
        bytesize=serial.SEVENBITS,
        parity=serial.PARITY_EVEN,
        timeout=2.5,
        exclusive=True,
    ) as connnection:
        reading = connnection.read_until(b"!").decode("utf-8")
        connnection.reset_input_buffer()

        if reading.startswith("/"):
            return reading
        time.sleep(0.5)  # wait to reach the right cycle
        raise IOError(f"reading failed {reading!r}")


def extract(keyword, reading):
    match = re.search(
        re.escape(KEYWORDS[keyword]["keyword"]) + r".*?\((.*?)(?:\*(.*?))?\)",
        reading,
    )
    value, unit = match.groups()
    return KEYWORDS[keyword]["dtype"](value), unit


def worker_read_meter(task_queues):
    logger = multiprocessing.get_logger()
    while True:
        try:
            reading_source = read()
            logger.debug(
                "reading: %r, len: %s", reading_source, len(reading_source)
            )
            if CONFIG["utc"]:
                timestamp = datetime.datetime.utcnow()
            else:
                timestamp = datetime.datetime.now()
            #
            # TODO Don't put a string into the data here but the datetime
            # object.
            #
            reading = {"ts": timestamp.strftime(TIMESTAMP_FORMAT)}
            #
            # TODO Instead of calling extract for each key, extract all
            # key(word)/value+unit pairs and then picke the ones we are
            # interested in.
            #
            for key in KEYWORDS:
                value, unit = extract(key, reading_source)
                reading[key] = value
                logger.debug("%r", (key, value, unit))

            for queue in task_queues:
                queue.put(reading)
        except:
            logger.exception("Error in worker_read_meter")


def worker_publish_mqtt(task_queue):
    logger = multiprocessing.get_logger()
    client = mqtt.Client()
    while True:
        try:
            reading = task_queue.get()
            if CONFIG["mqtt"]["auth"]["enabled"]:
                client.username_pw_set(
                    CONFIG["mqtt"]["auth"]["username"],
                    CONFIG["mqtt"]["auth"]["password"],
                )
            client.connect(
                host=CONFIG["mqtt"]["host"],
                port=CONFIG["mqtt"]["port"],
                keepalive=CONFIG["mqtt"]["keepalive"],
                bind_address="",
            )
            payload = json.dumps(reading)
            client.publish(
                topic=CONFIG["mqtt"]["topic"],
                payload=payload,
                qos=CONFIG["mqtt"]["qos"],
                retain=CONFIG["mqtt"]["retain"],
            )
            logger.debug("worker_publish_mqtt %s", payload)
        except:
            logger.exception("Error in worker_publish_mqtt")
            time.sleep(0.1)


def worker_sqlite(task_queue):
    #
    # TODO This function does too much — split up.
    #
    logger = multiprocessing.get_logger()
    while True:
        try:
            #
            # TODO: Take care the queue doesnt get too large (in case of insert
            # issues here)
            #
            if task_queue.qsize() >= CONFIG["sqlite"]["min_rows_insert"]:
                # get readings and build sqlite filenames (maybe different
                # filenames because of timestamp)
                filename_to_readings = defaultdict(list)
                while not task_queue.empty():
                    reading = task_queue.get()
                    logger.debug(reading)
                    timestamp = datetime.datetime.strptime(
                        reading["ts"], TIMESTAMP_FORMAT
                    )
                    reading["ts_datetime"] = timestamp
                    filename = timestamp.strftime(CONFIG["sqlite"]["fname"])
                    logger.debug(filename)
                    filename_to_readings[filename].append(reading)

                for filename, readings in filename_to_readings.items():
                    is_new_database = not os.path.exists(filename)
                    with sqlite3.connect(filename) as connection:
                        with closing(connection.cursor()) as cursor:
                            logger.debug("connected to %s", filename)
                            try:
                                if is_new_database:
                                    logger.debug("setting up new db...")
                                    cursor.execute(DB_SCHEMA)

                                cursor.executemany(
                                    "INSERT INTO meter_data"
                                    " (meter, l1, l2, l3, kwh, ts)"
                                    " VALUES (?,?,?,?,?,?)",
                                    [
                                        (
                                            reading["SERIAL"],
                                            reading["L1"],
                                            reading["L2"],
                                            reading["L3"],
                                            reading["A+"],
                                            reading["ts_datetime"],
                                        )
                                        for reading in readings
                                    ],
                                )
                                connection.commit()
                                logger.debug(
                                    "insert into %s was successful. "
                                    "Inserted %s readings.",
                                    filename,
                                    len(readings),
                                )
                            except:
                                logger.exception(
                                    "insert into %s failed", filename
                                )
                                for reading in readings:
                                    logger.debug(
                                        "add %s to queue again", reading
                                    )
                                    task_queue.put(reading)
                    logger.debug("closed connection to %s", filename)
        except:
            logger.exception("Error in worker_sqlite")
        time.sleep(1)


def worker_logfile(task_queue):
    raise NotImplementedError


def run():
    multiprocessing.log_to_stderr(CONFIG["loglevel"])
    multiprocessing.get_logger().setLevel(CONFIG["loglevel"])

    # target functions for publishing services
    targets = {
        "mqtt": worker_publish_mqtt,
        "logfile": worker_logfile,
        "sqlite": worker_sqlite,
    }
    # prepare workers (create queues, link target functions)
    workers = []
    for key in targets:
        if CONFIG[key]["enabled"]:
            workers.append((multiprocessing.Queue(), targets[key]))
    # now add worker_read_meter and give it a ref to all queues as argument
    workers.append(([queue for queue, _ in workers], worker_read_meter))

    for queue, target in workers:
        process = multiprocessing.Process(
            target=target, args=[queue], daemon=True
        )
        process.start()

    worker_read_meter([queue for queue, _ in workers])


if __name__ == "__main__":
    run()


# /ESY5Q3DA1004 V3.02

# 1-0:0.0.0*255(0273011003684)
# 1-0:1.8.0*255(00026107.7034231*kWh)
# 1-0:21.7.255*255(000200.13*W)
# 1-0:41.7.255*255(000122.31*W)
# 1-0:61.7.255*255(000014.01*W)
# 1-0:1.7.255*255(000336.45*W)
# 1-0:96.5.5*255(82)
# 0-0:96.1.255*255(1ESY1011003684)
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Antworten