Datenaustausch mit Heizungssteuerung

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

Hallo hier bin ich wieder, vielen Dank noch mal für eure bisherige Hilfe.

Einige Fragezeichen in meinem Kopf sind weg, dafür gibt es wieder andere :D

Da mein Code immer länger wird denke ich es wäre vielleicht sinnvoll in auf mehrere Module auf zu teile z.B. Hauptcode, Abfrage allgemeine Daten, Abfrage ist Temperaturen, Abfrage Solltemperaturen, Abfrage Betriebszustände, Vergleich Soll-Ist-Werte mit Benachrichtigung z.B. auf Smartphone.

Beim Abfragen von Betriebszuständen habe ich im Moment noch keine Ahnung wie ich die Werte die ich vom Regler über die Abfrage der Bitvariablen erhalte brauchbar auswerten kann.
https://1drv.ms/b/s!Aj1wKT5IcL0qkH34MoW ... -?e=1OkBm8 ich hoffe der Link zur Regler-Dokumentation funktioniert so.
Unter Punkt 5.12 ist die Abfrage der Bitvariationen beschrieben. Lieder verstehe ich als Unwissender nicht, was mir das sagen soll :cry:

Bezüglich Vergleich Soll-Ist-Werte möchte ich eine Störmeldung auf Smartphone, wenn z.B. die Puffertemperatur nicht den Sollwert erreicht.

Für Hilfe, Tipps, Codevorschläge und Erklärungen (die meinen bescheidenen Programmierhorizont) erweitern bin ich dankbar.


Hier noch der aktuelle Stand meines Codes

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import serial
import sollwerte
import time
import datetime

TEMPERATURFUEHLER = {
    1: "Kollektorfühlertemperatur",
    2: "Kollekorvorlauftemperatur",
    3: "Speicher Oben Temperatur",
    4: "Speicher Mitte Temperatur",
    5: "Speicher Unten Temperatur",
    7: "Kollekorrücklauf Temperatur",
    10: "Vorlauftemperatur Heizkreis 1",
    11: "Vorlauftemperatur Heizkreis 2",
    12: "Zirkulation Rücklauftemperatur",
    15: "Holzpelletkessel Temperatur",
    16: "Außentemperatur"
}

def send_message(serial, message):
    assert len(message) < 256
    checksum = -(2 + len(message) + sum(message)) & 0xff
    data = bytes([2, len(message)] + message + [checksum])
    serial.write(data)

def read_message(serial):
    data = serial.read(2)
    if data is None:
        # timeout
        return None
    if data[0] != 2:
        raise IOError("message-header wrong")
    message = serial.read(data[1])
    checksum = serial.read(1)
    if -sum(data + message) & 0xff != checksum[0]:
        raise IOError("checksum wrong")
    return message

def call_command(serial, message):
    send_message(serial, message)
    return read_message(serial)

def reglerkennung(serial):
    ans = call_command(serial, [ 0x01 ])
    if ans[0] != 0x01:
        raise IOError("Wrong answer id")

    return int.from_bytes(ans[1:3], byteorder="big")

def versionsnummer(serial):
    ans = call_command(serial, [ 0x02 ])
    if ans[0] != 0x02:
        raise IOError("Wrong answer id")
    if len(ans) != 4:
        raise IOError("Wrong answer length")

    major = int.from_bytes(ans[1:3], byteorder="big")
    minor = int(ans[3])
    return "V{}.{:02}.{}".format(int(major / 100), major % 100, minor)

def temperaturfuehler(serial, fuehler):
    assert fuehler < 17
    ans = call_command(serial, [ 0x10, 0x00, fuehler + 4 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != fuehler + 4:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], signed=True, byteorder="big")

    return "{}°C".format(value / 10)

