smartHome Zeitschaltuhr 2.0

Stellt hier eure Projekte vor.
Internetseiten, Skripte, und alles andere bzgl. Python.
__deets__
User
Beiträge: 14522
Registriert: Mittwoch 14. Oktober 2015, 14:29

So, jetzt ein Server:

Code: Alles auswählen

from __future__ import print_function

import sys
import nanomsg
import json
import threading
import time, random
from functools import partial

REGISTERED_SWITCHES = {}


def random_switching(socket):
    while True:
        time.sleep(1.0)
        if REGISTERED_SWITCHES:
            name = random.choice(REGISTERED_SWITCHES.keys())
            guid = REGISTERED_SWITCHES[name]
            state = random.random() > .5
            message = json.dumps(
                {
                    "switches" : [ { "name" : name, "state" : state }],
                    "guid": guid,
                }
                )
            print(message)
            socket.send(message)


def register(message):
    guid = message["guid"]
    name = message["name"]
    REGISTERED_SWITCHES[name] = guid


def main():
    socket = nanomsg.Socket(nanomsg.BUS)
    socket.bind("ipc:///tmp/test")

    t = threading.Thread(target=partial(random_switching, socket))
    t.daemon = True
    t.start()


    while True:
        message = json.loads(socket.recv())
        print(message)
        register(message)



if __name__ == '__main__':
    main()
Ich benutze nanomsg, weil ich das gut kenne - das sollte aber auch mit Websockets klappen, das nanomsg.BUS-Protokoll hat naemlich dieselbe Eigenschaft die du beschreibst: es sind immer broadcast-Nachrichten an alle angeschlossenen Clients. Darum mache ich die durch die GUID eindeutig - obwohl das strenggenommen gar nicht notwendig waere, denn die namen muessen das ja auch sein.

Und hier der neue Client, den man auch mehrfach starten kann mit unterschiedlichen Konfigurationen:

Code: Alles auswählen

from __future__ import print_function

import sys
import json
import argparse
import time
import uuid
import nanomsg


def error(message, code=1):
    sys.stderr.write(message)
    sys.stderr.write("\n")
    sys.exit(code)


class Switch(object):

    def __init__(self, config, on_off_function):
        self._name = config["name"]
        self._inverted = config["invert"]
        self._on = False
        self._on_off_function = on_off_function
        self.set(False)


    def set(self, state):
        self._on = state
        real_state = state ^ self._inverted
        self._on_off_function(real_state)



class SwitchClientBase(object):

    def __init__(self):
        self._switches = {}
        self._socket = None
        self._guid = str(uuid.uuid4())


    def add_specific_arguments(self, parser):
        """
        Overload this to add specific arguments for your switch implementation
        """
        pass


    def run(self):
        parser = argparse.ArgumentParser()
        parser.add_argument("--config", help="configuration file")
        parser.add_argument("--print-basic-config", action="store_true")
        parser.add_argument("--test-mode", choices=["cyclic", "on", "off"])
        self.add_specific_arguments(parser)

        opts = parser.parse_args()
        if opts.print_basic_config:
            print("A basic configuration. All keys with `_description` are just explanations and can be left out")
            print(json.dumps(self.basic_config(), indent=4))
            sys.exit(0)

        if not opts.config:
            error("You need to give a config-file. Get an example using the `--print-basic-config` argument")

        with open(opts.config) as inf:
            config = json.load(inf)


        for switch in config["switches"]:
            self._switches[switch["name"]] = self._setup_switch(switch)


        if opts.test_mode is not None:
            getattr(self, "test_%s" % opts.test_mode)(opts)
        else:
            self._socket = nanomsg.Socket(nanomsg.BUS)
            self._socket.connect(config["uri"])
            self._register_switches()
            while True:
                message = json.loads(self._socket.recv())
                if message["guid"] == self._guid:
                    self._dispatch(message)


    def _dispatch(self, message):
        if "switches" in message:
            for switch in message["switches"]:
                self._switches[switch["name"]].set(switch["state"])


    def _register_switches(self):
        for name in self._switches:
            self._socket.send(
                json.dumps(
                    { "guid" : self._guid, "name" : name}
                )
            )


    def test_cyclic(self, opts):
        state = True
        while True:
            time.sleep(1.0)
            for switch in self._switches.values():
                switch.set(state)
            state = not state


    def basic_config(self):
        return {
            "uri" : "server URI such as tcp://localhost:12345",
            "switches" : [
                {
                    "name": "unique switch name, e.g garage",
                    "invert" : False,
                    "invert_description" : "If True, the switch state is inverted, meaning a ON switch will deliver a LOW-level",
                },
                {
                    "name": "unique switch name, e.g terrarium",
                    "invert" : True,
                    "invert_description" : "If True, the switch state is inverted, meaning a ON switch will deliver a LOW-level",
                },
            ],
        }


    def _setup_switch(self, config):
        on_off_function = self.on_off_function(config)
        return Switch(config, on_off_function)



    def on_off_function(self, config):
        raise NotImplementedError



class GPIOClient(SwitchClientBase):


    def basic_config(self):
        config = super(GPIOClient, self).basic_config()
        # augment with our specific info
        for pin, switch in enumerate(config["switches"]):
            switch["gpio_pin"] = pin
            switch["gpio_pin_description"] = "Number of the GPIO-pin to toggle for that switch, in Broadcom-Numbering"
        return config


    def on_off_function(self, config):
        # in real, we would set up the GPIO configured here
        pin = config["gpio_pin"]
        name = config["name"]
        def on_off_function(state):
            print("GPIO", name, pin, state)

        return on_off_function



def main():
    client = GPIOClient()
    client.run()


if __name__ == '__main__':
    main()
der_Mausbiber
User
Beiträge: 72
Registriert: Donnerstag 2. Oktober 2014, 09:51

Vielen Dank für die ausführliche Antwort.
Ich muss den Stoff jetzt erstmal in Ruhe durchgehen ... :)
Antworten