Datenaustausch mit Heizungssteuerung

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

Hallo an die erfahrenen Programmierer unter euch,

ich habe eine ProSolar PS 600 Heizungsteuerung mit einer RS232 Schnittstelle, mittels einer vom Hersteller mitgelieferten Software konnte man früher sämtliche Werte der Steuerung am PC sehen und auch ändern. Dies dürfte soweit ich mich erinnere unter Windows 95 oder XP gewesen sein.

Ich habe jetzt eine Raspberry PI4 installiert und über USB-RS232-Adapter mit der Steuerung verbunden und versuche nun verzweifelt eine Verbindung aufzubauen.

Mein Script sieht z.Z so auf:

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import serial
import time

ser = serial.Serial("/dev/ttyUSB0",
                    19200,
                    xonxoff=False,
                    #rtscts=True,
                    #dsrdtr=True,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    bytesize=serial.EIGHTBITS,
                    timeout=2)

if (ser.isOpen() == True):
  ser.close()
ser.open()
ser.setRTS(0)

nachricht = [ 0x02, 0x01, 0x01, 0xFB ]

print('Nachricht korrent:', sum(nachricht) % 0xFF == 0)
daten = bytes(nachricht)

for i in range(100):
    print('Sende:', daten)

    ser.write(daten)

    time.sleep(0.05)
    empfangen = ser.read(ser.inWaiting())
    print('Empfange:', empfangen)
    
    time.sleep(1)
Daten werden wohl gesendet, aber keine empfangen

Ich habe keine Ahnung wo der Fehler liegt. : :roll:

Hier noch ein Link zu Schnittstellendokumentation von ProSolar
https://1drv.ms/b/s!Aj1wKT5IcL0qkH34MoW ... -?e=2EA4CA

Ich hoffe hier gibt es kluge Köpfe, die sich mit so etwas auskennen und mir helfen können.

Ich bin für jede konstruktive Hilfe dankbar.
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Deine Prüfsumme ist falsch. Die muss 0xFC sein, damit die Summe 0 ergibt. Dein Test ob die korrekt ist, ist auch falsch. 255 % 255 ist 0. Denn eine Zahl ist immer durch sich selbst teilbar ohne Rest. Du musst entweder mit 0x100 (256) prüfen, oder stattdessen mit 0xff verunden, um zu sehen ob Bits im unteren Byte gesetzt sind.

Und bitte in Zukunft selbst Code Tags verwenden. Sonst gehen die wichtigen Einrückungen verloren.
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Und noch ein Nachtrag: ich würde eher mit timeout lesen, statt mit inWaiting. Denn wenn noch keine Daten da sind, liest das (denke ich mal) nichts. Das ist zwar nicht die Ursache, aber wenn die Prüfsumme stimmt, dann wäre das ggf. ein Problem.
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

Nachdem in Serial-Objekt erstellt wurde, ist die Schnittstelle frisch geöffnet. Welchen Sinn hat es da, sie zu schließen und gleich wieder zu öffnen? setRTS ist veraltet. Statt dessen benutzt man das Property rts.
Beim Lesen solltest Du das Protokoll einhalten und nicht einfach drauflos lesen.

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
import serial

ser = serial.Serial("/dev/ttyUSB0",
                    19200,
                    xonxoff=False,
                    #rtscts=True,
                    #dsrdtr=True,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    bytesize=serial.EIGHTBITS,
                    timeout=2)
ser.rts = 0

nachricht = [ 0x02, 0x01, 0x01, 0xFC ]
print('Nachricht korrent:', sum(nachricht) & 0xFF == 0)

daten = bytes(nachricht)
for i in range(100):
    print('Sende:', daten)
    ser.write(daten)
    empfangen = ser.read(2)
    if empfangen:
        data = ser.read(empfangen[1])
        checksum = ser.read(1)
        print('Empfange:', empfangen, data, checksum)
    else:
        print('Timeout')
    time.sleep(1)
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

