cron(d) in Python

Code-Stücke können hier veröffentlicht werden.
Antworten
BlackJack

@Milan: Ich habe das Problem ehrlich gesagt nicht verstanden. Warum sollte da etwas übersprungen werden und warum müssen die Zeiten chronologisch sortiert sein? Ich gehe von *einem* `scheduler` aus der in *einem* Thread läuft und wo auch nur neue Einträge aus *diesem* Thread gemacht werden. Die Aufgaben selbst können dann in anderen Threads laufen, aber die haben dann ja mit dem `scheduler` auch nichts mehr am Hut.

Edit: Und wenn man einen Thread pro Eintrag macht kann man sich den `scheduler` auch gleich sparen. Das skaliert aber nicht so wirklich gut.
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

@BlackJack: Du hast Recht. Wer lesen kann ist klar im Vorteil, gut dass ich die Zeile aus der Doku noch mal hier geschrieben habe. Es ist doch ein riesiger Unterschied zwischen "inability to insert a new task before the one currently pending in a running scheduler" (Doku) und "inability to insert a new task in a running scheduler" (was ich noch im Kopf hatte). Mea culpa.

Werde ich demnächst mal testen. Danke.
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Danke nochmals, es läuft.

Code: Alles auswählen

class Cron(object):
	def __init__(self):
		self.events = []
		self.scheduler = sched.scheduler(time.time, time.sleep)

	def add_Event(self, event):
		self.events.append(event)

	def refeed_scheduler(self, event, generator):
		runtime_seconds = time.mktime(generator.next().timetuple())
		self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
		event.run()

	def run(self):
		now = datetime.datetime.now()
		for event in self.events:
			generator = event.custom_runpoint_generator(now)
			runtime_seconds = time.mktime(generator.next().timetuple())
			self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
		self.scheduler.run()
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Hallo liebe Leute,

ich habe mein Programm mal wieder etwas überarbeitet. Wirklich viel geändert hat sich dabei nicht. Die Klasse Cron kann nun ausgeführt und auch wieder angehalten werden, sowie zur Laufzeit neue Events hinzugefügt oder entfernt werden. Es ist nun sowohl unter Python 2 als auch 3 ohne Anpassung lauffähig und anstelle eines iterativen Weges zum Bestimmen des Laufzeitpunkt eines Events wird nun Backtracking genutzt. Eigentlich wollte ich den Code damit lesbarer machen, jedoch finde ich das nicht wirklich gelungen. Zu Übungszwecken habe ich es trotzdem gemacht, vielleicht ist es ja doch nicht so schlimm?

@an die Moderation: kann man den 1. Beitrag hier editieren und den Code dort hineinfügen? Ich möchte den Code gerne teilen, ohne dass man x Beiträge durchforsten muss, um zur endgültigen Version zu gelangen...

Viele Grüße,
Milan

Code: Alles auswählen

#!/usr/bin/python

import argparse
import datetime
import logging
import os
import sched
import shlex
import stat
import subprocess
import sys
import time
import threading
try:
    import pwd
    import grp
except ImportError:
    pass
DETACHED_PROCESS = 0x00000008
try:
    basestring
except NameError:
    basestring = str

def _function_wrapper(func):
    def exception_safe_func(*args, **kw):
        try:
            func(*args,**kw)
        except NotImplementedError as e:
            raise
        except Exception as e:
            logging.exception(e)
    return exception_safe_func


class Cron(object):
    def __init__(self,*events):
        self.events = set(*events)
        self.event_ids = {}
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.running = False

    def refeed_scheduler(self, event, generator=None, runpoint=None, run_event=True):
        if generator is None:
            if runpoint is None:
                runpoint = datetime.datetime.now()
            generator = event.custom_runpoint_generator(runpoint)
        runtime_seconds = time.mktime(next(generator).timetuple())
        event_id = self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
        self.event_ids[event] = event_id
        if run_event:
            try:
                event.run()
            except:
                pass

    def add_Event(self, event):
        self.events.add(event)
        if self.running:
            self.refeed_scheduler(event=event, run_event=True)

    def remove_Event(self, event):
        self.events.discard(event)
        if self.running:
            try:
                self.scheduler.cancel(self.event_ids[event])
            except ValueError:
                pass

    def run(self):
        if self.running:
            return
        now = datetime.datetime.now()
        for event in self.events:
            self.refeed_scheduler(event=event, runpoint=now, run_event=False)
        self.running = True
        self.scheduler.run()

    def stop(self):
        for event_id in self.scheduler.queue:
            self.scheduler.cancel(event_id)
        self.event_ids = {}
        self.running = False


class EventGeneratorRestart(Exception):
    pass


