ich habe mir ein Skript geschrieben mit dem ich mithilfe von den Edimax SP-210W Zwischensteckern bei meinem Trockner und Waschmaschine den aktuellen Verbrauch abfragen kann um zu erkennen, wann die Waschmaschine/Trockner fertig ist. Das Skript schickt mir dann per PushBullet eine Nachricht aufs Handy.
Folgendes Problem besteht:
Wenn ich das Skript per cronjob mit @reboot ausführe wird es zwar gestartet, stürzt aber aus für mich unerdenklichen Gründen irgendwann ab.
Wie kann ich das Skript robuster gestalten, ohne dass es abstürzt?
Code: Alles auswählen
# -*- encoding: utf-8 -*-
from ediplug.smartplug import SmartPlug
import subprocess
from pushbullet import Pushbullet
import time
import _thread
SMARTPLUGWM = "192.168.0.XX"
SMARTPLUGTR = "192.168.0.XX"
W = SmartPlug(SMARTPLUGWM, ('admin', '1234')) # Waschmaschine
T = SmartPlug(SMARTPLUGTR, ('admin', '1234')) # Trockner
PB = Pushbullet("o.adslkjsafnklfsfDASDASFSDF")
THRESHOLDWM = 6.20 # Schwellwert der Waschmaschine in Watt
THRESHOLDTRON = 150.00 # Schwellwert des Trockners in Watt
THRESHOLDTROFF = 5.00 # Schwellwert des Trockners in Watt
WAITWM = 60 # Zeit in Sekunden bis zur nächsten Abfrage der aktuellen Leistung der Waschmaschine
WAITTR = 60 # Zeit in Sekunden bis zur nächsten Abfrage der aktuellen Leistung des Trockners
print("SmartPlugs werden überprüft")
def TrocknerPing():
while True:
if subprocess.check_call(['ping', '-c', '1', SMARTPLUGTR], stderr=subprocess.STDOUT) == 0:
print("SmartPlug TR erreichbar")
break
else:
print("SmartPlug TR nicht erreichbar")
time.sleep(10)
def WaschmaschinePing():
while True:
if subprocess.check_call(['ping', '-c', '1', SMARTPLUGWM], stderr=subprocess.STDOUT) == 0:
print("SmartPlug WM erreichbar")
break
else:
print("SmartPlug WM nicht erreichbar")
time.sleep(10)
def Trockner():
tron = 0
troff = 0
print("Aktueller Verbrauch Trockner:", T.power, "Watt")
while True:
TrocknerPing()
if float(T.power) <= THRESHOLDTRON and tron == 0:
print("Trockner aus")
tron = 0
time.sleep(30)
else:
tron = 1
print("Trockner läuft")
PB.push_note('Trockner läuft', 'Der Trockner läuft')
time.sleep(10)
while True:
if troff >= 3:
print("Trockner ist fertig!")
PB.push_note('Trockner fertig!', 'Der Trockner ist fertig!')
troff = 0
time.sleep(180)
break
else:
TrocknerPing()
if float(T.power) <= THRESHOLDTROFF:
print("Trockner pausiert!")
troff += 1
tron = 0
time.sleep(WAITTR)
else:
print("Trockner läuft immernoch")
troff = 0
time.sleep(180)
def Waschmaschine():
wmon = 0
wmoff = 0
print("Aktueller Verbrauch Waschmaschine:", W.power, "Watt")
while True:
WaschmaschinePing()
if float(W.power) <= THRESHOLDWM and wmon == 0:
print("Waschmaschine aus")
wmon = 0
time.sleep(30)
else:
wmon = 1
print("Waschmaschine läuft")
PB.push_note('Waschmaschine läuft', 'Die Waschmaschine läuft')
time.sleep(10)
while True:
if wmoff >= 3:
print("Waschprogramm ist fertig!")
PB.push_note('Waschmaschine fertig!', 'Die Waschmaschine ist fertig!')
wmoff = 0
break
else:
WaschmaschinePing()
if float(W.power) <= THRESHOLDWM:
print("Waschprogramm pausiert!")
wmoff += 1
wmon = 0
time.sleep(WAITWM)
else:
print("Waschmaschine läuft immernoch")
wmoff = 0
time.sleep(180)
# Hauptprogramm
_thread.start_new_thread(Trockner,())
Waschmaschine()
Folgendes Sript wird als Modul importiert um die SmartPlugs abzufragen:
Code: Alles auswählen
import requests as req
import optparse as par
import logging as log
from xml.dom.minidom import getDOMImplementation
from xml.dom.minidom import parseString
__author__ = 'Stefan Wendler, sw@kaltpost.de'
class SmartPlug(object):
"""
Simple class to access a "EDIMAX Smart Plug Switch SP1101W/SP2101W"
Usage example when used as library:
p = SmartPlug("172.16.100.75", ('admin', '1234'))
# get device info
print(p.info)
# change state of plug
p.state = "OFF"
p.state = "ON"
# query and print current state of plug
print(p.state)
# get power consumption (only SP2101W)
print(p.power)
# get current consumption (only SP2101W)
print(p.current)
# read and print complete week schedule from plug
print(p.schedule.__str__())
# write schedule for on day to plug (Saturday, 11:15 - 11:45)
p.schedule = {'state': u'ON', 'sched': [[[11, 15], [11, 45]]], 'day': 6}
# write schedule for the whole week
p.schedule = [
{'state': u'ON', 'sched': [[[0, 3], [0, 4]]], 'day': 0},
{'state': u'ON', 'sched': [[[0, 10], [0, 20]], [[10, 16], [11, 55]],
[[15, 19], [15, 32]], [[21, 0], [23, 8]], [[23, 17], [23, 59]]], 'day': 1},
{'state': u'OFF', 'sched': [[[19, 59], [21, 1]]], 'day': 2},
{'state': u'OFF', 'sched': [[[20, 59], [21, 12]]], 'day': 3},
{'state': u'OFF', 'sched': [], 'day': 4},
{'state': u'OFF', 'sched': [[[0, 0], [0, 30]], [[11, 14], [14, 31]]], 'day': 5},
{'state': u'ON', 'sched': [[[1, 42], [2, 41]]], 'day': 6}]
Usage example when used as command line utility:
Get device info:
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -i
turn plug on:
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -s ON
turn plug off:
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -s OFF
get plug state:
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -g
get power consumption (only SP2101W)
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -w
get current consumption (only SP2101W)
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -a
get schedule of the whole week:
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -G
get schedule of the whole week as python array:
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -P
set schedule for one day:
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -S
"{'state': u'ON', 'sched': [[[11, 0], [11, 45]]], 'day': 6}"
set schedule for the whole week:
python smartplug.py -H 172.16.100.75 -l admin -p 1234 -S "[
{'state': u'ON', 'sched': [[[1, 0], [1, 1]]], 'day': 0},
{'state': u'ON', 'sched': [[[2, 0], [2, 2]]], 'day': 1},
{'state': u'ON', 'sched': [[[3, 0], [3, 3]]], 'day': 2},
{'state': u'ON', 'sched': [[[4, 0], [4, 4]]], 'day': 3},
{'state': u'ON', 'sched': [[[5, 0], [5, 5]]], 'day': 4},
{'state': u'ON', 'sched': [[[6, 0], [6, 6]]], 'day': 5},
{'state': u'ON', 'sched': [[[7, 0], [7, 7]]], 'day': 6},
]"
"""
def __init__(self, host, auth):
"""
Create a new SmartPlug instance identified by the given URL.
:rtype: object
:param host: The IP/hostname of the SmartPlug. E.g. '172.16.100.75'
:param auth: User and password to authenticate with the plug. E.g. ('admin', '1234')
"""
object.__init__(self)
self.url = "http://%s:10000/smartplug.cgi" % host
self.auth = auth
self.domi = getDOMImplementation()
self.log = log.getLogger("SmartPlug")
def _xml_cmd_setget_state(self, cmdId, cmdStr):
"""
Create XML representation of a state command.
:type self: object
:type cmdId: str
:type cmdStr: str
:rtype: str
:param cmdId: Use 'get' to request plug state, use 'setup' change plug state.
:param cmdStr: Empty string for 'get', 'ON' or 'OFF' for 'setup'
:return: XML representation of command
"""
assert (cmdId == "setup" and cmdStr in ["ON", "OFF"]) or (cmdId == "get" and cmdStr == "")
doc = self.domi.createDocument(None, "SMARTPLUG", None)
doc.documentElement.setAttribute("id", "edimax")
cmd = doc.createElement("CMD")
cmd.setAttribute("id", cmdId)
state = doc.createElement("Device.System.Power.State")
cmd.appendChild(state)
state.appendChild(doc.createTextNode(cmdStr))
doc.documentElement.appendChild(cmd)
xml = doc.toxml()
self.log.debug("Request: %s" % xml)
return xml
def _xml_cmd_get_pc(self, what):
"""
Get power or current consumption (only SP2101W).
:type self: object
:type what: str
:rtype: str
:param what: What to retrieve: "NowPower" or "NowCurrent
:return: XML representation of command
"""
assert what in ["NowPower", "NowCurrent"]
doc = self.domi.createDocument(None, "SMARTPLUG", None)
doc.documentElement.setAttribute("id", "edimax")
cmd = doc.createElement("CMD")
cmd.setAttribute("id", "get")
pwr = doc.createElement("NOW_POWER")
cmd.appendChild(pwr)
state = doc.createElement("Device.System.Power.%s" % what)
pwr.appendChild(state)
doc.documentElement.appendChild(cmd)
xml = doc.toxml()
self.log.debug("Request: %s" % xml)
return xml
def _xml_cmd_get_info(self):
"""
Create XML representation of a command to query some information
:type self: object
:rtype: str
:return: XML representation of command
"""
doc = self.domi.createDocument(None, "SMARTPLUG", None)
doc.documentElement.setAttribute("id", "edimax")
cmd = doc.createElement("CMD")
cmd.setAttribute("id", "get")
si = doc.createElement("SYSTEM_INFO")
cmd.appendChild(si)
doc.documentElement.appendChild(cmd)
xml = doc.toxml()
self.log.debug("Request: %s" % xml)
return xml
def _xml_cmd_get_sched(self):
"""
Create XML representation of a command to query schedule of whole week from plug.
:type self: object
:rtype: str
:return: XML representation of command
"""
doc = self.domi.createDocument(None, "SMARTPLUG", None)
doc.documentElement.setAttribute("id", "edimax")
cmd = doc.createElement("CMD")
cmd.setAttribute("id", "get")
sched = doc.createElement("SCHEDULE")
cmd.appendChild(sched)
doc.createElement("Device.System.Power.State")
doc.documentElement.appendChild(cmd)
xml = doc.toxml()
self.log.debug("Request: %s" % xml)
return xml
def _xml_cmd_set_sched(self, sched_days):
"""
Create XML representation of a command to set scheduling for one day or whole week.
:type self: object
:type sched_days: list
:rtype: str
:param sched_day: Single day or whole week
:return: XML representation of command
"""
doc = self.domi.createDocument(None, "SMARTPLUG", None)
doc.documentElement.setAttribute("id", "edimax")
cmd = doc.createElement("CMD")
cmd.setAttribute("id", "setup")
sched = doc.createElement("SCHEDULE")
cmd.appendChild(sched)
if isinstance(sched_days, list):
# more then one day
for one_sched_day in sched_days:
dev_sched = doc.createElement("Device.System.Power.Schedule.%d" % one_sched_day["day"])
dev_sched.appendChild(doc.createTextNode(self._render_schedule(one_sched_day["sched"])))
dev_sched.attributes["value"] = one_sched_day["state"]
sched.appendChild(dev_sched)
else:
# one day
dev_sched = doc.createElement("Device.System.Power.Schedule.%d" % sched_days["day"])
dev_sched.appendChild(doc.createTextNode(self._render_schedule(sched_days["sched"])))
dev_sched.attributes["value"] = sched_days["state"]
sched.appendChild(dev_sched)
doc.documentElement.appendChild(cmd)
xml = doc.toxml()
self.log.debug("Request: %s" % xml)
return xml
def _post_xml(self, xml):
"""
Post XML command as multipart file to SmartPlug, parse XML response.
:type self: object
:type xml: str
:rtype: str
:param xml: XML representation of command (as generated by _xml_cmd)
:return: 'OK' on success, 'FAILED' otherwise
"""
files = {'file': xml}
res = req.post(self.url, auth=self.auth, files=files)
self.log.debug("Status code: %d" % res.status_code)
self.log.debug("Response: %s" % res.text)
if res.status_code == req.codes.ok:
dom = parseString(res.text)
try:
val = dom.getElementsByTagName("CMD")[0].firstChild.nodeValue
if val is None:
val = dom.getElementsByTagName("CMD")[0].getElementsByTagName("Device.System.Power.State")[0].\
firstChild.nodeValue
return val
except Exception as e:
print(e.__str__())
return None
def _post_xml_dom(self, xml):
"""
Post XML command as multipart file to SmartPlug, return response as raw dom.
:type self: object
:type xml: str
:rtype: object
:param xml: XML representation of command (as generated by _xml_cmd)
:return: dom representation of XML response
"""
files = {'file': xml}
res = req.post(self.url, auth=self.auth, files=files)
self.log.debug("Status code: %d" % res.status_code)
self.log.debug("Response: %s" % res.text)
if res.status_code == req.codes.ok:
return parseString(res.text)
return None
@property
def info(self):
"""
Get device info (vendor, model, version, mac and system name (if available)).
:type self: object
:rtype: dictonary
:return: dictonary with the following keys: vendor, model, version, mac, name
"""
dom = self._post_xml_dom(self._xml_cmd_get_info())
vendor = dom.getElementsByTagName("Run.Cus")[0].firstChild.nodeValue
model = dom.getElementsByTagName("Run.Model")[0].firstChild.nodeValue
version = dom.getElementsByTagName("Run.FW.Version")[0].firstChild.nodeValue
mac = dom.getElementsByTagName("Run.LAN.Client.MAC.Address")[0].firstChild.nodeValue
inf = {"vendor":vendor, "model":model, "version":version, "mac":mac}
# not all plugs/fw versions seem to return the system name ...
try:
inf["name"] = dom.getElementsByTagName("Device.System.Name")[0].firstChild.nodeValue
except IndexError:
pass
return inf
@property
def state(self):
"""
Get the current state of the SmartPlug.
:type self: object
:rtype: str
:return: 'ON' or 'OFF'
"""
res = self._post_xml(self._xml_cmd_setget_state("get", ""))
if res != "ON" and res != "OFF":
raise Exception("Failed to communicate with SmartPlug")
return res
@state.setter
def state(self, value):
"""
Set the state of the SmartPlug
:type self: object
:type value: str
:param value: 'ON', 'on', 'OFF' or 'off'
"""
if value == "ON" or value == "on":
res = self._post_xml(self._xml_cmd_setget_state("setup", "ON"))
else:
res = self._post_xml(self._xml_cmd_setget_state("setup", "OFF"))
if res != "OK":
raise Exception("Failed to communicate with SmartPlug")
@property
def power(self):
"""
Get the power consumption of the SmartPlug (only SP2101W).
:type self: object
:rtype: tuple (str, float)
:return: power consumption in W
"""
dom = self._post_xml_dom(self._xml_cmd_get_pc("NowPower"))
try:
power = dom.getElementsByTagName("Device.System.Power.NowPower")[0].firstChild.nodeValue
except:
raise Exception("Failed to communicate with SmartPlug")
return power
@property
def current(self):
"""
Get the current consumption of the SmartPlug (only SP2101W).
:type self: object
:rtype: tuple (str, float)
:return: current consumption in A
"""
dom = self._post_xml_dom(self._xml_cmd_get_pc("NowCurrent"))
try:
current = dom.getElementsByTagName("Device.System.Power.NowCurrent")[0].firstChild.nodeValue
except:
raise Exception("Failed to communicate with SmartPlug")
return current
def _parse_schedule(self, sched):
"""
Parse the plugs internal scheduling format string to python array
:type self: object
:type sched: str
:rtype: list
:param sched: scheduling string (of one day) as returned by plug
:return: Python array with scheduling: [[[start_hh:start_mm],[end_hh:end_mm]], ... ]
"""
sched_unpacked = [0] * 60 * 24
hours = []
idx_sched = 0
# first, unpack the packed schedule from the plug
for packed in sched:
int_packed = int(packed, 16)
sched_unpacked[idx_sched+0] = (int_packed >> 3) & 1
sched_unpacked[idx_sched+1] = (int_packed >> 2) & 1
sched_unpacked[idx_sched+2] = (int_packed >> 1) & 1
sched_unpacked[idx_sched+3] = (int_packed >> 0) & 1
idx_sched += 4
idx_hours = 0
hour = 0
min = 0
found_range = False
# second build time array from unpacked schedule
for m in sched_unpacked:
if m == 1 and not found_range:
found_range = True
hours.append([[hour, min], [23, 59]])
elif m == 0 and found_range:
found_range = False
hours[idx_hours][1][0] = hour
hours[idx_hours][1][1] = min
idx_hours += 1
min += 1
if min > 59:
min = 0
hour += 1
return hours
def _render_schedule(self, hours):
"""
Render Python scheduling array back to plugs internal format
:type self: object
:type hours: list
:rtype: str
:param hours: Python array with scheduling hours: [[[start_hh:start_mm],[end_hh:end_mm]], ... ]
:return: scheduling string (of one day) as needed by plug
"""
sched = [0] * 60 * 24
sched_str = ''
# first, set every minute we found a schedule to 1 in the sched array
for times in hours:
idx_start = times[0][0] * 60 + times[0][1]
idx_end = times[1][0] * 60 + times[1][1]
if idx_end < idx_start:
idx_end = 60 * 24
for i in range(idx_start, idx_end):
sched[i] = 1
# second, pack the minute array from above into the plug format and make a string out of it
for i in range(0, 60 * 24, 4):
packed = (sched[i] << 3) + (sched[i+1] << 2) + (sched[i+2] << 1) + (sched[i+3] << 0)
sched_str += "%X" % packed
return sched_str
@property
def schedule(self):
"""
Get scheduling for all days of week from plug as python list.
Note: it looks like the plug only is able to return a whole week.
:type self: object
:rtype: list
:return: List with scheduling for each day of week:
[
{'state': u'ON|OFF', 'sched': [[[hh, mm], [hh, mm]], ...], 'day': 0..6},
...
]
"""
sched = []
dom = self._post_xml_dom(self._xml_cmd_get_sched())
if dom is None:
return sched
try:
dom_sched = dom.getElementsByTagName("CMD")[0].getElementsByTagName("SCHEDULE")[0]
for i in range(0, 7):
sched.append(
{"day": i,
"state": dom_sched.getElementsByTagName("Device.System.Power.Schedule.%d" % i)[0].attributes[
"value"].
firstChild.nodeValue,
"sched": self._parse_schedule(
dom_sched.getElementsByTagName("Device.System.Power.Schedule.%d" % i)[0].
firstChild.nodeValue)})
except Exception as e:
print(e.__str__())
return sched
@schedule.setter
def schedule(self, sched):
"""
Set scheduling for ony day of week or for whole week on the plug.
Note: it seams not to be possible to schedule anything else then one day or a whole week.
:type self: object
:type sched: list
:rtype: str
:param sched: Array with scheduling hours for ons day:
{'day': 0..6, 'state': 'ON' | 'OFF', [[start_hh:start_mm],[end_hh:end_mm]], ... ]}
Or whole week:
[{'day': 0..6, 'state': 'ON' | 'OFF', [[start_hh:start_mm],[end_hh:end_mm]], ... ]}, ...]
:return: 'OK' (or exception on error)
"""
res = self._post_xml(self._xml_cmd_set_sched(sched))
if res != "OK":
raise Exception("Failed to communicate with SmartPlug")
return res
if __name__ == "__main__":
usage = "%prog [options]"
parser = par.OptionParser(usage)
parser.add_option("-v", "--verbose", action="store_true", help="Print debug information")
parser.add_option("-H", "--host", default="172.16.100.75", help="Base URL of the SmartPlug")
parser.add_option("-l", "--login", default="admin", help="Login user to authenticate with SmartPlug")
parser.add_option("-p", "--password", default="1234", help="Password to authenticate with SmartPlug")
parser.add_option("-i", "--info", action="store_true", help="Get plug information")
parser.add_option("-g", "--get", action="store_true", help="Get state of plug")
parser.add_option("-s", "--set", help="Set state of plug: ON or OFF")
parser.add_option("-w", "--power", action="store_true", help="Get plug power consumption (only SP2101W)")
parser.add_option("-a", "--current", action="store_true", help="Get plug current consumption (only SP2101W)")
parser.add_option("-G", "--getsched", action="store_true", help="Get schedule from Plug")
parser.add_option("-P", "--getschedpy", action="store_true", help="Get schedule from Plug as Python list")
parser.add_option("-S", "--setsched", help="Set schedule of Plug")
(options, args) = parser.parse_args()
# this turns on debugging
level = log.ERROR
if options.verbose:
level = log.DEBUG
log.basicConfig(level=level, format='%(asctime)s - %(levelname) 8s [%(module) 15s] - %(message)s')
p = SmartPlug(options.host, (options.login, options.password))
if options.info:
print("Plug info:")
for i in sorted(p.info.items()):
print("- %s: %s" % i)
if options.get:
print(p.state)
elif options.set:
p.state = options.set
if options.power:
print("%s W" % p.power)
if options.current:
print("%s A" % p.current)
elif options.getsched:
days = {0: "Sunday", 1: "Monday", 2: "Tuesday", 3: "Wednesday",
4: "Thursday", 5: "Friday", 6: "Saturday"}
for day in p.schedule:
if len(day["sched"]) > 0:
print("Schedules for: %s (%s)" % (days[day["day"]], day["state"]))
for sched in day["sched"]:
print(" * %02d:%02d - %02d:%02d" % (sched[0][0], sched[0][1], sched[1][0], sched[1][1]))
elif options.getschedpy:
print(p.schedule.__str__())
elif options.setsched:
try:
sched = eval(options.setsched)
p.schedule = sched
except Exception as e:
print("Wrong input format: %s" % e.__str__())
exit(-1)