Danke für die schnellen Antworten,

bitte habt Nachsicht mit mir, da ich gerade sehr viel Neuland betrete
(Raspberry, Linux (Raspbian), Python, Script Datenübertragung).

Da habe ich mir die Latte ganz schön hochgehängt.

Nach dem die Installation des PI einschließlich grafischer Oberfläche und Remotezugriff sowie ein erstes kleines Projekt zur Steuerung eines Relais via GPIO widererwartend gut gelaufen ist, wurde meine Euphorie mit dem Projekt Datenübertragung erst mal gedämpft.

Ich werde versuchen eure Hinweise um zusetzen, mal schauen ob das klappt.
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Alles gut. Der Code von dir war ok, und keine Katastrophe für den Anfang. Verbesserung geht immer, und so ein Thema wie dieses protokoll braucht man ein paar Anläufe für. Probier die Vorschläge einfach mal aus.
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

Hallo deets,
das mit der Prüfsumme war ein Volltreffer, der Regler antwortet herzlichen Dank dafür, damit habe ich eine riesen Hürde genommen.

Wie sieht die Syntax für das Lesen mit timeout aus?

Ich habe meinen Code mal verändert und vom Regler die Softwareversion ausgelesen, was grundsätzlich funktioniert hat aber sicher noch verbesserungsbedürftig ist.
Mit den Code Tags tue ich mir auch noch schwer, ich hoffe man kann trotzdem was mit anfangen.

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import serial
import time

ser = serial.Serial("/dev/ttyUSB0",
                    19200,
                    xonxoff=False,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    bytesize=serial.EIGHTBITS,
                    timeout=2)

if (ser.isOpen() == True):
  ser.close()
ser.open()
ser.setRTS(0)

nachricht = [ 0x02, 0x01, 0x02, 0xFB]


print('Abweichung Prüfsumme:', sum(nachricht) % 0x100)
daten = bytes(nachricht)

for i in range(2):
    print('Sende:', daten)

    ser.write(daten)

    time.sleep(0.05)
    empfangen = ser.read(ser.inWaiting())
    print('Empfange:', empfangen.hex())
    
    time.sleep(1)
__deets__
User
Beiträge: 14539
Registriert: Mittwoch 14. Oktober 2015, 14:29

Schön das es klappt. Du kannst oben bei Sirius3 sehen, wie es geht: read kehrt ohne Daten zurück, wenn der timeout zuschlug. Und natürlich keine erwartete Anzahl von Bytes angeben.
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

@Alter forscht: wenn es prinzipell funktioniert, solltest Du auch gleich anfangen, alles in saubere Funktionen zu packen:

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import serial

def send_message(serial, message):
    assert len(message) < 256
    checksum =  -(2 + len(message) + sum(message)) & 0xff
    data = bytes([2, len(message)] + message + [checksum])
    serial.write(data)

def read_message(serial):
    data = serial.read(2)
    if not data:
        # timeout
        return None
    if data[0] != 2:
        raise IOError("message-header wrong")
    message = serial.read(data[1])
    checksum = serial.read(1)
    if -sum(data + message) & 0xff != checksum:
        raise IOError("checksum wrong")
    return message

def call_command(serial, message):
    send_message(serial, message)
    return read_message(serial)

def main():
    ser = serial.Serial("/dev/ttyUSB0",
                    19200,
                    xonxoff=False,
                    #rtscts=True,
                    #dsrdtr=True,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    bytesize=serial.EIGHTBITS,
                    timeout=2)
    ser.rts = 0
    print(call_command(ser, [0x01]))
    print(call_command(ser, [0x02]))

if __name__ = '__main__':
    main()
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

Hallo Sirius3,
danke auch für deine Unterstützung.

Ich habe den Code mal so übernommen, auch wenn ich gerade mangels Python Kenntnissen das Gefühl habe, ich tappe völlig im Dunkeln :?

Ich bekomme dann eine Fehlermeldung

