Code: Alles auswählen
from __future__ import print_function
import sys
import nanomsg
import json
import threading
import time, random
from functools import partial
# Maps a switch name to the guid taken from the registration message that
# announced it. Written by register(), read by random_switching().
REGISTERED_SWITCHES = {}
def random_switching(socket):
    """Once per second, send a random on/off command for a random switch.

    Runs forever. On every tick one name is picked from
    ``REGISTERED_SWITCHES`` (nothing is sent while it is empty), a random
    boolean state is drawn, and a JSON message of the form
    ``{"switches": [{"name": ..., "state": ...}], "guid": ...}`` is printed
    and sent on *socket*.

    :param socket: object with a ``send(message)`` method used to publish
        the JSON-encoded command.
    """
    while True:
        time.sleep(1.0)
        # Take a list snapshot of the names: on Python 3 ``dict.keys()`` is a
        # view that cannot be indexed, so ``random.choice`` would raise
        # TypeError on it.
        names = list(REGISTERED_SWITCHES)
        if not names:
            continue
        name = random.choice(names)
        guid = REGISTERED_SWITCHES[name]
        state = random.random() > .5
        message = json.dumps(
            {
                "switches": [{"name": name, "state": state}],
                "guid": guid,
            }
        )
        print(message)
        socket.send(message)
def register(message):
    """Record the switch announced by *message* in ``REGISTERED_SWITCHES``.

    :param message: mapping with the keys ``"guid"`` and ``"name"``; the
        name becomes the key and the guid the stored value. An existing
        entry with the same name is overwritten.
    """
    # Read "guid" before "name", matching the lookup order callers may see
    # in a KeyError when a key is missing.
    client_guid, switch_name = message["guid"], message["name"]
    REGISTERED_SWITCHES[switch_name] = client_guid
def main():
    """Run the test server: accept registrations and toggle switches randomly.

    Binds a nanomsg BUS socket to ``ipc:///tmp/test``, starts
    ``random_switching`` on that socket in a daemon thread, and then loops
    forever decoding each received message as JSON, printing it and passing
    it to ``register``.
    """
    bus = nanomsg.Socket(nanomsg.BUS)
    bus.bind("ipc:///tmp/test")

    # Daemon thread, so it does not keep the process alive on its own.
    worker = threading.Thread(target=random_switching, args=(bus,))
    worker.daemon = True
    worker.start()

    while True:
        registration = json.loads(bus.recv())
        print(registration)
        register(registration)
# Start the server only when this file is executed as a script.
if __name__ == "__main__":
    main()
Und hier der neue Client, den man auch mehrfach starten kann mit unterschiedlichen Konfigurationen:
Code: Alles auswählen
from __future__ import print_function
import sys
import json
import argparse
import time
import uuid
import nanomsg
def error(message, code=1):
    """Write *message* and a trailing newline to stderr, then exit.

    :param message: text to write to ``sys.stderr``.
    :param code: exit status handed to ``SystemExit`` (default ``1``).
    :raises SystemExit: always.
    """
    sys.stderr.writelines((message, "\n"))
    raise SystemExit(code)
class Switch(object):
    """A named on/off switch that forwards its state to a callback.

    :param config: mapping with the keys ``"name"`` and ``"invert"``.
    :param on_off_function: callable invoked with the (possibly inverted)
        state on every ``set`` call.

    Construction immediately calls ``set(False)``, so the callback fires
    once before the constructor returns.
    """

    def __init__(self, config, on_off_function):
        self._name, self._inverted = config["name"], config["invert"]
        self._on = False
        self._on_off_function = on_off_function
        self.set(False)

    def set(self, state):
        """Remember *state* and call the callback with ``state ^ invert``."""
        self._on = state
        # XOR with the invert flag flips the value handed to the callback
        # when the switch is configured as inverted.
        self._on_off_function(state ^ self._inverted)