class BaseEvent(object):
    def __init__(self, desc, priority=5, use_thread=True):
        """
        desc: min hour day month dow
            day: 1 - num days
            month: 1 - 12
            dow: mon = 1, sun = 7
            example * or 59 or 10,20,30
        """
        self.args = ()
        self.kw = {}
        self.priority = priority
        self.use_thread = use_thread
        self.time_units = ("month", "day", "dow", "hour", "minute")
        self.runpoints = dict( (("month", range(1, 13)), ("day", range(1, 32)), ("dow", range(1, 8)), ("hour", range(24)), ("minute", range(60))) )
        desc_dict = dict(zip(("minute", "hour", "day", "month", "dow"), desc.split()))
        for k in desc_dict:
            if desc_dict[k] == "*":
                continue
            points = desc_dict[k].split(",")
            points = sorted(map(int, points))
            if min(points) < min(self.runpoints[k]) or max(points) > max(self.runpoints[k]):
                raise ValueError("%s of Event must be a value between %i and %i" % (k, min(self.runpoints[k]), max(self.runpoints[k])))
            self.runpoints[k] = points

    tolerance = datetime.timedelta(seconds=59)

    def __iter__(self):
        return self.custom_runpoint_generator()

    def _match_runpoint(self, now, truncated_now, candidate, time_units, check_actuality=False):
        if  len(time_units) == 0:
            if not check_actuality:
                if candidate >= now:
                    yield candidate
            elif candidate > (datetime.datetime.now() - self.tolerance):
                yield candidate
            else:
                raise EventGeneratorRestart("we are late, a runpoint has been skipped. Restart generator.")
            raise StopIteration
        time_unit = time_units[0]
        time_units = time_units[1:]
        runpoints = self.runpoints[time_unit]
        if time_unit == "dow":
            if candidate.isoweekday() not in runpoints:
                raise StopIteration
            for result in self._match_runpoint(now, truncated_now, candidate, time_units):
                yield result
        else:
            for runpoint in runpoints:
                try:
                    candidate = candidate.replace(**{time_unit:runpoint})
                except ValueError:
                    #day may be out of range, for example 2015-02-31
                    continue
                if candidate < truncated_now:
                    continue
                for result in self._match_runpoint(now, truncated_now.replace(**{time_unit:getattr(now, time_unit)}), candidate, time_units):
                    yield result

    def custom_runpoint_generator(self, custom_initialisation=None, check_actuality=False):
        now = datetime.datetime.now()
        if custom_initialisation is not None:
            if check_actuality and custom_initialisation > now:
                now = custom_initialisation
            if not check_actuality:
                now = custom_initialisation
        while True:
            try:
                truncated_now = datetime.datetime(now.year, 1, 1, 0, 0)
                for result in self._match_runpoint(now, truncated_now, truncated_now, self.time_units, check_actuality=check_actuality):
                    yield result
                now = datetime.datetime(now.year+1, 1, 1, 0, 0)
            except EventGeneratorRestart:
                #in case a runpoint has been skipped
                now = datetime.datetime.now()

    @_function_wrapper
    def func(*args, **kw):
        raise NotImplementedError("BaseEvent cannot run actions")

    def run(self):
        if self.use_thread:
            runthread = threading.Thread(target=self.func, args=self.args, kwargs=self.kw)
            runthread.start()
        else:
            self.func(*self.args, **self.kw)


class PyEvent(BaseEvent):
    def __init__(self, desc, func, args=(), kwargs={}, priority=5, use_thread=True):
        BaseEvent.__init__(self, desc, priority=priority, use_thread=use_thread)
        self.func = _function_wrapper(func)
        self.args = args
        self.kwargs = kwargs


class ProcessEvent(BaseEvent):
    def __init__(self, desc, cmd, priority=5, use_thread=True):
        BaseEvent.__init__(self, desc, priority=priority, use_thread=use_thread)
        if isinstance(cmd, basestring):
            self.cmd = tuple(shlex.split(cmd))
        else:
            try:
                self.cmd = tuple(cmd)
            except TypeError:
                raise TypeError("cmd must be a string or an iterable command list like shlex.split returns")

    @_function_wrapper
    def func(self):
        with open(os.devnull, "w+b") as DevNull:
            returncode = subprocess.call(self.cmd, stdout=DevNull, stderr=DevNull, cwd=os.path.abspath(os.sep))
        return returncode