Code: Alles auswählen

>>> %Run PS600_USB-A11.py
Traceback (most recent call last):
  File "/home/pi/Desktop/Meine Sripte/PS600/PS600_USB-A11.py", line 42
    if __name__ = '__main__':
                ^
SyntaxError: invalid syntax
Benutzeravatar
__blackjack__
User
Beiträge: 13102
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Alter forscht: Das muss ein Vergleich sein, also ``==`` statt ``=``.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

habe ich geändert, ist noch schlimmer geworden :roll:

Code: Alles auswählen

Python 3.7.3 (/usr/bin/python3)
>>> %Run PS600_USB-A11.py
Traceback (most recent call last):
  File "/home/pi/Desktop/Meine Sripte/PS600/PS600_USB-A11.py", line 43, in <module>
    main()
  File "/home/pi/Desktop/Meine Sripte/PS600/PS600_USB-A11.py", line 39, in main
    print(call_command(ser, [0x01]))
  File "/home/pi/Desktop/Meine Sripte/PS600/PS600_USB-A11.py", line 26, in call_command
    return read_message(serial)
  File "/home/pi/Desktop/Meine Sripte/PS600/PS600_USB-A11.py", line 21, in read_message
    raise IOError("checksum wrong")
OSError: checksum wrong
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

Ja, logisch, die Checksumme muß natürlich in eine Zahl umgewandelt werden.

Code: Alles auswählen

if -sum(data + message) & 0xff != ord(checksum):
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

Das sagst du so einfach, ich stehe gerade voll auf dem Schlauch :?:
Sirius3
User
Beiträge: 17749
Registriert: Sonntag 21. Oktober 2012, 17:20

Wenn Du Dir die fragliche Zeile anschaust, und schaust, was da verglichen wird, wirst Du gemerkt haben, dass versucht wird, eine Zahl mit einem ByteString zu vergleichen. Das geht nie gut. Daher muß man den ByteString in eine Zahl umwandeln.
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

Hallo hier bin ich wieder, vielen Dank noch mal für eure bisherige Hilfe.

Einige Fragezeichen in meinem Kopf sind weg, dafür gibt es wieder andere :D

Da mein Code immer länger wird denke ich es wäre vielleicht sinnvoll in auf mehrere Module auf zu teile z.B. Hauptcode, Abfrage allgemeine Daten, Abfrage ist Temperaturen, Abfrage Solltemperaturen, Abfrage Betriebszustände, Vergleich Soll-Ist-Werte mit Benachrichtigung z.B. auf Smartphone.

Beim Abfragen von Betriebszuständen habe ich im Moment noch keine Ahnung wie ich die Werte die ich vom Regler über die Abfrage der Bitvariablen erhalte brauchbar auswerten kann.
https://1drv.ms/b/s!Aj1wKT5IcL0qkH34MoW ... -?e=1OkBm8 ich hoffe der Link zur Regler-Dokumentation funktioniert so.
Unter Punkt 5.12 ist die Abfrage der Bitvariationen beschrieben. Lieder verstehe ich als Unwissender nicht, was mir das sagen soll :cry:

Bezüglich Vergleich Soll-Ist-Werte möchte ich eine Störmeldung auf Smartphone, wenn z.B. die Puffertemperatur nicht den Sollwert erreicht.

Für Hilfe, Tipps, Codevorschläge und Erklärungen (die meinen bescheidenen Programmierhorizont) erweitern bin ich dankbar.


Hier noch der aktuelle Stand meines Codes

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import serial
import sollwerte
import time
import datetime

TEMPERATURFUEHLER = {
    1: "Kollektorfühlertemperatur",
    2: "Kollekorvorlauftemperatur",
    3: "Speicher Oben Temperatur",
    4: "Speicher Mitte Temperatur",
    5: "Speicher Unten Temperatur",
    7: "Kollekorrücklauf Temperatur",
    10: "Vorlauftemperatur Heizkreis 1",
    11: "Vorlauftemperatur Heizkreis 2",
    12: "Zirkulation Rücklauftemperatur",
    15: "Holzpelletkessel Temperatur",
    16: "Außentemperatur"
}

