Leider wird keine Verbindung aufgebaut, ich glaube ich muss auch json als Bib importieren oder?
Muss ich dann Clientseitig ebenfalls mit json arbeiten, also ich weiß jetzt natürlich nicht, ob das mit JS kompatibel ist? Vielleicht könntest du noch mal drüber schauen, Danke!
Sirius3 hat geschrieben: ↑Sonntag 26. Juni 2022, 22:43
Typischerweise schickt man über Websockets JSON-kodierte Wörterbücher. Da kann man auf strukturierte Weise alle möglichen Daten kodieren, z.B: {"relais1": "off", "temp1": "on"}
Entsprechend einfach ist es, den Code um weitere Schalter zu erweitern.Code: Alles auswählen
class Echo(WebSocket): def __init__(self, client, server, sock, address): super().__init__(server, sock, address) self.modbus = client self.functions = { "relais1": partial(self.simple_switch, COILBASE + 16), "relais2": partial(self.simple_switch, COILBASE + 1), "relais3": partial(self.simple_switch, COILBASE + 2), "temperatur1": partial(self.read_temperature, 0), "temperatur2": partial(self.read_temperature, 3), "heizen1": partial(self.heating, 525), "heizen2": partial(self.heating, 526), "heatread": partial(self.simple_read, 524), "systemp": self.systemp } def simple_switch(self, address, value): """ value can be one of "on", "off" or "get" """ if value in ["on", "off"]: rq = self.modbus.write_coil(address, value == "on", unit=UNIT) time.sleep(0.1) elif value != 'get': raise ValueError(value) # Relaisregister lesen rp = self.modbus.read_coils(address, unit=UNIT) return "on" if rp.bits[0] else "off" def simple_read(self, address, value): if value != 'get': raise ValueError(value) # Relaisregister lesen rp = self.modbus.read_coils(address, unit=UNIT) return "on" if rp.bits[0] else "off" def read_temperature(self, address, value): if value != "get": raise ValueError(value) response = self.modbus.read_holding_registers(0x00, 4, unit=1) return response.registers[address] def heating(self, address, value): # Heizen an | Modul an dem der Heizdraht geschaltet wird if value in ["on", "off"]: rq = self.modbus.write_coil(524, value == "on", unit=UNIT) time.sleep(0.01) rq = self.modbus.write_coil(address, value == "on", unit=UNIT) time.sleep(0.1) elif value != 'get': raise ValueError(value) rp = self.modbus.read_coils(address, unit=UNIT) return "on" if rp.bits[0] else "off" def systemp(self, value): result = subprocess.run(["vcgencmd", "measure_temp"], stdout=subprocess.PIPE) temperature = result.stdout.partition("temp=")[-1].partition("'C\n")[0] return temperature def handleMessage(self): new_state = {} commands = json.loads(self.data) for key, value in commands.items(): function = self.functions[key] new_state[key] = function(value) self.sendMessage(json.dumps(new_stat))