if os.name == "posix":
    class PosixProcessEvent(ProcessEvent):
        def __init__(self, desc, cmd, username=None, groupname=None, priority=5, use_thread=True):
            ProcessEvent.__init__(self, desc, cmd, priority=priority, use_thread=use_thread)
            uid = os.getuid()
            gid = os.getgid()
            if username is not None:
                try:
                    p = pwd.getpwnam(username)
                    self.uid = p.pw_uid
                except KeyError:
                    raise KeyError("no user %s found" % username)
                if (uid != 0) and (self.uid != uid):
                    raise OSError("must be superuser to change user/uid to %s/%i" % (username, self.uid))
            else:
                self.uid = uid
            self.username = pwd.getpwuid(self.uid).pw_name
            if groupname is not None:
                try:
                    self.gid = grp.getgrnam(groupname).gr_gid
                except KeyError:
                    raise KeyError("no group %s found" % groupname)
                if (uid != 0) and (self.gid != gid):
                    raise OSError("must be superuser to change groupname/gid to %s/%i" % (groupname, self.gid))
            else:
                self.gid = pwd.getpwnam(self.username).pw_gid

        #if not run by root: run simple ProcessEvent mode because preexec_fn would raise an exception
        #if run by root: allow change of user context
        if os.getuid() == 0:
            @_function_wrapper
            def func(self):
                def ChangeChildUIDandGID():
                    os.umask(stat.S_IWGRP | stat.S_IWOTH)
                    os.setgid(self.gid)
                    os.initgroups(self.username, self.gid)
                    os.setuid(self.uid)
                with open(os.devnull, "w+b") as DevNull:
                    returncode=subprocess.call(self.cmd, preexec_fn=ChangeChildUIDandGID, stdout=DevNull, stderr=DevNull, cwd=os.path.abspath(os.sep))
                return returncode


if __name__ == "__main__":
#if running as main program, not as module
    def detach():
        if os.name == "posix":
            try:
                pid = os.fork()
                if pid > 0:
                    #exit parent process
                    sys.exit(0)
            except OSError as e:
                sys.stderr.write("fork failed: %d (%s)\n" % (e.errno, e.strerror))
                sys.stderr.flush()
                sys.exit(1)
            # Configure the child processes environment
            os.chdir(os.path.abspath(os.sep))
            os.setsid()
            os.umask(stat.S_IWGRP | stat.S_IWOTH)
            sys.stdin.close()
            with open(os.devnull, "w+b") as DevNull:
                sys.stdout = sys.stderr = DevNull
                main()
        elif os.name == "nt":
            pid = subprocess.Popen([sys.executable, sys.argv[0]], creationflags=DETACHED_PROCESS, close_fds=True).pid
            sys.exit(0)
        else:
            sys.exit("no daemon mode available for this platform")

    def main():
        def minute_task():
            print ("minute_task @ %s" % datetime.datetime.now().strftime("%a, %d.%m.%Y %H:%M:%S.%f"))
        def day_task():
            print ("day_task @ %s" % datetime.datetime.now().strftime("%a, %d.%m.%Y %H:%M:%S.%f"))

        cron = Cron()

#Radiomitschnitt von Hoerbuechern
        cron.add_Event(PosixProcessEvent("55 21 * * 7","/usr/local/bin/radiorec record 890RTL 260",username="samba-rsync",groupname="users"))
        cron.add_Event(PosixProcessEvent("55 1 * * 2,3,4,5","/usr/local/bin/radiorec record 890RTL 90",username="samba-rsync",groupname="users"))

        cron.add_Event(PosixProcessEvent("55 23 * * *","/usr/local/bin/radiorec record RICKFUTURE 375",username="samba-rsync",groupname="users"))
        cron.add_Event(PosixProcessEvent("55 5 * * 4","/usr/local/bin/radiorec record RICKFUTURE 190",username="samba-rsync",groupname="users"))

        cron.run()

    parser = argparse.ArgumentParser(description='Cron program written in python. Support for basic cron features. If run by root, it can call commands in any specific user context.')
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-b","--background",dest="daemon",action='store_true',default=False,help="run in background, behave like a daemon")
    group.add_argument("-f","--foreground",dest="daemon",action='store_false',default=False,help="run in foreground, do not run in daemon mode (DEFAULT)")
    p=parser.parse_args()
    if p.daemon:
        detach()
    else:
        main()
mutetella
User
Beiträge: 1695
Registriert: Donnerstag 5. März 2009, 17:10
Kontaktdaten:

Milan hat geschrieben:kann man den 1. Beitrag hier editieren und den Code dort hineinfügen? Ich möchte den Code gerne teilen, ohne dass man x Beiträge durchforsten muss, um zur endgültigen Version zu gelangen...
Schon mal über ein Repository nachgedacht?

mutetella
Entspanne dich und wisse, dass es Zeit für alles gibt. (YogiTea Teebeutel Weisheit ;-) )
Antworten