def send_message(serial, message):
    assert len(message) < 256
    checksum = -(2 + len(message) + sum(message)) & 0xff
    data = bytes([2, len(message)] + message + [checksum])
    serial.write(data)

def read_message(serial):
    data = serial.read(2)
    if data is None:
        # timeout
        return None
    if data[0] != 2:
        raise IOError("message-header wrong")
    message = serial.read(data[1])
    checksum = serial.read(1)
    if -sum(data + message) & 0xff != checksum[0]:
        raise IOError("checksum wrong")
    return message

def call_command(serial, message):
    send_message(serial, message)
    return read_message(serial)

def reglerkennung(serial):
    ans = call_command(serial, [ 0x01 ])
    if ans[0] != 0x01:
        raise IOError("Wrong answer id")

    return int.from_bytes(ans[1:3], byteorder="big")

def versionsnummer(serial):
    ans = call_command(serial, [ 0x02 ])
    if ans[0] != 0x02:
        raise IOError("Wrong answer id")
    if len(ans) != 4:
        raise IOError("Wrong answer length")

    major = int.from_bytes(ans[1:3], byteorder="big")
    minor = int(ans[3])
    return "V{}.{:02}.{}".format(int(major / 100), major % 100, minor)

def temperaturfuehler(serial, fuehler):
    assert fuehler < 17
    ans = call_command(serial, [ 0x10, 0x00, fuehler + 4 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != fuehler + 4:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], signed=True, byteorder="big")

    return "{}°C".format(value / 10)