#Vom Regler berechnete Vorlaufsolltemperatur Heizkreis 1 (Heizkörper)
def vorlauf_HK1_soll(serial):
    ans = call_command(serial, [ 0x10, 0x00, 75 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 75:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{}°C".format(value / 10)

#Vom Regler berechnete Vorlaufsolltemperatur Heizkreis 2 (Fußbodenheizung)
def vorlauf_HK2_soll(serial):
    ans = call_command(serial, [ 0x10, 0x00, 92 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 92:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{}°C".format(value / 10)

#Vom Regler berechnete Speichersolltemperatur
def speicher_soll(serial):
    ans = call_command(serial, [ 0x10, 0x00, 116 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 116:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{}°C".format(value / 10)


#wird zum Vergleich Speicher Soll/Ist Temperatur benutzt
def speicher_oben_ist(serial):
    ans = call_command(serial, [ 0x10, 0x00, 7 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 7:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{}°C".format(value / 10)

#gesammt Energieertrag der Solaranlage seit IBN
def energieertrag_solar(serial):
    ans = call_command(serial, [ 0x10, 0x00, 36 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 36:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{} MWh".format(round(value / 1000000, 1))

#Tagesertrag der Solaranlage
def tagesertrag_solar(serial):
    ans = call_command(serial, [ 0x10, 0x00, 37 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 37:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{} KWh".format(value / 10)

#Versuch Abfrage der Bitvariablen
def abfrage_bitv(serial):
    ans = call_command(serial, [ 0x20 ])
    if ans[0] != 0x20:
        raise IOError("Wrong answer id")

    return int.from_bytes(ans[1:3], byteorder="big")#hier habe gerade Keine wie ich die Info vom Regel sinnvoll auswerte



def main():
    ser = serial.Serial("/dev/ttyUSB0",
                    19200,
                    xonxoff=False,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    bytesize=serial.EIGHTBITS,
                    timeout=2)
    ser.rts = 0
#    for i in range(3):
    while True:
        now = datetime.datetime.now()
        print(now.strftime('%d.%m.%Y'))
        print(now.strftime('%H:%M:%S'))
        print("prosolar PS600"), print("Kennung:", reglerkennung(ser))
        print("Versionsnummer:", versionsnummer(ser))
        print("")
        print("Istwerte")
        for fuehler, name in TEMPERATURFUEHLER.items():
            print("{}: {}".format(name, temperaturfuehler(ser, fuehler)))
        print("")
        print("Erträge")
        print("Energieertrag Solar:", energieertrag_solar(ser))
        print("Tagesertrag Solar:" , tagesertrag_solar(ser))
        print("")
        print("Abfrage Bitvariante:" , abfrage_bitv(ser))
        print("")
        print("Sollwerte")
        print("Vorlauf Soll HK1:", vorlauf_HK1_soll(ser)) ; print("Vorlauf Soll HK2:", vorlauf_HK2_soll(ser))
        print("Speicher Soll:", speicher_soll(ser))

#Überprüfung ob Speicheristtemperatur über Speichersolltemperatur liegt
#sollte das nicht zutreffen, soll hier (wenn ich wies wie) eine Benachrichtigung zu auf Smartphone erfolgen
        if  (speicher_oben_ist(ser)) < (speicher_soll(ser)):
            print("Temp. Warnung")
        else:
            print("Temp.ok")

#Versuch Daten aus anderem Modul zu verwenden
        sollwerte.sollwert_test()
        print("")
        print("") 
        time.sleep(10)#zu Testzwecken auf 10 Sekunden

if __name__ == '__main__':
        main()
Benutzeravatar
__blackjack__
User
Beiträge: 13103
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Alter forscht: So 200 Zeilen finde ich ja noch nicht besonders lang. Zumindest an dieser Zeilenzahl würde ich nicht festmachen wollen das man das sogar auf mehrere Module aufteilen sollte. Zumal es so aussieht als wenn da einiges an Code wiederholt wurde wo sich die einzelnen Funktionen nur durch Werte unterscheiden, aber sonst die gleiche Struktur haben, man also wahrscheinlich noch ein paar Zeilen einsparen kann.

Man sollte nicht versuchen mehr als eine Anweisung in eine Zeile zu quetschen. Das Semikolon wird in normalen Python-Programmen nicht verwendet. Das kommt fast nur bei Code-Golf und bei Einzeilen die per ``-c``-Option an Python übergeben werden, verwendet.

Und ein Komma sollte man dafür erst recht nicht verwenden, denn damit erstellt man ein Tupel. Das betrifft die Zeile mit ``print("prosolar PS600")``. In der Zeile wird ein Tupel mit den Rückgabewerten der beiden `print()`-Aufrufe erstellt, also ``(None, None)`` mit dem dann natürlich nichts weiter gemacht wird, weil das ja auch keinen Sinn ergibt.

In der `main()`-Funktion wird das `Serial`-Objekt an den Namen `ser` gebunden weil `serial` bereits durch das Modul belegt ist. Und in allen anderen Funktionen verdeckt das `serial`-Argument das gleichnamige Modul. Das ist unschön. Entweder man sorgt dafür das auf Modulebene kein `serial` als Name existiert, oder man benennt das `Serial`-Objekt anders, beispielsweise `connection`.

Um eine Leerzeile mit `print()` auszugeben braucht man keine leere Zeichenkette übergeben.

Kommentare sollten entsprechend dem Code eingerückt werden. Sonst entspricht die Formatierung nicht mehr der Struktur des Codes.

Der Name `ans` ist nicht gut wenn man `answer` meint. Keine kryptischen Abkürzugen verwenden.

Funktionen werden üblicherweise nach Tätigkeiten benannt, um sie von eher passiven Werten unterscheiden zu können. Hinter einem Namen wie `versionsnummer` erwartet der Leser eine Versionsnummer, keine funktion die eine Versionsnummer ermittelt.

Erwartete Länge und ID der Antwort wird in (fast) jeder Funktion geprüft, den Code kann man also in `call_command()` verschieben. Aus dem bisherigen Code wird der Zusammenhang zwischen der Antwort-ID die ja eigentlich ein Fehlercode ist, und der gesendeten Nachricht nur indirekt erkennbar – bei allen Befehlen die Werte abfragen ist das erste Byte der gesendeten und der empfangenen Nachricht gleich, wenn alles richtig lief. Bei den Befehlen die Werte setzen ist das erste Byte der Wert 0.

Man kann aus den Funktionen eine `get_variable()`-Funktion herausziehen, womit die ganzen Funktionen die Variablen abfragen kürzer werden. Da bleibt am Ende dann immer noch ein Muster übrig das sich nur durch Werte unterscheidet. Da würde ich wahrscheinlich aufgrund der Tabelle in der Dokumentation versuchen so viel wie möglich als Daten zu kodieren, so das man am Ende vielleicht sogar mit einer einzigen Funktion auskommt.

Zwischenstand (natürlich ungetestet):

Code: Alles auswählen

#!/usr/bin/env python3
import datetime
import time

import serial
import sollwerte

TEMPERATURFUEHLER = {
    1: "Kollektorfühlertemperatur",
    2: "Kollekorvorlauftemperatur",
    3: "Speicher Oben Temperatur",
    4: "Speicher Mitte Temperatur",
    5: "Speicher Unten Temperatur",
    7: "Kollekorrücklauf Temperatur",
    10: "Vorlauftemperatur Heizkreis 1",
    11: "Vorlauftemperatur Heizkreis 2",
    12: "Zirkulation Rücklauftemperatur",
    15: "Holzpelletkessel Temperatur",
    16: "Außentemperatur",
}


def send_message(connection, message):
    checksum = -(2 + len(message) + sum(message)) & 0xFF
    data = bytes([2, len(message), *message, checksum])
    connection.write(data)


def read_message(connection):
    data = connection.read(2)
    if data is None:
        # timeout
        return None
    if data[0] != 2:
        raise IOError("message-header wrong")
    message = connection.read(data[1])
    checksum = connection.read(1)
    if -sum(data + message) & 0xFF != checksum[0]:
        raise IOError("checksum wrong")
    return message


def call_command(connection, message, does_echo_first_byte=True):
    send_message(connection, message)
    answer = read_message(connection)
    #
    # TODO Die Dokumentation enthält mehr Informationen zu den Fehlercodes die
    # man hier besser melden könnte.  Vielleicht auch als eigenen Ausnahmetyp.
    #
    if answer[0] != (message[0] if does_echo_first_byte else 0x00):
        raise IOError("Error code 0x{:02X}".format(answer[0]))
    return answer


def get_reglerkennung(connection):
    answer = call_command(connection, [0x01])
    return int.from_bytes(answer[1:3], byteorder="big")


def get_versionsnummer(connection):
    answer = call_command(connection, [0x02])
    if len(answer) != 4:
        raise IOError("Wrong answer length")

    major = int.from_bytes(answer[1:3], byteorder="big")
    minor = int(answer[3])
    return "V{}.{:02}.{}".format(major // 100, major % 100, minor)


def get_variable(connection, variable_id):
    answer = call_command(connection, [0x10, 0x00, variable_id])
    if len(answer) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(answer[1:3], byteorder="big") != variable_id:
        raise IOError("Wrong var id")
    return answer[3:8]


def get_temperaturfuehler(connection, fuehler):
    assert fuehler < 17
    data = get_variable(connection, fuehler + 4)
    value = int.from_bytes(data, signed=True, byteorder="big") / 10
    return "{}°C".format(value)


# Vom Regler berechnete Vorlaufsolltemperatur Heizkreis 1 (Heizkörper)
def get_vorlauf_hk1_soll(connection):
    data = get_variable(connection, 75)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{}°C".format(value)


# Vom Regler berechnete Vorlaufsolltemperatur Heizkreis 2 (Fußbodenheizung)
def get_vorlauf_hk2_soll(connection):
    data = get_variable(connection, 92)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{}°C".format(value)


# Vom Regler berechnete Speichersolltemperatur
def get_speicher_soll(connection):
    data = get_variable(connection, 116)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{}°C".format(value)


# wird zum Vergleich Speicher Soll/Ist Temperatur benutzt
def get_speicher_oben_ist(connection):
    data = get_variable(connection, 7)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{}°C".format(value)


# gesammt Energieertrag der Solaranlage seit IBN
def get_energieertrag_solar(connection):
    data = get_variable(connection, 36)
    value = int.from_bytes(data, byteorder="big") / 1_000_000
    return "{:.1f} MWh".format(value)


# Tagesertrag der Solaranlage
def get_tagesertrag_solar(connection):
    data = get_variable(connection, 37)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{} KWh".format(value)


# Versuch Abfrage der Bitvariablen
def get_arbeits_bits(connection):
    answer = call_command(connection, [0x20])
    # hier habe gerade Keine wie ich die Info vom Regler sinnvoll auswerte
    return int.from_bytes(answer[1:3], byteorder="big")


def main():
    connection = serial.Serial("/dev/ttyUSB0", 19200, timeout=2)
    connection.rts = False
    while True:
        now = datetime.datetime.now()
        print(now.strftime("%d.%m.%Y"))
        print(now.strftime("%H:%M:%S"))
        print("prosolar PS600")
        print("Kennung:", get_reglerkennung(connection))
        print("Versionsnummer:", get_versionsnummer(connection))
        print()
        print("Istwerte")
        for fuehler, name in TEMPERATURFUEHLER.items():
            print(
                "{}: {}".format(
                    name, get_temperaturfuehler(connection, fuehler)
                )
            )
        print()
        print("Erträge")
        print("Energieertrag Solar:", get_energieertrag_solar(connection))
        print("Tagesertrag Solar:", get_tagesertrag_solar(connection))
        print()
        print("Abfrage Bitvariante:", get_arbeits_bits(connection))
        print()
        print("Sollwerte")
        print("Vorlauf Soll HK1:", get_vorlauf_hk1_soll(connection))
        print("Vorlauf Soll HK2:", get_vorlauf_hk2_soll(connection))
        print("Speicher Soll:", get_speicher_soll(connection))

        # Überprüfung ob Speicheristtemperatur über Speichersolltemperatur liegt
        # sollte das nicht zutreffen, soll hier (wenn ich wies wie) eine
        # Benachrichtigung zu auf Smartphone erfolgen
        if get_speicher_oben_ist(connection) < get_speicher_soll(connection):
            print("Temp. Warnung")
        else:
            print("Temp. Ok")

        # Versuch Daten aus anderem Modul zu verwenden
        sollwerte.sollwert_test()
        print()
        print()
        time.sleep(10)  # zu Testzwecken auf 10 Sekunden


if __name__ == "__main__":
    main()
Um die Bitvariablen auszuwerten müsstest Du Dir mal die bitweisen Operatoren wie ``&``, ``|``, und ``~`` anschauen. Und/oder vielleicht auch `enum.IntFlag`.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

@blackjack: Vielen Dank für deine tolle schnelle Unterstützung.
Ich habe den modifizierten Code getestet, er läuft fehlerfrei :D
Damit habe ich wieder ein solide Grundlage, auf der ich weiter arbeiten kann und wieder ein paar neue :?: im Kopf :wink:
Antworten