Code: Alles auswählen
UNIT2KEYWORD = {
'Minuten': 'minutes',
'Stunden': 'hours',
'Tage': 'days',
'Wochen': 'weeks'
}
WEEK2WEEK = [
('mon', 'weekly_monday'),
('tue', 'weekly_tuesday'),
('wed', 'weekly_wednesday'),
('thu', 'weekly_thursday'),
('fri', 'weekly_friday'),
('sat', 'weekly_saturday'),
('sun', 'weekly_sunday'),
]
def _add(self, dataset):
title = dataset['title']
date_start_on = dataset['date_start_on']
date_start_off = dataset['date_start_off']
date_stop_on = dataset['date_stop_on']
date_stop_off = dataset['date_stop_off']
method = self._methods[dataset['device']]
self._logging_daemon.info('Timerswitch ... Add New Timer: Name = %s Device = %s Start = %s Stop = %s' % (
title, method, date_start_on, date_start_off))
week = ','.join(
abr for abr, full in self.WEEK2WEEK
if dataset[full]
)
duration = dataset['duration']
if duration == 'einmalig':
type = 'date'
args_on = dict(run_date=date_start_on)
args_off = dict(run_date=date_start_off)
elif duration == 'intervall':
type = 'intervall'
interval_argument = {self.UNIT2KEYWORD[dataset['interval_unit']]: dataset['interval_number']}
args_on = dict(interval_argument,
start_date=date_start_on, end_date=date_stop_on
)
args_off = dict(interval_argument,
start_date=date_start_off, end_date=date_stop_off
)
elif duration == 'wochentag':
type = 'cron'
args_on = dict(
day_of_week=week, hour=date_start_on.hour,
minute=date_start_on.minute,
start_date=date_start_on,
end_date=date_stop_on
)
args_off = dict(
day_of_week=week, hour=date_start_off.hour,
minute=date_start_off.minute,
start_date=date_start_off,
end_date=date_stop_off
)
scheduler_id = dataset['scheduler_id']
self._scheduler.add_job(method, type,
args=[title, scheduler_id, True, None],
id='%son' % scheduler_id, **args_on)
self._scheduler.add_job(method, type,
args=[title, scheduler_id, False, date_stop_off],
id='%soff' % scheduler_id, **args_off)