#Vom Regler berechnete Vorlaufsolltemperatur Heizkreis 1 (Heizkörper)
def vorlauf_HK1_soll(serial):
    ans = call_command(serial, [ 0x10, 0x00, 75 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 75:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{}°C".format(value / 10)

#Vom Regler berechnete Vorlaufsolltemperatur Heizkreis 2 (Fußbodenheizung)
def vorlauf_HK2_soll(serial):
    ans = call_command(serial, [ 0x10, 0x00, 92 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 92:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{}°C".format(value / 10)

#Vom Regler berechnete Speichersolltemperatur
def speicher_soll(serial):
    ans = call_command(serial, [ 0x10, 0x00, 116 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 116:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{}°C".format(value / 10)


#wird zum Vergleich Speicher Soll/Ist Temperatur benutzt
def speicher_oben_ist(serial):
    ans = call_command(serial, [ 0x10, 0x00, 7 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 7:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{}°C".format(value / 10)

#gesammt Energieertrag der Solaranlage seit IBN
def energieertrag_solar(serial):
    ans = call_command(serial, [ 0x10, 0x00, 36 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 36:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{} MWh".format(round(value / 1000000, 1))

#Tagesertrag der Solaranlage
def tagesertrag_solar(serial):
    ans = call_command(serial, [ 0x10, 0x00, 37 ])
    if ans[0] != 0x10:
        raise IOError("Wrong answer id")
    if len(ans) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(ans[1:3], byteorder="big") != 37:
        raise IOError("Wrong var id")

    value = int.from_bytes(ans[3:8], byteorder="big")

    return "{} KWh".format(value / 10)

#Versuch Abfrage der Bitvariablen
def abfrage_bitv(serial):
    ans = call_command(serial, [ 0x20 ])
    if ans[0] != 0x20:
        raise IOError("Wrong answer id")

    return int.from_bytes(ans[1:3], byteorder="big")#hier habe gerade Keine wie ich die Info vom Regel sinnvoll auswerte



def main():
    ser = serial.Serial("/dev/ttyUSB0",
                    19200,
                    xonxoff=False,
                    parity=serial.PARITY_NONE,
                    stopbits=serial.STOPBITS_ONE,
                    bytesize=serial.EIGHTBITS,
                    timeout=2)
    ser.rts = 0
#    for i in range(3):
    while True:
        now = datetime.datetime.now()
        print(now.strftime('%d.%m.%Y'))
        print(now.strftime('%H:%M:%S'))
        print("prosolar PS600"), print("Kennung:", reglerkennung(ser))
        print("Versionsnummer:", versionsnummer(ser))
        print("")
        print("Istwerte")
        for fuehler, name in TEMPERATURFUEHLER.items():
            print("{}: {}".format(name, temperaturfuehler(ser, fuehler)))
        print("")
        print("Erträge")
        print("Energieertrag Solar:", energieertrag_solar(ser))
        print("Tagesertrag Solar:" , tagesertrag_solar(ser))
        print("")
        print("Abfrage Bitvariante:" , abfrage_bitv(ser))
        print("")
        print("Sollwerte")
        print("Vorlauf Soll HK1:", vorlauf_HK1_soll(ser)) ; print("Vorlauf Soll HK2:", vorlauf_HK2_soll(ser))
        print("Speicher Soll:", speicher_soll(ser))

#Überprüfung ob Speicheristtemperatur über Speichersolltemperatur liegt
#sollte das nicht zutreffen, soll hier (wenn ich wies wie) eine Benachrichtigung zu auf Smartphone erfolgen
        if  (speicher_oben_ist(ser)) < (speicher_soll(ser)):
            print("Temp. Warnung")
        else:
            print("Temp.ok")

#Versuch Daten aus anderem Modul zu verwenden
        sollwerte.sollwert_test()
        print("")
        print("") 
        time.sleep(10)#zu Testzwecken auf 10 Sekunden

if __name__ == '__main__':
        main()
Benutzeravatar
__blackjack__
User
Beiträge: 13102
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Alter forscht: So 200 Zeilen finde ich ja noch nicht besonders lang. Zumindest an dieser Zeilenzahl würde ich nicht festmachen wollen das man das sogar auf mehrere Module aufteilen sollte. Zumal es so aussieht als wenn da einiges an Code wiederholt wurde wo sich die einzelnen Funktionen nur durch Werte unterscheiden, aber sonst die gleiche Struktur haben, man also wahrscheinlich noch ein paar Zeilen einsparen kann.

Man sollte nicht versuchen mehr als eine Anweisung in eine Zeile zu quetschen. Das Semikolon wird in normalen Python-Programmen nicht verwendet. Das kommt fast nur bei Code-Golf und bei Einzeilen die per ``-c``-Option an Python übergeben werden, verwendet.

Und ein Komma sollte man dafür erst recht nicht verwenden, denn damit erstellt man ein Tupel. Das betrifft die Zeile mit ``print("prosolar PS600")``. In der Zeile wird ein Tupel mit den Rückgabewerten der beiden `print()`-Aufrufe erstellt, also ``(None, None)`` mit dem dann natürlich nichts weiter gemacht wird, weil das ja auch keinen Sinn ergibt.

In der `main()`-Funktion wird das `Serial`-Objekt an den Namen `ser` gebunden weil `serial` bereits durch das Modul belegt ist. Und in allen anderen Funktionen verdeckt das `serial`-Argument das gleichnamige Modul. Das ist unschön. Entweder man sorgt dafür das auf Modulebene kein `serial` als Name existiert, oder man benennt das `Serial`-Objekt anders, beispielsweise `connection`.

Um eine Leerzeile mit `print()` auszugeben braucht man keine leere Zeichenkette übergeben.

Kommentare sollten entsprechend dem Code eingerückt werden. Sonst entspricht die Formatierung nicht mehr der Struktur des Codes.

Der Name `ans` ist nicht gut wenn man `answer` meint. Keine kryptischen Abkürzugen verwenden.

Funktionen werden üblicherweise nach Tätigkeiten benannt, um sie von eher passiven Werten unterscheiden zu können. Hinter einem Namen wie `versionsnummer` erwartet der Leser eine Versionsnummer, keine funktion die eine Versionsnummer ermittelt.

Erwartete Länge und ID der Antwort wird in (fast) jeder Funktion geprüft, den Code kann man also in `call_command()` verschieben. Aus dem bisherigen Code wird der Zusammenhang zwischen der Antwort-ID die ja eigentlich ein Fehlercode ist, und der gesendeten Nachricht nur indirekt erkennbar – bei allen Befehlen die Werte abfragen ist das erste Byte der gesendeten und der empfangenen Nachricht gleich, wenn alles richtig lief. Bei den Befehlen die Werte setzen ist das erste Byte der Wert 0.

Man kann aus den Funktionen eine `get_variable()`-Funktion herausziehen, womit die ganzen Funktionen die Variablen abfragen kürzer werden. Da bleibt am Ende dann immer noch ein Muster übrig das sich nur durch Werte unterscheidet. Da würde ich wahrscheinlich aufgrund der Tabelle in der Dokumentation versuchen so viel wie möglich als Daten zu kodieren, so das man am Ende vielleicht sogar mit einer einzigen Funktion auskommt.

Zwischenstand (natürlich ungetestet):

Code: Alles auswählen

#!/usr/bin/env python3
import datetime
import time

import serial
import sollwerte

TEMPERATURFUEHLER = {
    1: "Kollektorfühlertemperatur",
    2: "Kollekorvorlauftemperatur",
    3: "Speicher Oben Temperatur",
    4: "Speicher Mitte Temperatur",
    5: "Speicher Unten Temperatur",
    7: "Kollekorrücklauf Temperatur",
    10: "Vorlauftemperatur Heizkreis 1",
    11: "Vorlauftemperatur Heizkreis 2",
    12: "Zirkulation Rücklauftemperatur",
    15: "Holzpelletkessel Temperatur",
    16: "Außentemperatur",
}


def send_message(connection, message):
    checksum = -(2 + len(message) + sum(message)) & 0xFF
    data = bytes([2, len(message), *message, checksum])
    connection.write(data)


def read_message(connection):
    data = connection.read(2)
    if data is None:
        # timeout
        return None
    if data[0] != 2:
        raise IOError("message-header wrong")
    message = connection.read(data[1])
    checksum = connection.read(1)
    if -sum(data + message) & 0xFF != checksum[0]:
        raise IOError("checksum wrong")
    return message


def call_command(connection, message, does_echo_first_byte=True):
    send_message(connection, message)
    answer = read_message(connection)
    #
    # TODO Die Dokumentation enthält mehr Informationen zu den Fehlercodes die
    # man hier besser melden könnte.  Vielleicht auch als eigenen Ausnahmetyp.
    #
    if answer[0] != (message[0] if does_echo_first_byte else 0x00):
        raise IOError("Error code 0x{:02X}".format(answer[0]))
    return answer


def get_reglerkennung(connection):
    answer = call_command(connection, [0x01])
    return int.from_bytes(answer[1:3], byteorder="big")


def get_versionsnummer(connection):
    answer = call_command(connection, [0x02])
    if len(answer) != 4:
        raise IOError("Wrong answer length")

    major = int.from_bytes(answer[1:3], byteorder="big")
    minor = int(answer[3])
    return "V{}.{:02}.{}".format(major // 100, major % 100, minor)


def get_variable(connection, variable_id):
    answer = call_command(connection, [0x10, 0x00, variable_id])
    if len(answer) != 7:
        raise IOError("Wrong answer length")
    if int.from_bytes(answer[1:3], byteorder="big") != variable_id:
        raise IOError("Wrong var id")
    return answer[3:8]


def get_temperaturfuehler(connection, fuehler):
    assert fuehler < 17
    data = get_variable(connection, fuehler + 4)
    value = int.from_bytes(data, signed=True, byteorder="big") / 10
    return "{}°C".format(value)


# Vom Regler berechnete Vorlaufsolltemperatur Heizkreis 1 (Heizkörper)
def get_vorlauf_hk1_soll(connection):
    data = get_variable(connection, 75)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{}°C".format(value)


# Vom Regler berechnete Vorlaufsolltemperatur Heizkreis 2 (Fußbodenheizung)
def get_vorlauf_hk2_soll(connection):
    data = get_variable(connection, 92)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{}°C".format(value)


# Vom Regler berechnete Speichersolltemperatur
def get_speicher_soll(connection):
    data = get_variable(connection, 116)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{}°C".format(value)


# wird zum Vergleich Speicher Soll/Ist Temperatur benutzt
def get_speicher_oben_ist(connection):
    data = get_variable(connection, 7)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{}°C".format(value)


# gesammt Energieertrag der Solaranlage seit IBN
def get_energieertrag_solar(connection):
    data = get_variable(connection, 36)
    value = int.from_bytes(data, byteorder="big") / 1_000_000
    return "{:.1f} MWh".format(value)


# Tagesertrag der Solaranlage
def get_tagesertrag_solar(connection):
    data = get_variable(connection, 37)
    value = int.from_bytes(data, byteorder="big") / 10
    return "{} KWh".format(value)


# Versuch Abfrage der Bitvariablen
def get_arbeits_bits(connection):
    answer = call_command(connection, [0x20])
    # hier habe gerade Keine wie ich die Info vom Regler sinnvoll auswerte
    return int.from_bytes(answer[1:3], byteorder="big")


def main():
    connection = serial.Serial("/dev/ttyUSB0", 19200, timeout=2)
    connection.rts = False
    while True:
        now = datetime.datetime.now()
        print(now.strftime("%d.%m.%Y"))
        print(now.strftime("%H:%M:%S"))
        print("prosolar PS600")
        print("Kennung:", get_reglerkennung(connection))
        print("Versionsnummer:", get_versionsnummer(connection))
        print()
        print("Istwerte")
        for fuehler, name in TEMPERATURFUEHLER.items():
            print(
                "{}: {}".format(
                    name, get_temperaturfuehler(connection, fuehler)
                )
            )
        print()
        print("Erträge")
        print("Energieertrag Solar:", get_energieertrag_solar(connection))
        print("Tagesertrag Solar:", get_tagesertrag_solar(connection))
        print()
        print("Abfrage Bitvariante:", get_arbeits_bits(connection))
        print()
        print("Sollwerte")
        print("Vorlauf Soll HK1:", get_vorlauf_hk1_soll(connection))
        print("Vorlauf Soll HK2:", get_vorlauf_hk2_soll(connection))
        print("Speicher Soll:", get_speicher_soll(connection))

        # Überprüfung ob Speicheristtemperatur über Speichersolltemperatur liegt
        # sollte das nicht zutreffen, soll hier (wenn ich wies wie) eine
        # Benachrichtigung zu auf Smartphone erfolgen
        if get_speicher_oben_ist(connection) < get_speicher_soll(connection):
            print("Temp. Warnung")
        else:
            print("Temp. Ok")

        # Versuch Daten aus anderem Modul zu verwenden
        sollwerte.sollwert_test()
        print()
        print()
        time.sleep(10)  # zu Testzwecken auf 10 Sekunden


if __name__ == "__main__":
    main()
Um die Bitvariablen auszuwerten müsstest Du Dir mal die bitweisen Operatoren wie ``&``, ``|``, und ``~`` anschauen. Und/oder vielleicht auch `enum.IntFlag`.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Alter forscht
User
Beiträge: 8
Registriert: Sonntag 8. Dezember 2019, 18:45

@blackjack: Vielen Dank für deine tolle schnelle Unterstützung.
Ich habe den modifizierten Code getestet, er läuft fehlerfrei :D
Damit habe ich wieder ein solide Grundlage, auf der ich weiter arbeiten kann und wieder ein paar neue :?: im Kopf :wink:
Antworten