@Milan: Ich habe das Problem ehrlich gesagt nicht verstanden. Warum sollte da etwas übersprungen werden und warum müssen die Zeiten chronologisch sortiert sein? Ich gehe von *einem* `scheduler` aus der in *einem* Thread läuft und wo auch nur neue Einträge aus *diesem* Thread gemacht werden. Die Aufgaben selbst können dann in anderen Threads laufen, aber die haben dann ja mit dem `scheduler` auch nichts mehr am Hut.
Edit: Und wenn man einen Thread pro Eintrag macht kann man sich den `scheduler` auch gleich sparen. Das skaliert aber nicht so wirklich gut.
cron(d) in Python
@BlackJack: Du hast Recht. Wer lesen kann ist klar im Vorteil, gut dass ich die Zeile aus der Doku noch mal hier geschrieben habe. Es ist doch ein riesiger Unterschied zwischen "inability to insert a new task before the one currently pending in a running scheduler" (Doku) und "inability to insert a new task in a running scheduler" (was ich noch im Kopf hatte). Mea culpa.
Werde ich demnächst mal testen. Danke.
Werde ich demnächst mal testen. Danke.
Danke nochmals, es läuft.
Code: Alles auswählen
class Cron(object):
def __init__(self):
self.events = []
self.scheduler = sched.scheduler(time.time, time.sleep)
def add_Event(self, event):
self.events.append(event)
def refeed_scheduler(self, event, generator):
runtime_seconds = time.mktime(generator.next().timetuple())
self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
event.run()
def run(self):
now = datetime.datetime.now()
for event in self.events:
generator = event.custom_runpoint_generator(now)
runtime_seconds = time.mktime(generator.next().timetuple())
self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
self.scheduler.run()
Hallo liebe Leute,
ich habe mein Programm mal wieder etwas überarbeitet. Wirklich viel geändert hat sich dabei nicht. Die Klasse Cron kann nun ausgeführt und auch wieder angehalten werden, sowie zur Laufzeit neue Events hinzugefügt oder entfernt werden. Es ist nun sowohl unter Python 2 als auch 3 ohne Anpassung lauffähig und anstelle eines iterativen Weges zum Bestimmen des Laufzeitpunkt eines Events wird nun Backtracking genutzt. Eigentlich wollte ich den Code damit lesbarer machen, jedoch finde ich das nicht wirklich gelungen. Zu Übungszwecken habe ich es trotzdem gemacht, vielleicht ist es ja doch nicht so schlimm?
@an die Moderation: kann man den 1. Beitrag hier editieren und den Code dort hineinfügen? Ich möchte den Code gerne teilen, ohne dass man x Beiträge durchforsten muss, um zur endgültigen Version zu gelangen...
Viele Grüße,
Milan
ich habe mein Programm mal wieder etwas überarbeitet. Wirklich viel geändert hat sich dabei nicht. Die Klasse Cron kann nun ausgeführt und auch wieder angehalten werden, sowie zur Laufzeit neue Events hinzugefügt oder entfernt werden. Es ist nun sowohl unter Python 2 als auch 3 ohne Anpassung lauffähig und anstelle eines iterativen Weges zum Bestimmen des Laufzeitpunkt eines Events wird nun Backtracking genutzt. Eigentlich wollte ich den Code damit lesbarer machen, jedoch finde ich das nicht wirklich gelungen. Zu Übungszwecken habe ich es trotzdem gemacht, vielleicht ist es ja doch nicht so schlimm?
@an die Moderation: kann man den 1. Beitrag hier editieren und den Code dort hineinfügen? Ich möchte den Code gerne teilen, ohne dass man x Beiträge durchforsten muss, um zur endgültigen Version zu gelangen...
Viele Grüße,
Milan
Code: Alles auswählen
#!/usr/bin/python
import argparse
import datetime
import logging
import os
import sched
import shlex
import stat
import subprocess
import sys
import time
import threading
try:
import pwd
import grp
except ImportError:
pass
DETACHED_PROCESS = 0x00000008
try:
basestring
except NameError:
basestring = str
def _function_wrapper(func):
def exception_safe_func(*args, **kw):
try:
func(*args,**kw)
except NotImplementedError as e:
raise
except Exception as e:
logging.exception(e)
return exception_safe_func
class Cron(object):
def __init__(self,*events):
self.events = set(*events)
self.event_ids = {}
self.scheduler = sched.scheduler(time.time, time.sleep)
self.running = False
def refeed_scheduler(self, event, generator=None, runpoint=None, run_event=True):
if generator is None:
if runpoint is None:
runpoint = datetime.datetime.now()
generator = event.custom_runpoint_generator(runpoint)
runtime_seconds = time.mktime(next(generator).timetuple())
event_id = self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
self.event_ids[event] = event_id
if run_event:
try:
event.run()
except:
pass
def add_Event(self, event):
self.events.add(event)
if self.running:
self.refeed_scheduler(event=event, run_event=True)
def remove_Event(self, event):
self.events.discard(event)
if self.running:
try:
self.scheduler.cancel(self.event_ids[event])
except ValueError:
pass
def run(self):
if self.running:
return
now = datetime.datetime.now()
for event in self.events:
self.refeed_scheduler(event=event, runpoint=now, run_event=False)
self.running = True
self.scheduler.run()
def stop(self):
for event_id in self.scheduler.queue:
self.scheduler.cancel(event_id)
self.event_ids = {}
self.running = False
class EventGeneratorRestart(Exception):
pass
class BaseEvent(object):
def __init__(self, desc, priority=5, use_thread=True):
"""
desc: min hour day month dow
day: 1 - num days
month: 1 - 12
dow: mon = 1, sun = 7
example * or 59 or 10,20,30
"""
self.args = ()
self.kw = {}
self.priority = priority
self.use_thread = use_thread
self.time_units = ("month", "day", "dow", "hour", "minute")
self.runpoints = dict( (("month", range(1, 13)), ("day", range(1, 32)), ("dow", range(1, 8)), ("hour", range(24)), ("minute", range(60))) )
desc_dict = dict(zip(("minute", "hour", "day", "month", "dow"), desc.split()))
for k in desc_dict:
if desc_dict[k] == "*":
continue
points = desc_dict[k].split(",")
points = sorted(map(int, points))
if min(points) < min(self.runpoints[k]) or max(points) > max(self.runpoints[k]):
raise ValueError("%s of Event must be a value between %i and %i" % (k, min(self.runpoints[k]), max(self.runpoints[k])))
self.runpoints[k] = points
tolerance = datetime.timedelta(seconds=59)
def __iter__(self):
return self.custom_runpoint_generator()
def _match_runpoint(self, now, truncated_now, candidate, time_units, check_actuality=False):
if len(time_units) == 0:
if not check_actuality:
if candidate >= now:
yield candidate
elif candidate > (datetime.datetime.now() - self.tolerance):
yield candidate
else:
raise EventGeneratorRestart("we are late, a runpoint has been skipped. Restart generator.")
raise StopIteration
time_unit = time_units[0]
time_units = time_units[1:]
runpoints = self.runpoints[time_unit]
if time_unit == "dow":
if candidate.isoweekday() not in runpoints:
raise StopIteration
for result in self._match_runpoint(now, truncated_now, candidate, time_units):
yield result
else:
for runpoint in runpoints:
try:
candidate = candidate.replace(**{time_unit:runpoint})
except ValueError:
#day may be out of range, for example 2015-02-31
continue
if candidate < truncated_now:
continue
for result in self._match_runpoint(now, truncated_now.replace(**{time_unit:getattr(now, time_unit)}), candidate, time_units):
yield result
def custom_runpoint_generator(self, custom_initialisation=None, check_actuality=False):
now = datetime.datetime.now()
if custom_initialisation is not None:
if check_actuality and custom_initialisation > now:
now = custom_initialisation
if not check_actuality:
now = custom_initialisation
while True:
try:
truncated_now = datetime.datetime(now.year, 1, 1, 0, 0)
for result in self._match_runpoint(now, truncated_now, truncated_now, self.time_units, check_actuality=check_actuality):
yield result
now = datetime.datetime(now.year+1, 1, 1, 0, 0)
except EventGeneratorRestart:
#in case a runpoint has been skipped
now = datetime.datetime.now()
@_function_wrapper
def func(*args, **kw):
raise NotImplementedError("BaseEvent cannot run actions")
def run(self):
if self.use_thread:
runthread = threading.Thread(target=self.func, args=self.args, kwargs=self.kw)
runthread.start()
else:
self.func(*self.args, **self.kw)
class PyEvent(BaseEvent):
def __init__(self, desc, func, args=(), kwargs={}, priority=5, use_thread=True):
BaseEvent.__init__(self, desc, priority=priority, use_thread=use_thread)
self.func = _function_wrapper(func)
self.args = args
self.kwargs = kwargs
class ProcessEvent(BaseEvent):
def __init__(self, desc, cmd, priority=5, use_thread=True):
BaseEvent.__init__(self, desc, priority=priority, use_thread=use_thread)
if isinstance(cmd, basestring):
self.cmd = tuple(shlex.split(cmd))
else:
try:
self.cmd = tuple(cmd)
except TypeError:
raise TypeError("cmd must be a string or an iterable command list like shlex.split returns")
@_function_wrapper
def func(self):
with open(os.devnull, "w+b") as DevNull:
returncode = subprocess.call(self.cmd, stdout=DevNull, stderr=DevNull, cwd=os.path.abspath(os.sep))
return returncode
if os.name == "posix":
class PosixProcessEvent(ProcessEvent):
def __init__(self, desc, cmd, username=None, groupname=None, priority=5, use_thread=True):
ProcessEvent.__init__(self, desc, cmd, priority=priority, use_thread=use_thread)
uid = os.getuid()
gid = os.getgid()
if username is not None:
try:
p = pwd.getpwnam(username)
self.uid = p.pw_uid
except KeyError:
raise KeyError("no user %s found" % username)
if (uid != 0) and (self.uid != uid):
raise OSError("must be superuser to change user/uid to %s/%i" % (username, self.uid))
else:
self.uid = uid
self.username = pwd.getpwuid(self.uid).pw_name
if groupname is not None:
try:
self.gid = grp.getgrnam(groupname).gr_gid
except KeyError:
raise KeyError("no group %s found" % groupname)
if (uid != 0) and (self.gid != gid):
raise OSError("must be superuser to change groupname/gid to %s/%i" % (groupname, self.gid))
else:
self.gid = pwd.getpwnam(self.username).pw_gid
#if not run by root: run simple ProcessEvent mode because preexec_fn would raise an exception
#if run by root: allow change of user context
if os.getuid() == 0:
@_function_wrapper
def func(self):
def ChangeChildUIDandGID():
os.umask(stat.S_IWGRP | stat.S_IWOTH)
os.setgid(self.gid)
os.initgroups(self.username, self.gid)
os.setuid(self.uid)
with open(os.devnull, "w+b") as DevNull:
returncode=subprocess.call(self.cmd, preexec_fn=ChangeChildUIDandGID, stdout=DevNull, stderr=DevNull, cwd=os.path.abspath(os.sep))
return returncode
if __name__ == "__main__":
#if running as main program, not as module
def detach():
if os.name == "posix":
try:
pid = os.fork()
if pid > 0:
#exit parent process
sys.exit(0)
except OSError as e:
sys.stderr.write("fork failed: %d (%s)\n" % (e.errno, e.strerror))
sys.stderr.flush()
sys.exit(1)
# Configure the child processes environment
os.chdir(os.path.abspath(os.sep))
os.setsid()
os.umask(stat.S_IWGRP | stat.S_IWOTH)
sys.stdin.close()
with open(os.devnull, "w+b") as DevNull:
sys.stdout = sys.stderr = DevNull
main()
elif os.name == "nt":
pid = subprocess.Popen([sys.executable, sys.argv[0]], creationflags=DETACHED_PROCESS, close_fds=True).pid
sys.exit(0)
else:
sys.exit("no daemon mode available for this platform")
def main():
def minute_task():
print ("minute_task @ %s" % datetime.datetime.now().strftime("%a, %d.%m.%Y %H:%M:%S.%f"))
def day_task():
print ("day_task @ %s" % datetime.datetime.now().strftime("%a, %d.%m.%Y %H:%M:%S.%f"))
cron = Cron()
#Radiomitschnitt von Hoerbuechern
cron.add_Event(PosixProcessEvent("55 21 * * 7","/usr/local/bin/radiorec record 890RTL 260",username="samba-rsync",groupname="users"))
cron.add_Event(PosixProcessEvent("55 1 * * 2,3,4,5","/usr/local/bin/radiorec record 890RTL 90",username="samba-rsync",groupname="users"))
cron.add_Event(PosixProcessEvent("55 23 * * *","/usr/local/bin/radiorec record RICKFUTURE 375",username="samba-rsync",groupname="users"))
cron.add_Event(PosixProcessEvent("55 5 * * 4","/usr/local/bin/radiorec record RICKFUTURE 190",username="samba-rsync",groupname="users"))
cron.run()
parser = argparse.ArgumentParser(description='Cron program written in python. Support for basic cron features. If run by root, it can call commands in any specific user context.')
group = parser.add_mutually_exclusive_group()
group.add_argument("-b","--background",dest="daemon",action='store_true',default=False,help="run in background, behave like a daemon")
group.add_argument("-f","--foreground",dest="daemon",action='store_false',default=False,help="run in foreground, do not run in daemon mode (DEFAULT)")
p=parser.parse_args()
if p.daemon:
detach()
else:
main()
Schon mal über ein Repository nachgedacht?Milan hat geschrieben:kann man den 1. Beitrag hier editieren und den Code dort hineinfügen? Ich möchte den Code gerne teilen, ohne dass man x Beiträge durchforsten muss, um zur endgültigen Version zu gelangen...
mutetella
Entspanne dich und wisse, dass es Zeit für alles gibt. (YogiTea Teebeutel Weisheit )