class SwitchClientBase(object):
    """Base class for command-line switch clients.

    ``run`` parses the command line, loads a JSON configuration, builds one
    switch per entry in ``config["switches"]`` and then either runs a local
    test mode or connects to the server at ``config["uri"]``, registers
    every switch and applies the commands it receives.

    Subclasses must implement ``on_off_function`` and may override
    ``add_specific_arguments`` and ``basic_config``.
    """

    def __init__(self):
        self._switches = {}
        self._socket = None
        # Sent with every registration; incoming messages are only applied
        # when they carry the same guid.
        self._guid = str(uuid.uuid4())

    def add_specific_arguments(self, parser):
        """
        Overload this to add specific arguments for your switch implementation
        """
        pass

    def run(self):
        """Parse arguments, set up the switches and enter the chosen mode.

        Exits with status 0 after printing the example configuration when
        ``--print-basic-config`` is given, and exits via ``error`` when no
        ``--config`` file is supplied. With ``--test-mode`` the matching
        ``test_<mode>`` method is called; otherwise the method connects to
        the server and loops forever dispatching received messages.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument("--config", help="configuration file")
        parser.add_argument("--print-basic-config", action="store_true")
        parser.add_argument("--test-mode", choices=["cyclic", "on", "off"])
        self.add_specific_arguments(parser)
        opts = parser.parse_args()
        if opts.print_basic_config:
            print("A basic configuration. All keys with `_description` are just explanations and can be left out")
            print(json.dumps(self.basic_config(), indent=4))
            sys.exit(0)
        if not opts.config:
            error("You need to give a config-file. Get an example using the `--print-basic-config` argument")
        with open(opts.config) as inf:
            config = json.load(inf)
        for switch in config["switches"]:
            self._switches[switch["name"]] = self._setup_switch(switch)
        if opts.test_mode is not None:
            # Every value in ``choices`` above has a matching test_* method.
            getattr(self, "test_%s" % opts.test_mode)(opts)
        else:
            self._socket = nanomsg.Socket(nanomsg.BUS)
            self._socket.connect(config["uri"])
            self._register_switches()
            while True:
                message = json.loads(self._socket.recv())
                # Ignore messages that carry a different guid.
                if message["guid"] == self._guid:
                    self._dispatch(message)

    def _dispatch(self, message):
        """Apply every ``{"name", "state"}`` entry under ``message["switches"]``."""
        if "switches" in message:
            for switch in message["switches"]:
                self._switches[switch["name"]].set(switch["state"])

    def _register_switches(self):
        """Send one ``{"guid", "name"}`` JSON message per configured switch."""
        for name in self._switches:
            self._socket.send(
                json.dumps(
                    {"guid": self._guid, "name": name}
                )
            )

    def _set_all(self, state):
        """Set every configured switch to *state*."""
        for switch in self._switches.values():
            switch.set(state)

    def test_cyclic(self, opts):
        """Test mode ``cyclic``: toggle all switches once per second, forever."""
        state = True
        while True:
            time.sleep(1.0)
            self._set_all(state)
            state = not state

    def test_on(self, opts):
        """Test mode ``on``: switch every configured switch on, then return."""
        self._set_all(True)

    def test_off(self, opts):
        """Test mode ``off``: switch every configured switch off, then return."""
        self._set_all(False)

    def basic_config(self):
        """Return an example configuration dict (server URI and two switches)."""
        return {
            "uri": "server URI such as tcp://localhost:12345",
            "switches": [
                {
                    "name": "unique switch name, e.g garage",
                    "invert": False,
                    "invert_description": "If True, the switch state is inverted, meaning a ON switch will deliver a LOW-level",
                },
                {
                    "name": "unique switch name, e.g terrarium",
                    "invert": True,
                    "invert_description": "If True, the switch state is inverted, meaning a ON switch will deliver a LOW-level",
                },
            ],
        }

    def _setup_switch(self, config):
        """Build a ``Switch`` for one config entry using ``on_off_function``."""
        on_off_function = self.on_off_function(config)
        return Switch(config, on_off_function)

    def on_off_function(self, config):
        """Return a callable taking the state for one switch.

        Must be implemented by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
class GPIOClient(SwitchClientBase):
    """Switch client whose switches are configured with a ``gpio_pin``.

    The on/off callback only prints the requested state; no GPIO is
    actually driven.
    """

    def basic_config(self):
        """Return the base example config with GPIO pin keys added per switch."""
        example = super(GPIOClient, self).basic_config()
        pin_help = "Number of the GPIO-pin to toggle for that switch, in Broadcom-Numbering"
        # augment with our specific info: the example pins are numbered by
        # list position
        for index, entry in enumerate(example["switches"]):
            entry["gpio_pin"] = index
            entry["gpio_pin_description"] = pin_help
        return example

    def on_off_function(self, config):
        """Return a callback that prints ``GPIO <name> <pin> <state>``.

        :param config: switch entry with the keys ``"gpio_pin"`` and
            ``"name"``.
        """
        # in real, we would set up the GPIO configured here
        gpio_pin = config["gpio_pin"]
        switch_name = config["name"]

        def report(state):
            print("GPIO", switch_name, gpio_pin, state)

        return report
def main():
    """Create a ``GPIOClient`` and hand control to its ``run`` method."""
    GPIOClient().run()
# Start the client only when this file is executed as a script.
if __name__ == "__main__":
    main()