cron(d) in Python

Code-Stücke können hier veröffentlicht werden.
Antworten
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Hallo Leute!

Nach langer Zeit will ich mich auch mal wieder hier blicken lassen und ein kleines Script von mir zur Begutachtung vorstellen. Ich habe einen kleinen cron geschrieben, der platformunabhängig läuft und sowohl Python callables als auch andere Programme ausführen kann. Unter *nix kann er das auch in einem definierbaren Userkontext, sodass con als root laufen kann, gestartete Programme aber nur mit eingeschränkten Rechten laufen werden.
Ich möchte den auf einem System einsetzen, auf dem man nur sehr beschränkt Dinge nachinstallieren kann und das KEINEN crond hat. Würdet ihr mir das empfehlen oder habe ich etwas wichtiges übersehen?

Code: Alles auswählen

#!/usr/bin/python2
#coding: utf-8
#file   : pycron.py
#original code from https://github.com/idning/pcl/blob/master/pcl/crontab.py

#cron, we do the task in the main thread, so do not add task run more than 1 minutes
#you can start thread in your task

import argparse
from datetime import datetime
import logging
import os
import shlex
from StringIO import StringIO
import subprocess
import sys
import time
import thread

try:
	import pwd
	import grp
except ImportError:
	pass

DETACHED_PROCESS = 0x00000008

class Cron(object):
	def __init__(self):
		self.events = []

	def add_Event(self, e):
		if isinstance(e,Event):
			self.events.append(e)

	def run(self):
		last_run = 0
		while True:
			#wait to a new minute start
			t = time.time()
			next_minute = t - t%60 + 60
			while t < next_minute:
				sleeptime = 60 - t%60
				logging.debug('current time: %s, we will sleep %.2f seconds' %(t, sleeptime))
				time.sleep(sleeptime)
				t = time.time()

			if last_run and next_minute - last_run != 60:
				logging.warn('Cron Ignored: last_run: %s, this_time:%s' % (last_run, next_minute) )
			last_run = next_minute

			current = datetime(*datetime.now().timetuple()[:5])
			for e in self.events:
				e.check(current)
			time.sleep(0.001)

class Event(object):
	def __init__(self,desc, use_thread=False):
		"""
		desc: min hour day month dow
			day: 1 - num days
			month: 1 - 12
			dow: mon = 1, sun = 7
		"""
		self.desc = desc 
		self.use_thread = use_thread
	#support: 
	# * 
	# 59
	# 10,20,30

	def _match(self, value, expr):
		#print 'match', value, expr
		if expr == '*':
			return True
		values = expr.split(',')
		for v in values:
			if int(v) == value:
				return True
		return False

	def matchtime(self, t):
		mins, hour, day, month, dow = self.desc.split()
		return self._match(t.minute	   , mins)  and\
			   self._match(t.hour		 , hour)  and\
			   self._match(t.day		  , day)   and\
			   self._match(t.month		, month) and\
			   self._match(t.isoweekday() , dow)

	def check(self,t):
		if self.matchtime(t):
			if self.use_thread:
				thread.start_new_thread(self.run,tuple(),{})
			else:
				try:
					self.run()
				except Exception, e:
					logging.exception(e)

	def run(self):
		pass

class PyEvent(Event):
	def __init__(self,desc, func, args=(), kwargs={}, use_thread=False):
		Event.__init__(self,desc,use_thread=use_thread)
		self.func = func
		self.args = args
		self.kwargs = kwargs

	def run(self):
		self.func(*self.args, **self.kwargs)

class ProcessEvent(Event):
	def __init__(self,desc,cmd,use_thread=False):
		Event.__init__(self,desc,use_thread=use_thread)
		if isinstance(cmd,basestring):
			self.cmd=tuple(shlex.split(cmd))
		elif hasattr(cmd,"__iter__"):
			self.cmd=tuple(cmd)
		else:
			raise TypeError, "cmd must be a string or an iterable command list like shlex.split returns"

	def run(self):
		if os.name=="posix":
			Null=file("/dev/null","w+b")
		elif os.name=="nt":
			Null=file("NUL","w+b")
		else:
			Null=None
		subprocess.Popen(self.cmd,stdout=Null,stderr=Null,cwd=os.path.abspath(os.sep))
		if hasattr(Null,"close"):
			Null.close()

if os.name == "posix":
	class PosixProcessEvent(ProcessEvent):
		def __init__(self,desc,cmd,username=None,groupname=None,use_thread=False):
			ProcessEvent.__init__(self,desc,cmd,use_thread=False)
			uid=os.getuid()
			gid=os.getgid()
			self.username=username
			self.groupname=groupname
			if username is not None:
				try:
					p=pwd.getpwnam(username)
					self.uid=p.pw_uid
					self.gid=p.pw_gid
				except KeyError:
					raise KeyError, "no user %s found" % username
				if uid != 0 and self.uid != uid:
					raise OSError, "must be superuser to change user/uid to %s/%i" % (self.username,self.uid)
			else:
				self.uid=uid
			if groupname is not None:
				try:
					self.gid=grp.getgrnam(groupname).gr_gid
				except KeyError:
					raise KeyError, "no group %s found" % groupname
				if uid != 0 and self.gid != gid:
					raise OSError, "must be superuser to change groupname/gid to %s/%i" % (self.groupname,self.gid)
			else:
				self.gid=gid

		def run(self):
			def ChangeChildUIDandGID():
				os.setgid(self.gid)
				os.setuid(self.uid)
			Null=file("/dev/null","w+b")
			subprocess.Popen(self.cmd,preexec_fn=ChangeChildUIDandGID,stdout=Null,stderr=Null,cwd=os.path.abspath(os.sep))
			Null.close()

class NullFile(StringIO):
	def write(self,text):
		self.truncate(0)

if __name__ == "__main__":
#if running as main program, not as module

	def detach():
		if os.name == "posix":
			try:
				pid=os.fork()
				if pid > 0:
					#exit parent process
					sys.exit(0)
			except OSError,e:
				print >> sys.stderr, "fork failed: %d (%s)" % (e.errno, e.strerror)
				sys.exit(1)
			# Configure the child processes environment
			os.chdir("/")
			os.setsid()
			os.umask(0)
			sys.stdin.close()
			f=NullFile()
			sys.stdout=sys.stderr=f
			main()
		elif os.name == "nt":
			pid = subprocess.Popen([sys.executable, sys.argv[0]],creationflags=DETACHED_PROCESS,close_fds=True).pid
			sys.exit(0)
		else:
			sys.exit("no daemon mode available for this platform")

	def main2():
		def long_task():
			time.sleep(61)
			print 'long task @ %s' % time.time()
		#we will got warnning
		cron = Cron()
		cron.add_Event(PyEvent('* * * * *'   , long_task)) # every minute
		cron.run()

	def main():
		def minute_task():
			print 'minute_task @ %s' % time.time()
		def day_task():
			print 'day_task @ %s' % time.time()

		cron = Cron()
		cron.add_Event(PyEvent('* * * * *'   , minute_task)) # every minute
		cron.add_Event(PyEvent('* * * * *'   , minute_task, use_thread=True)) # every minute
		cron.add_Event(PyEvent('33 * * * *'  , day_task)) # every hour
		cron.add_Event(PyEvent('34 18 * * *' , day_task)) # every day
		cron.run()

	SCRIPTNAME=sys.argv[0]
	parser = argparse.ArgumentParser(description='Cron program written in python. Support for all cron features. If run by root, it can call commands in any specific user context.')
	group = parser.add_mutually_exclusive_group()
	group.add_argument("-b","--background",dest="daemon",action='store_true',help="run in background, behave like a daemon")
	group.add_argument("-f","--foreground",dest="daemon",action='store_false',help="run in foreground, do not run in daemon mode (DEFAULT)")
	p=parser.parse_args()
	if p.daemon:
		detach()
	else:
		main()
BlackJack

@Milan: Als erstes fallen mir die Abweichungen zu PEP8 auf, also Einrücktiefe, Namensschreibweisen und einige Zeilen wo echt viel ohne irgendwelche hilfreichen Lerrzeichen aneinandergequetscht wurde. Auf der anderen Seite dann aber auch so etwas wie `Cron.matchtime()` wo plötzlich nahezu willkürlich viel Leerraum mitten in Argumentlisten steht.

Das `isinstance()` in `add_Event()` ist „unpythonisch” weil es „duck typing” verhindert.

Schau Dir mal das `sched`-Modul aus der Standardbibliothek an.

Der Ausdruck ``datetime(*datetime.now().timetuple()[:5])`` könnte weniger magisch und damit verständlicher geschrieben werden: ``datetime.now().replace(second=0, microsecond=0)``. Da sieht man beim lesen einfacher was da gemacht wird. Allerdings werden soweit ich das sehe weder Sekunden noch Microsekunden irgendwo abgefragt oder benutzt, also bräuchte man die auch gar nicht auf Null setzen.

Das die Zeitbeschreibung jedes mal zum Testen aufs neue geparst werden muss ist IMHO unnötig. Die Zeichenkette ist etwas was ausserhalb eines Programms in einer Konfigurationsdatei stehen kann, aber nichts was als ”Datenstruktur” innerhalb eines Programms genutzt werden sollte.

`Event._match()` ist keine Methode, sollte also entweder mit `staticmethod` dekoriert oder als Funktion herausgezogen werden. Die Funktion ist auch reichlich ”geschwätzig” geschrieben, das ist eigentlich ein simpler Einzeiler:

Code: Alles auswählen

def match(self, value, expr):
    return expr == '*' or value in (int(x) for x in expr.split(','))
`thread` sollte nun schon seit einigen Jahren nicht mehr benutzt werden. Dafür gibt es das `threading`-Modul.

Das eine Methode die `check()` heisst, mehr macht als eine Prüfung durchzuführen ist sehr überraschend.

Konvention für abstrakte Methoden ist das sie eine `NotImplemented`-Ausnahme auslösen.

Hier würde ich mal nicht `Popen` *ohne* eine Shell verwenden, denn der originale crond benutzt eine Shell um die Sachen auszuführen. Wenn Du das nicht machst, dann gehen viele Dinge einfach nicht, also zum Beispiel eingebaute Befehler der Shell, das Auflösen von Shell-Variablen, Umleitungen in Dateien, und so weiter. Das sind alles Sachen die man in `crontab`-Dateien sehr häufig findet.

Das prüfen auf ein `__iter__`-Attribut ist nicht ausreichend um alle iterierbaren Objekte zu identifizieren. An der Stelle würde ich gar nicht testen sondern einfach `tuple()` anwenden und auf den `TypeError` reagieren falls das nicht klappt.

Ich bin schon eine Weile bei Python dabei, aber eine Ausnahme mit ``raise Exception, 'message'`` habe ich glaube ich noch nie ausgelöst. Diese Syntaxvariante ist sowas von veraltet und es gibt sie in Python 3 deshalb auch nicht mehr.

`file` ist als Datentyp gedacht, nicht zum öffnen von Dateien. Dafür gibt es `open()`.

Den Namen der ”null”-Datei braucht man nicht selber ermitteln, den gibt es in der Standardbibliothek als `os.devnull`.

Du machst nichts mit dem Rückgabewert von `Popen`, also insbesondere fragst Du nicht den Rückgabecode des Prozesses ab und erzeugst damit lustige Zombieprozesse!

Warum wird die Klasse `PosixProcessEvent` nur definiert wenn `os.name` gleich 'posix' ist? Wäre es wirklich so schlimm wenn die Klasse auf nicht-posix-Systemen trotzdem existiert? Und wo wird die verwendet?

`SCRIPTNAME` wird definiert aber nicht benutzt.

Die Beschreibung 'Support for all cron features.' ist wohl etwas übertrieben. ;-)
mutetella
User
Beiträge: 1695
Registriert: Donnerstag 5. März 2009, 17:10
Kontaktdaten:

BlackJack hat geschrieben:Das `isinstance()` in `add_Event()` ist „unpythonisch” weil es „duck typing” verhindert.
Versteh' ich jetzt nicht. Würdest Du eher via ``hasattr(e, 'use_thread')`` prüfen, oder was meinst Du?

mutetella
Entspanne dich und wisse, dass es Zeit für alles gibt. (YogiTea Teebeutel Weisheit ;-) )
BlackJack

@mutetella: Ich würde da gar nichts prüfen. Warum denn auch? Wenn sich das nicht so verhält wie es soll wird das schon auffallen. Und ein `hasattr()` könnte fehlschlagen wenn es das nicht als Attribut gibt aber ein echter Zugriff wegen einer `__getattr__()` oder `__getattribute__()`-Methode dann doch funktionieren würde.
mutetella
User
Beiträge: 1695
Registriert: Donnerstag 5. März 2009, 17:10
Kontaktdaten:

BlackJack hat geschrieben:Warum denn auch?
Das hat mir als Info gefehlt... ;-) Dann versteh' ich in der Folge auch, weshalb das "duck typing" verhindert.

mutetella
Entspanne dich und wisse, dass es Zeit für alles gibt. (YogiTea Teebeutel Weisheit ;-) )
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

@BlackJack: Tja, da sieht man mal, was alles an mir nach der Schule vorbei gegangen ist, während ich studiert habe und in einer nicht IT-Branche arbeite :D . Kurze Erklärung: Die Struktur (laufen im Vordergrund oder Hintergrund) und die Event-Implementierung stammt von mir, das Gerüst des cron habe ich gemäß Quellenangabe im Header übernommen. Das erklärt auch die unterschiedlichen Stilrichtungen im Code. Wenn ich nach Feierabend mal viel Zeit und Laune habe, werde ich mich mal mit sched befassen und das Gerüst darauf umbauen. Scheint nach 1. Blick ganz gut geeignet zu sein und wohl im wesentlichen kompatibel zu meiner Event Klassifizierung (Python callables und Kindprozesse).

thread habe ich auch übernommen. Wo mir das Konzept von fork() zum Aufteilen von Aufgaben noch sehr einleuchtet, sind mir threads allerdings immer ein wenig fremd geblieben und ich habe niemals richtig gelernt damit umzugehen. Eher würde ich threading Support herausstreichen, als mich da einzuarbeiten, dazu fehlt mir als Vollberufler mit Spaß am Hobbyprogrammieren für eigene Dinge die Zeit. Leider. Traurig.
BlackJack hat geschrieben:Das eine Methode die `check()` heisst, mehr macht als eine Prüfung durchzuführen ist sehr überraschend.

Konvention für abstrakte Methoden ist das sie eine `NotImplemented`-Ausnahme auslösen.

Das prüfen auf ein `__iter__`-Attribut ist nicht ausreichend um alle iterierbaren Objekte zu identifizieren. An der Stelle würde ich gar nicht testen sondern einfach `tuple()` anwenden und auf den `TypeError` reagieren falls das nicht klappt.

Ich bin schon eine Weile bei Python dabei, aber eine Ausnahme mit ``raise Exception, 'message'`` habe ich glaube ich noch nie ausgelöst. Diese Syntaxvariante ist sowas von veraltet und es gibt sie in Python 3 deshalb auch nicht mehr.

Ich bin schon eine Weile bei Python dabei, aber eine Ausnahme mit ``raise Exception, 'message'`` habe ich glaube ich noch nie ausgelöst. Diese Syntaxvariante ist sowas von veraltet und es gibt sie in Python 3 deshalb auch nicht mehr.

`SCRIPTNAME` wird definiert aber nicht benutzt.

Die Beschreibung 'Support for all cron features.' ist wohl etwas übertrieben. ;-)
Ok, Formfehler. Leicht zu fixen, wird gemacht.
BlackJack hat geschrieben:Hier würde ich mal nicht `Popen` *ohne* eine Shell verwenden, denn der originale crond benutzt eine Shell um die Sachen auszuführen. Wenn Du das nicht machst, dann gehen viele Dinge einfach nicht, also zum Beispiel eingebaute Befehler der Shell, das Auflösen von Shell-Variablen, Umleitungen in Dateien, und so weiter. Das sind alles Sachen die man in `crontab`-Dateien sehr häufig findet.
Hm, stimmt. Zumal die Configdateien sich ja in der Regel nicht dynamisch ändern dürften und somit kein Sicherheitsrisiko darstellen.
BlackJack hat geschrieben:`file` ist als Datentyp gedacht, nicht zum öffnen von Dateien. Dafür gibt es `open()`.
Das finde ich ehrlich gesagt inkonsequent. Ein open gibt es in so vielen Modulen, ein file(...) finde ich sehr viel aussagekräftiger. Als damals mit 2.2 oder irgendwann file eingeführt wurde habe ich das so für mich übernommen... Und hätte es theoretisch mit Python 3 zum Standard gemacht. Schade. Naja, Philosophie.
BlackJack hat geschrieben:Den Namen der ”null”-Datei braucht man nicht selber ermitteln, den gibt es in der Standardbibliothek als `os.devnull`.
Wieder was gelernt, hab ich übersehen. Ist auch viel handlicher...
BlackJack hat geschrieben:Du machst nichts mit dem Rückgabewert von `Popen`, also insbesondere fragst Du nicht den Rückgabecode des Prozesses ab und erzeugst damit lustige Zombieprozesse!
Der Wert interessiert mich ja auch nicht. Was muss ich tun, um die Zombies zu vermeiden? Warum entstehen Zombies? Warum terminieren die nicht einfach und und lassen den Rückgabewert von Pythons GC ins Nirvana schicken? Halt, lass mich raten... ich muss den Rückgabewert "abholen", wo er wie ein Baby wartet und ihn dann wegwerfen? Hilf mir bitte, wie?
BlackJack hat geschrieben:Warum wird die Klasse `PosixProcessEvent` nur definiert wenn `os.name` gleich 'posix' ist? Wäre es wirklich so schlimm wenn die Klasse auf nicht-posix-Systemen trotzdem existiert? Und wo wird die verwendet?
Im Modul os (als warpper für die einzelnen separaten Module), socket und früher auch time war so einiges plattformspezifisch. Die Klasse ist dafür da, um cron als root auszuführen, jedoch unpriviligierte Kindprozesse zu ermöglichen, indem setuid und setgid genutzt wird. Das geht nur mit posix! Unter Windows geht das nicht. Gäbe es da die Klasse, würde es unter Umständen erst zur Laufzeit knallen und mitunter der ganze crond abstürzen (nicht getestet). Vor allem aber will ich, dass jemand bereits beim Testen einer Eventdefinition einen Fehler bekommt und nicht erst nach versteckten Laufzeitfehlern suchen muss. Es knallt also schon bei PosixProcessEvent(...) oder "from pycron import PosixProcessEvent" . Es geht nicht, also gibt es das auch nicht. Availability: Unix.

Danke dir für die Ratschläge. :!: :D 8)
BlackJack

@Milan: Zombieprozesse entstehen wenn man den Rückgabecode nicht abfragt. Das Betriebssystem kann die Strukturen eines abgelaufenen Prozessen beziehungsweise mindestens die Prozess-ID nicht wieder freigeben, solange der Rückgabecode nicht abgefragt wurde (oder der Elternprozess komplett beendet ist) weil der Code ja irgendwann doch noch mal abgefragt werden könnte. Solange taucht dieser tote Prozess in der Prozessliste auf.

Ich bin mir nicht sicher, aber vermute stark die Speicherverwaltung von Python kümmert sich nicht darum. Da der Prozess asynchron läuft, kann es ja möglich sein das der noch gar nicht zuende ist wenn das `Popen`-Objekt gelöscht werden kann, also kann man an der Stelle auch den Rückgabecode nicht abfragen, denn dieser Aufruf blockiert. Andererseits gibt es keine Garantien wann oder ob überhaupt ein Objekt von der Speicherverwaltung freigegeben wird. Also sollte man sich besser explizit darum kümmern das keine Zombies entstehen.

Ad `file`: Das war mal ein alias für `open` weil das ableiten einer Klasse von `open` sehr komisch aussieht, ums mal vorsichtig zu sagen. Dann wurde aus `open()` eine eigenständige Funktion gemacht und in Python 3 gibt es nicht mehr *eine* `file`-Klasse sondern die `open()`-Funktion liefert je nach Argumenten einen Wert einer anderen Klasse aus dem `io`-Modul zurück, das heisst man kann das gar nicht auf `file` beschränken und `open()` weglassen.
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Nochmals danke. Wenn ich das richtig verstehe, dann habe ich wenig Möglichkeiten: entweder ich frage den Status ab und riskiere damit einen block (P_WAIT) oder ich bringe einen Kindprozess auf anderen Wegen zustande, z.B. über fork oder CreateProcess unter Windows und xyz unter System abc. Dann bleibt mir also doch nichts übrig, als mir noch einmal Threads anzuschauen und jeden Subprozess in einem Thread zu starten, der auch explizit den Rückgabewert abfragt. Zumindest bleibe ich dann mit Standardmodulen plattformunabhängig, was ich als sehr wichtig erachte.

Zu `file`: So gesehen macht das sehr viel Sinn um bessere Portabilität zu Python 3 zu erreichen. Leider ist es hier auch so: Ich hatte nie die Zeit mich einzuarbeiten.
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Hallo Leute,

nach ein wenig Freizeit habe ich mich noch einmal an den Code gesetzt und ihn zumindest auf blackjacks Empfehlungen hin bearbeitet. Ich denke, dass ich früher oder später die Klasse Cron mit einer Implementierung mittels dem Modul sched ersetzen werde. Vorläufig soll der Code allerdings vor allem funktionieren, daher habe ich ihn nur begrenzt bearbeitet. Die Event Klassen habe ich erhalten, jedoch den Abgleich der Zeit verbessert. Außerdem könnte man bestimmt noch ein wenig mehr mit Threads heraus holen. Ich habe nur eine Variante der Funktion "start_new_thread" aus dem Modul thread implementiert. Macht es Sinn, Meine Klasse BaseEvent direkt von threading.Thread abzuleiten?

PEP 8 werde ich mich als nächstes widmen, versprochen. Dafür hat die Zeit leider nicht gereicht, wird aber demnächst noch gemacht.

Viele Grüße,
Milan

Code: Alles auswählen

#!/usr/bin/python2
#coding: utf-8
#file   : pycron.py
#original code from https://github.com/idning/pcl/blob/master/pcl/crontab.py

#cron, we do the task in the main thread, so do not add task run more than 1 minutes
#you can start thread in your task

import argparse
import datetime
import logging
import os
import shlex
from StringIO import StringIO
import stat
import subprocess
import sys
import time
import threading

try:
    import pwd
    import grp
except ImportError:
    pass

DETACHED_PROCESS = 0x00000008

class Cron(object):
    def __init__(self):
        self.events = []

    def add_Event(self, e):
        self.events.append(e)

    def run(self):
        last_run = 0
        while True:
            #wait to a new minute start
            t = time.time()
            next_minute = t - t%60 + 60
            while t < next_minute:
                sleeptime = 60 - t%60
                logging.debug('current time: %s, we will sleep %.2f seconds' %(t, sleeptime))
                time.sleep(sleeptime)
                t = time.time()

            if last_run and next_minute - last_run != 60:
                logging.warn('Cron Ignored: last_run: %s, this_time:%s' % (last_run, next_minute) )
            last_run = next_minute

            current = datetime.datetime.now()
            for e in self.events:
                e.check_and_run(current)
            time.sleep(0.001)

class BaseEvent(object):
    def __init__(self,desc, use_thread=True):
        """
        desc: min hour day month dow
            day: 1 - num days
            month: 1 - 12
            dow: mon = 1, sun = 7
            example * or 59 or 10,20,30
        """
        self.use_thread = use_thread
        self.runpoints={"minute":range(60), "hour":range(60), "day":range(1,32), "month":range(1,13),"dow":range(1,8)}
        desc_dict=dict(zip(("minute","hour","day","month","dow"),desc.split()))
        for k in desc_dict:
            if desc_dict[k] == "*":
                continue
            points=desc_dict[k].split(",")
            points=sorted(map(int,points))
            if min(points)<min(self.runpoints[k]) or max(points)>max(self.runpoints[k]):
                raise ValueError("%s of Event must be a value between %i and %i" % (k,min(self.runpoints[k]),max(self.runpoints[k])))
            self.runpoints[k]=points

    def matchtime(self, t):
        timedict={"minute":t.minute,"hour":t.hour,"day":t.day,"month":t.month,"dow":t.isoweekday()}
        for k in timedict:
            if not timedict[k] in self.runpoints[k]:
                return False
        return True

    def check_and_run(self,t):
        if self.matchtime(t):
            if self.use_thread:
                runthread = threading.Thread(target = self.run)
                runthread.start()
            else:
                try:
                    self.run()
                except Exception, e:
                    logging.exception(e)

    def run(self):
        raise NotImplementedError("BaseEvent cannot run actions")

class PyEvent(BaseEvent):
    def __init__(self,desc, func, args=(), kwargs={}, use_thread=True):
        BaseEvent.__init__(self,desc,use_thread=use_thread)
        self.func = func
        self.args = args
        self.kwargs = kwargs

    def run(self):
        self.func(*self.args, **self.kwargs)

class ProcessEvent(BaseEvent):
    def __init__(self,desc,cmd,use_thread=True):
        BaseEvent.__init__(self,desc,use_thread=use_thread)
        if isinstance(cmd,basestring):
            self.cmd=tuple(shlex.split(cmd))
        else:
            try:
                self.cmd=tuple(cmd)
            except TypeError:
                raise TypeError("cmd must be a string or an iterable command list like shlex.split returns")

    def run(self):
        Null=open(os.devnull,"w+b")
        p=subprocess.Popen(self.cmd,stdout=Null,stderr=Null,cwd=os.path.abspath(os.sep),shell=True)
        p.wait()
        Null.close()

if os.name == "posix":
    class PosixProcessEvent(ProcessEvent):
        def __init__(self,desc,cmd,username=None,groupname=None,use_thread=True):
            ProcessEvent.__init__(self,desc,cmd,use_thread=False)
            uid=os.getuid()
            gid=os.getgid()
            self.username=username
            self.groupname=groupname
            if username is not None:
                try:
                    p=pwd.getpwnam(username)
                    self.uid=p.pw_uid
                    self.gid=p.pw_gid
                except KeyError:
                    raise KeyError("no user %s found" % username)
                if uid != 0 and self.uid != uid:
                    raise OSError("must be superuser to change user/uid to %s/%i" % (self.username,self.uid))
            else:
                self.uid=uid
            if groupname is not None:
                try:
                    self.gid=grp.getgrnam(groupname).gr_gid
                except KeyError:
                    raise KeyError("no group %s found" % groupname)
                if uid != 0 and self.gid != gid:
                    raise OSError("must be superuser to change groupname/gid to %s/%i" % (self.groupname,self.gid))
            else:
                self.gid=gid

        def run(self):
            def ChangeChildUIDandGID():
                os.umask(stat.S_IWGRP | stat.S_IWOTH)
                os.setgid(self.gid)
                os.setuid(self.uid)
            DevNull=open(os.devnull,"w+b")
            p=subprocess.Popen(self.cmd,preexec_fn=ChangeChildUIDandGID,stdout=DevNull,stderr=DevNull,cwd=os.path.abspath(os.sep),shell=True)
            p.wait()
            DevNull.close()

class NullFile(StringIO):
    def write(self,text):
        self.truncate(0)

if __name__ == "__main__":
#if running as main program, not as module

    def detach():
        if os.name == "posix":
            try:
                pid=os.fork()
                if pid > 0:
                    #exit parent process
                    sys.exit(0)
            except OSError,e:
                print >> sys.stderr, "fork failed: %d (%s)" % (e.errno, e.strerror)
                sys.exit(1)
            # Configure the child processes environment
            os.chdir("/")
            os.setsid()
            os.umask(0)
            sys.stdin.close()
            f=NullFile()
            sys.stdout=sys.stderr=f
            main()
        elif os.name == "nt":
            pid = subprocess.Popen([sys.executable, sys.argv[0]],creationflags=DETACHED_PROCESS,close_fds=True).pid
            sys.exit(0)
        else:
            sys.exit("no daemon mode available for this platform")

    def main():
        cron = Cron()
        cron.add_Event(PosixProcessEvent("55 21 * * 7","/usr/local/bin/radiorec record 890RTL 230",username="samba-rsync",groupname="users"))
        cron.add_Event(PosixProcessEvent("55 1 * * 2,3,4,5","/usr/local/bin/radiorec record 890RTL 65",username="samba-rsync",groupname="users"))
        cron.run()

    parser = argparse.ArgumentParser(description='Cron program written in python. Support for basic cron features. If run by root, it can call commands in any specific user context.')
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-b","--background",dest="daemon",action='store_true',help="run in background, behave like a daemon")
    group.add_argument("-f","--foreground",dest="daemon",action='store_false',help="run in foreground, do not run in daemon mode (DEFAULT)")
    p=parser.parse_args()
    if p.daemon:
        detach()
    else:
        main()
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Ich bin etwas verwirrt, ich suche die Funktion, meinen eigenen Beitrag nach zu bearbeiten. Ich kann sie leider nicht finden. Ich habe beim Testen einen Bug in Zeile 129 gefunden:

Code: Alles auswählen

            ProcessEvent.__init__(self,desc,cmd,use_thread=False)
Es muss heißen:

Code: Alles auswählen

            ProcessEvent.__init__(self,desc,cmd,use_thread=use_thread)
Ansonsten läuft der Code stabil. Anpassugen an PEP 8 sind noch immer auf meine todo, wegen Privatleben & Beruf aber immer noch nicht umgesetzt. Eventuell arbeite ich mich dann auch nach Python 3 vor (jeder braucht ein Hobby ;-) )
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Hallo Leute :D ,

ich bin nicht wenig Stolz, nach langer Zeit einige Fortschritte vorweisen zu können. BlackJack lag mir die ganze Zeit über in den Ohren... Es wurmte mich, zwar funktionierenden Code schreiben zu können, welcher jedoch von keiner guten Qualität ist. Also habe ich mich versucht und bedeutende Teile umgeschrieben und ich habe begonnen, mich mit PEP 8 anzufreunden. Einige Teile, wie die begrenzte Zeilenlänge wollte ich nicht nachträglich umsetzen, werde aber demnächst darauf achten. Ich fühle mich da einfach noch ein wenig unsicher... Ich habe versucht, selbsterklärende Variablennamen zu verwenden und so die Lesbarkeit und Verständlichkeit zu verbessern.

Funktional hat sich einiges gegenüber der letzten Version geändert:
Ich habe das Design der Klassen BaseEvent und Cron angepasst, sodass es jetzt eine Nutzung des Moduls sched möglich ist. BaseEvent und alle abgeleiteten Events können nun als Generator genutzt werden und iterieren über ihre nächsten Ausführungstermine. Dabei lässt sich leicht das Verhalten des Generators ändern, sodass man einen eigenen Startpunkt definieren kann, der sowohl in der Vergangenheit als auch in der Zunft liegen darf. Default ist der Moment der Initialisierung des Generators. Außerdem kann der Generator sich an die Echtzeit anpassen und prüfen, ob zwischen der Generierung/Abfrage des letzten Ausführungszeitpunktes der Verarbeitung des nächsten zuviel Zeit vergangen ist und ein/mehrere Ausführungspunkte übersprungen wurden. In diesem Fall gibt es einen Reset und es wird einfach der nächste Zeitpunkt berechnet.

Cron nutzte jetzt das Modul sched. Es werden immer für 24 Stunden alle Ausführungszeitpunkte aller Events im Voraus berechnet und dann der Reihe nach abgearbeitet. Somit muss nicht mehr jede Minute einzeln geprüft werden, es sollte somit Rechenzeit gespart werden. Die Vorausberechnung erfolgt in einem eigenem Thread, sodass nach Ablauf von 24h keine Pause entsteht.

Ich bitte erneut um Kritik! :D :D Bitte keine Zurückhaltung... Es hat zwar lange gedauert, war aber sehr konstruktiv. Außerdem setze ich mein Skript lokal zum Aufnehmen von Hörspielen aus Internetradiostreams ein. Es ist also erprobt und hat sich bisher gut bewährt.

Code: Alles auswählen

#!/usr/bin/python2
#coding: utf-8
#file   : pycron.py
#original code from https://github.com/idning/pcl/blob/master/pcl/crontab.py

#cron, we do the task in the main thread, so do not add task run more than 1 minutes
#you can start thread in your task

import argparse
import datetime
import logging
import os
import sched
import shlex
import stat
from StringIO import StringIO
import subprocess
import sys
import time
import threading
try:
    import pwd
    import grp
except ImportError:
    pass
DETACHED_PROCESS = 0x00000008


class NullFile(StringIO):
    def write(self, text):
        self.truncate(0)


class Cron(object):
    def __init__(self):
        self.events = []
        self.generators_dict = {}
        self.generators_runpoints_buffer_dict = {}
        self.runpoints_dict = {}
        self.last_calculation_enddatetime = datetime.datetime.now()

    def add_Event(self, event):
        self.events.append(event)
        self.runpoints_dict[event] = []
        self.generators_runpoints_buffer_dict[event] = []

    def calculate_runpoints(self, enddatetime):
        startdatetime = datetime.datetime.now()
        if self.last_calculation_enddatetime >= enddatetime:
            return
        for event in self.events:
            if self.generators_runpoints_buffer_dict[event]:
                for (i, runpoint) in enumerate(self.generators_runpoints_buffer_dict[event]):
                    if runpoint > enddatetime:
                        self.runpoints_dict[event].extend(self.generators_runpoints_buffer_dict[event][:i])
                        del self.generators_runpoints_buffer_dict[event][:i]
                        break
            try:
                generator = self.generators_dict[event]
            except KeyError:
                generator = event.custom_runpoint_generator(startdatetime)
                self.generators_dict[event] = generator
            runpoint = generator.next()
            while runpoint < enddatetime:
                self.runpoints_dict[event] .append(runpoint)
                runpoint = generator.next()
            self.generators_runpoints_buffer_dict[event].append(runpoint)
        self.last_calculation_enddatetime = enddatetime

    def run(self):
        precalculation_timedelta = datetime.timedelta(days=1)
        scheduler = sched.scheduler(time.time, time.sleep)
        enddatetime = datetime.datetime.now() + precalculation_timedelta
        self.calculate_runpoints(enddatetime)
        while True:
            for (event, runpoints) in self.runpoints_dict.iteritems():
                for runpoint in runpoints:
                    scheduler.enterabs(time.mktime(runpoint.timetuple()), event.priority, event.run, ())
                del self.runpoints_dict[event][:]
            enddatetime += precalculation_timedelta
            precalculation_thread = threading.Thread(target=self.calculate_runpoints, kwargs={"enddatetime": enddatetime})
            precalculation_thread.start()
            scheduler.run()
            precalculation_thread.join()


class EventGeneratorRestart(Exception):
    pass


class BaseEvent(object):
    def __init__(self, desc, priority=5, use_thread=True):
        """
        desc: min hour day month dow
            day: 1 - num days
            month: 1 - 12
            dow: mon = 1, sun = 7
            example * or 59 or 10,20,30
        """
        self.args = ()
        self.kw = {}
        self.priority = priority
        self.use_thread = use_thread
        self.runpoints={"minute": range(60), "hour": range(24), "day": range(1, 32), "month": range(1, 13), "dow": range(1, 8)}
        desc_dict = dict(zip(("minute", "hour", "day", "month", "dow"), desc.split()))
        for k in desc_dict:
            if desc_dict[k] == "*":
                continue
            points = desc_dict[k].split(",")
            points = sorted(map(int, points))
            if min(points) < min(self.runpoints[k]) or max(points) > max(self.runpoints[k]):
                raise ValueError("%s of Event must be a value between %i and %i" % (k, min(self.runpoints[k]), max(self.runpoints[k])))
            self.runpoints[k] = points

    def __iter__(self, custom_initialisation=None, check_actuality=False):
        now = datetime.datetime.now()
        if custom_initialisation is None:
            custom_initialisation = now
        elif isinstance(custom_initialisation, datetime.datetime):
            pass
        elif isinstance(custom_initialisation, datetime.date):
            custom_initialisation = datetime.datetime.combine(custom_initialisation, datetime.time())
        elif isinstance(custom_initialisation, (int, long, float)):
            custom_initialisation = datetime.datetime(*time.localtime(custom_initialisation)[:6])
        else:
            raise TypeError("custom_initialisation must be an " +
                "instance of datetime.datetime or datetime.date " +
                "or an integer representing the time in seconds" +
                "since the epoch")
        if check_actuality:
            if custom_initialisation > now:
                now = custom_initialisation
        else:
            now = custom_initialisation
        tolerance = datetime.timedelta(seconds=59)
        while True:
            try:
                for month in self.runpoints["month"]:
                    if month < now.month:
                        continue

                    for day in self.runpoints["day"]:
                        if (month == now.month) and (day < now.day):
                            continue

                        try:
                            if datetime.date(now.year, month, day).isoweekday() not in self.runpoints["dow"]:
                                continue
                        except ValueError:
                            #day may be out of range, for example 2015-02-31
                            continue

                        for hour in self.runpoints["hour"]:
                            if (month == now.month) and (day == now.day) and (hour < now.hour):
                                continue

                            for minute in self.runpoints["minute"]:
                                if (month == now.month) and (day == now.day) and (hour == now.hour) and (minute < now.minute):
                                    continue
                                runpoint_candidate = datetime.datetime(now.year, month, day, hour, minute)

                                if not check_actuality:
                                    if runpoint_candidate >= now:
                                        yield runpoint_candidate
                                elif runpoint_candidate > (datetime.datetime.now()-tolerance):
                                    yield runpoint_candidate
                                else:
                                    raise EventGeneratorRestart("we are late, a runpoint has been skipped. Restart generator.")
                now = datetime.datetime(now.year+1, 1, 1, 0, 0)
            except EventGeneratorRestart:
                #in case a runpoint has been skipped
                now = datetime.datetime.now()

    def custom_runpoint_generator(self, custom_initialisation=None, check_actuality=False):
        return self.__iter__(custom_initialisation, check_actuality)

    def func(*args, **kw):
        raise NotImplementedError("BaseEvent cannot run actions")

    def run(self):
        if self.use_thread:
            runthread = threading.Thread(target=self.func, args=self.args, kwargs=self.kw)
            runthread.start()
        else:
            try:
                self.func(*self.args, **self.kw)
            except NotImplementedError, e:
                raise
            except Exception, e:
                logging.exception(e)


class PyEvent(BaseEvent):
    def __init__(self, desc, func, args=(), kwargs={}, priority=5, use_thread=True):
        BaseEvent.__init__(self, desc, priority=priority, use_thread=use_thread)
        self.func = func
        self.args = args
        self.kwargs = kwargs


class ProcessEvent(BaseEvent):
    def __init__(self, desc, cmd, priority=5, use_thread=True):
        BaseEvent.__init__(self, desc, priority=priority, use_thread=use_thread)
        if isinstance(cmd, basestring):
            self.cmd = tuple(shlex.split(cmd))
        else:
            try:
                self.cmd = tuple(cmd)
            except TypeError:
                raise TypeError("cmd must be a string or an iterable command list like shlex.split returns")

    def func(self):
        with open(os.devnull, "w+b") as DevNull:
            returncode = subprocess.call(self.cmd, stdout=DevNull, stderr=DevNull, cwd=os.path.abspath(os.sep))


class PosixProcessEvent(ProcessEvent):
    def __init__(self, desc, cmd, username=None, groupname=None, priority=5, use_thread=True):
        ProcessEvent.__init__(self, desc, cmd, priority=priority, use_thread=use_thread)
        uid = os.getuid()
        gid = os.getgid()
        if username is not None:
            try:
                p = pwd.getpwnam(username)
                self.uid = p.pw_uid
                self.gid = p.pw_gid
            except KeyError:
                raise KeyError("no user %s found" % username)
            if (uid != 0) and (self.uid != uid):
                raise OSError("must be superuser to change user/uid to %s/%i" % (username, self.uid))
        else:
            self.uid = uid
        self.username = pwd.getpwuid(self.uid).pw_name
        if groupname is not None:
            try:
                self.gid = grp.getgrnam(groupname).gr_gid
            except KeyError:
                raise KeyError("no group %s found" % groupname)
            if (uid != 0) and (self.gid != gid):
                raise OSError("must be superuser to change groupname/gid to %s/%i" % (groupname, self.gid))
        elif username is not None:
            self.gid = pwd.getpwnam(username).pw_gid
        else:
            self.gid = gid

    def func(self):
        def ChangeChildUIDandGID():
            os.umask(stat.S_IWGRP | stat.S_IWOTH)
            os.setgid(self.gid)
            os.initgroups(self.username, self.gid)
            os.setuid(self.uid)
        with open(os.devnull, "w+b") as DevNull:
            returncode=subprocess.call(self.cmd, preexec_fn=ChangeChildUIDandGID, stdout=DevNull, stderr=DevNull, cwd=os.path.abspath(os.sep))
        return returncode


if os.name != "posix":
    del PosixProcessEvent


if __name__ == "__main__":
#if running as main program, not as module
    def detach():
        if os.name == "posix":
            try:
                pid = os.fork()
                if pid > 0:
                    #exit parent process
                    sys.exit(0)
            except OSError,e:
                print >> sys.stderr, "fork failed: %d (%s)" % (e.errno, e.strerror)
                sys.exit(1)
            # Configure the child processes environment
            os.chdir("/")
            os.setsid()
            os.umask(stat.S_IWGRP | stat.S_IWOTH)
            sys.stdin.close()
            f = NullFile()
            sys.stdout = sys.stderr = f
            main()
        elif os.name == "nt":
            pid = subprocess.Popen([sys.executable, sys.argv[0]], creationflags=DETACHED_PROCESS, close_fds=True).pid
            sys.exit(0)
        else:
            sys.exit("no daemon mode available for this platform")

    def main():
        def minute_task():
            print 'minute_task @ %s' % time.time()
            print
        def delay_task():
            print 'delay_task_start @ %s' % time.time()
            time.sleep(5)
            print 'delay_task_end @ %s' % time.time()
            print
        def day_task():
            print 'day_task @ %s' % time.time()
            print

        cron = Cron()
        cron.add_Event(PyEvent('* * * * *'   , minute_task, priority=3, use_thread=False)) # every minute
        cron.add_Event(PyEvent('* * * * *'   , minute_task, priority=5)) # every minute
        cron.add_Event(PyEvent('* * * * *'   , delay_task, priority=4)) # every minute
        cron.add_Event(PyEvent('5 * * * *'  , day_task, priority=4, use_thread=False)) # every hour
        cron.add_Event(PyEvent('34 18 * * *' , day_task)) # every day
        cron.run()

    main()
Sirius3
User
Beiträge: 17738
Registriert: Sonntag 21. Oktober 2012, 17:20

@Milan: das mit dem calculate_runpoints ist mir alles viel zu kompliziert. Warum müssen die Zeitpunkte für 24h im Voraus berechnet werden, oder das ganze in einem Thread ablaufen? Du brauchst doch nur zu wissen, wann welches Event das nächste mal laufen soll, und wenn es gelaufen ist, kannst Du wiederum den nächsten Zeitpunkt berechnen. Damit schrumpft Dir Deine Cron-Klasse auf nicht viel mehr als 5 Zeilen zusammen.

In BaseEvent runpoints vorzubelegen, um sie danach gegebenenfalls zu ersetzen, ist unschön. __iter__ hat keine zusätzlichen Parameter, wenn Du also eine spezielle iter-Funktion hast, würde ich einen Namen ohne Unterstrichen nehmen und in __iter__ diese Methode aufrufen. Die Funktion sollte übrigens nur einen Datentyp erwarten. Wenn jemand einen Timestamp hat, kann er ihn ja vor dem Aufruf in ein datetime-Objekt umwandlen. Alternative datetime-Implementierungen, wie arrow, schließt Du mit Deinem Vorgehen unnötigerweise aus. Die fünffach verschachtelte for-Schleifen und die continues machen den Code unlesbar.

Warum erzeugst Du eine Klasse (PosixProcessEvent), wenn Du sie danach wieder löschst? Sinnvoll wäre doch umgekehrt.

Und was sollen die Definitionen innerhalb eines if? Und die Testfunktion würde ich nicht main, sondern main_test nennen.
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Sirius3 hat geschrieben:@Milan: das mit dem calculate_runpoints ist mir alles viel zu kompliziert. Warum müssen die Zeitpunkte für 24h im Voraus berechnet werden, oder das ganze in einem Thread ablaufen? Du brauchst doch nur zu wissen, wann welches Event das nächste mal laufen soll, und wenn es gelaufen ist, kannst Du wiederum den nächsten Zeitpunkt berechnen. Damit schrumpft Dir Deine Cron-Klasse auf nicht viel mehr als 5 Zeilen zusammen.

In BaseEvent runpoints vorzubelegen, um sie danach gegebenenfalls zu ersetzen, ist unschön. __iter__ hat keine zusätzlichen Parameter, wenn Du also eine spezielle iter-Funktion hast, würde ich einen Namen ohne Unterstrichen nehmen und in __iter__ diese Methode aufrufen. Die Funktion sollte übrigens nur einen Datentyp erwarten. Wenn jemand einen Timestamp hat, kann er ihn ja vor dem Aufruf in ein datetime-Objekt umwandlen. Alternative datetime-Implementierungen, wie arrow, schließt Du mit Deinem Vorgehen unnötigerweise aus. Die fünffach verschachtelte for-Schleifen und die continues machen den Code unlesbar.

Warum erzeugst Du eine Klasse (PosixProcessEvent), wenn Du sie danach wieder löschst? Sinnvoll wäre doch umgekehrt.

Und was sollen die Definitionen innerhalb eines if? Und die Testfunktion würde ich nicht main, sondern main_test nennen.
@Sirius3: Du hast ja Recht, das mit dem im voraus berechnen ist komplex... Es ist ein Workaround für ein Problem, für dass mir keine Lösung eingefallen ist: Die Events können alle zu verschiedenen Zeiten auftreten. Von jeder Minute bis zu einmal im Jahr, falls dann auch noch der Wochentag passend ist, kann alles vorkommen. Um cron mit einem korrektem Ablauf zu betreiben müssen die Events also entweder chronologisch sortiert aus einem Generator kommen oder man muss den übernächsten runpoint aller Events mit berechnen, um kein Event durch warten auszulassen. Da man den übernächsten runpoint dann aber schon generiert hat, muss man ihn zwischenspeichern und wenn man einmal damit anfängt, kann man gleich weiter machen. 24 Stunden sind hier rein willkürlich gewählt. Oder man startet pro Event einen eigenen Scheduler in jeweils einem eigenem Thread.
Ich habe mich für das im voraus Berechnen entschieden. Lieber würde ich eine Klasse EventChain definieren, die beliebig viele Events aufnehmen kann. Aus den Informationen zu den einzelnen runpoints sollte dann ein Generator sofort chronologisch geordnete Ausgaben machen können. Wie du schon bemerkt hast, ist mein Generator für ein Event allein schon recht unschön geschrieben... Ich habe keine einfache Lösung gefunden, die mich zufrieden stellen konnte. Die Datenstrukturen wurden mir jedes mal zu komplex um durch alle runpoints durch zu iterieren und gleichzeitig nicht die Zuordnung zum jeweiligem Event aus den Augen zu verlieren.

Deine restlichen Kritikpunkte habe ich eben lokal gefixt, hatte ich aus den Augen verloren... Was meintest du mit Definitionen innerhalb eines if? Die Funktion main / main_test?

Viele Grüße, Milan
BlackJack

@Milan: Wieso müssen die Events alle in der richtigen Reihenfolge aus einem Generator kommen? Du ermittelst einfach für jeden Eintrag die zeitlich am nächsten liegenden Zeitpunkt an dem er ausgeführt werden soll und fütterst das an *einen* `scheduler`. Egal in welcher Reihenfolge. Für das aufrufen in der richtigen Reihenfolge ist doch genau dieses Objekt zuständig. Und wenn der `scheduler` dann die Aktion aufruft ist das erste was diese Rückruffunktion macht, den nächsten Zeitpunkt zu berechnen und an den `scheduler` zu geben. So gibt es für jeden Eintrag immer genau *einen*, nämlich den nächsten Zeitpunkt im `scheduler`.
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

@BlackJack: Ein paar Eigenheiten des Moduls sched machen das leider zunichte:
In multi-threaded environments, the scheduler class has limitations with respect to thread-safety, inability to insert a new task before the one currently pending in a running scheduler, and holding up the main thread until the event queue is empty.
Konkreter Anwendungsfall:

Code: Alles auswählen

#Radiomitschnitt von Hoerbuechern
cron.add_Event(PosixProcessEvent("55 21 * * 7","/usr/local/bin/radiorec record 890RTL 260",username="samba-rsync",groupname="users"))
cron.add_Event(PosixProcessEvent("55 1 * * 2,3,4,5","/usr/local/bin/radiorec record 890RTL 90",username="samba-rsync",groupname="users"))
Nehmen wir mal an ich starte cron nun am Montag Nachmittag. Wenn nun alle Events auf die nächste Ausführung berechnet werden, so wird für das erste Event der nächste Sonntag um 21:55 erhalten. Für das zweite Event wird der frühe Dienstag morgen um 01:55 erhalten. Wenn nur 1 eine Zeit eingetragen wird, so landet möglicherweise die 1. Zeit zuerst im scheduler, so werden 4 Ausführungen des 2. Events übersprungen. Wenn jeweils nur 1 Event eingetragen wird, so müssen die generierten Zeiten chronologisch sortiert sein. Das erreiche ich nur durch Vorberechnung für willkürlich definierte Intervalle oder durch schlaue Generierung unter Einbeziehung aller Events. Für letzteres habe ich keine Lösung gefunden.

Die zweite Variante ist mir bei verfassen der Antwort für Sirius eingefallen: Jedes Event wird in einem eigenem Thread mit eigenem scheduler ausgeführt, sodass diese sich nicht gegenseitig blockieren.
BlackJack

@Milan: Ich habe das Problem ehrlich gesagt nicht verstanden. Warum sollte da etwas übersprungen werden und warum müssen die Zeiten chronologisch sortiert sein? Ich gehe von *einem* `scheduler` aus der in *einem* Thread läuft und wo auch nur neue Einträge aus *diesem* Thread gemacht werden. Die Aufgaben selbst können dann in anderen Threads laufen, aber die haben dann ja mit dem `scheduler` auch nichts mehr am Hut.

Edit: Und wenn man einen Thread pro Eintrag macht kann man sich den `scheduler` auch gleich sparen. Das skaliert aber nicht so wirklich gut.
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

@BlackJack: Du hast Recht. Wer lesen kann ist klar im Vorteil, gut dass ich die Zeile aus der Doku noch mal hier geschrieben habe. Es ist doch ein riesiger Unterschied zwischen "inability to insert a new task before the one currently pending in a running scheduler" (Doku) und "inability to insert a new task in a running scheduler" (was ich noch im Kopf hatte). Mea culpa.

Werde ich demnächst mal testen. Danke.
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Danke nochmals, es läuft.

Code: Alles auswählen

class Cron(object):
	def __init__(self):
		self.events = []
		self.scheduler = sched.scheduler(time.time, time.sleep)

	def add_Event(self, event):
		self.events.append(event)

	def refeed_scheduler(self, event, generator):
		runtime_seconds = time.mktime(generator.next().timetuple())
		self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
		event.run()

	def run(self):
		now = datetime.datetime.now()
		for event in self.events:
			generator = event.custom_runpoint_generator(now)
			runtime_seconds = time.mktime(generator.next().timetuple())
			self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
		self.scheduler.run()
Milan
User
Beiträge: 1078
Registriert: Mittwoch 16. Oktober 2002, 20:52

Hallo liebe Leute,

ich habe mein Programm mal wieder etwas überarbeitet. Wirklich viel geändert hat sich dabei nicht. Die Klasse Cron kann nun ausgeführt und auch wieder angehalten werden, sowie zur Laufzeit neue Events hinzugefügt oder entfernt werden. Es ist nun sowohl unter Python 2 als auch 3 ohne Anpassung lauffähig und anstelle eines iterativen Weges zum Bestimmen des Laufzeitpunkt eines Events wird nun Backtracking genutzt. Eigentlich wollte ich den Code damit lesbarer machen, jedoch finde ich das nicht wirklich gelungen. Zu Übungszwecken habe ich es trotzdem gemacht, vielleicht ist es ja doch nicht so schlimm?

@an die Moderation: kann man den 1. Beitrag hier editieren und den Code dort hineinfügen? Ich möchte den Code gerne teilen, ohne dass man x Beiträge durchforsten muss, um zur endgültigen Version zu gelangen...

Viele Grüße,
Milan

Code: Alles auswählen

#!/usr/bin/python

import argparse
import datetime
import logging
import os
import sched
import shlex
import stat
import subprocess
import sys
import time
import threading
try:
    import pwd
    import grp
except ImportError:
    pass
DETACHED_PROCESS = 0x00000008
try:
    basestring
except NameError:
    basestring = str

def _function_wrapper(func):
    def exception_safe_func(*args, **kw):
        try:
            func(*args,**kw)
        except NotImplementedError as e:
            raise
        except Exception as e:
            logging.exception(e)
    return exception_safe_func


class Cron(object):
    def __init__(self,*events):
        self.events = set(*events)
        self.event_ids = {}
        self.scheduler = sched.scheduler(time.time, time.sleep)
        self.running = False

    def refeed_scheduler(self, event, generator=None, runpoint=None, run_event=True):
        if generator is None:
            if runpoint is None:
                runpoint = datetime.datetime.now()
            generator = event.custom_runpoint_generator(runpoint)
        runtime_seconds = time.mktime(next(generator).timetuple())
        event_id = self.scheduler.enterabs(runtime_seconds, event.priority, self.refeed_scheduler, (event, generator))
        self.event_ids[event] = event_id
        if run_event:
            try:
                event.run()
            except:
                pass

    def add_Event(self, event):
        self.events.add(event)
        if self.running:
            self.refeed_scheduler(event=event, run_event=True)

    def remove_Event(self, event):
        self.events.discard(event)
        if self.running:
            try:
                self.scheduler.cancel(self.event_ids[event])
            except ValueError:
                pass

    def run(self):
        if self.running:
            return
        now = datetime.datetime.now()
        for event in self.events:
            self.refeed_scheduler(event=event, runpoint=now, run_event=False)
        self.running = True
        self.scheduler.run()

    def stop(self):
        for event_id in self.scheduler.queue:
            self.scheduler.cancel(event_id)
        self.event_ids = {}
        self.running = False


class EventGeneratorRestart(Exception):
    pass


class BaseEvent(object):
    def __init__(self, desc, priority=5, use_thread=True):
        """
        desc: min hour day month dow
            day: 1 - num days
            month: 1 - 12
            dow: mon = 1, sun = 7
            example * or 59 or 10,20,30
        """
        self.args = ()
        self.kw = {}
        self.priority = priority
        self.use_thread = use_thread
        self.time_units = ("month", "day", "dow", "hour", "minute")
        self.runpoints = dict( (("month", range(1, 13)), ("day", range(1, 32)), ("dow", range(1, 8)), ("hour", range(24)), ("minute", range(60))) )
        desc_dict = dict(zip(("minute", "hour", "day", "month", "dow"), desc.split()))
        for k in desc_dict:
            if desc_dict[k] == "*":
                continue
            points = desc_dict[k].split(",")
            points = sorted(map(int, points))
            if min(points) < min(self.runpoints[k]) or max(points) > max(self.runpoints[k]):
                raise ValueError("%s of Event must be a value between %i and %i" % (k, min(self.runpoints[k]), max(self.runpoints[k])))
            self.runpoints[k] = points

    tolerance = datetime.timedelta(seconds=59)

    def __iter__(self):
        return self.custom_runpoint_generator()

    def _match_runpoint(self, now, truncated_now, candidate, time_units, check_actuality=False):
        if  len(time_units) == 0:
            if not check_actuality:
                if candidate >= now:
                    yield candidate
            elif candidate > (datetime.datetime.now() - self.tolerance):
                yield candidate
            else:
                raise EventGeneratorRestart("we are late, a runpoint has been skipped. Restart generator.")
            raise StopIteration
        time_unit = time_units[0]
        time_units = time_units[1:]
        runpoints = self.runpoints[time_unit]
        if time_unit == "dow":
            if candidate.isoweekday() not in runpoints:
                raise StopIteration
            for result in self._match_runpoint(now, truncated_now, candidate, time_units):
                yield result
        else:
            for runpoint in runpoints:
                try:
                    candidate = candidate.replace(**{time_unit:runpoint})
                except ValueError:
                    #day may be out of range, for example 2015-02-31
                    continue
                if candidate < truncated_now:
                    continue
                for result in self._match_runpoint(now, truncated_now.replace(**{time_unit:getattr(now, time_unit)}), candidate, time_units):
                    yield result

    def custom_runpoint_generator(self, custom_initialisation=None, check_actuality=False):
        now = datetime.datetime.now()
        if custom_initialisation is not None:
            if check_actuality and custom_initialisation > now:
                now = custom_initialisation
            if not check_actuality:
                now = custom_initialisation
        while True:
            try:
                truncated_now = datetime.datetime(now.year, 1, 1, 0, 0)
                for result in self._match_runpoint(now, truncated_now, truncated_now, self.time_units, check_actuality=check_actuality):
                    yield result
                now = datetime.datetime(now.year+1, 1, 1, 0, 0)
            except EventGeneratorRestart:
                #in case a runpoint has been skipped
                now = datetime.datetime.now()

    @_function_wrapper
    def func(*args, **kw):
        raise NotImplementedError("BaseEvent cannot run actions")

    def run(self):
        if self.use_thread:
            runthread = threading.Thread(target=self.func, args=self.args, kwargs=self.kw)
            runthread.start()
        else:
            self.func(*self.args, **self.kw)


class PyEvent(BaseEvent):
    def __init__(self, desc, func, args=(), kwargs={}, priority=5, use_thread=True):
        BaseEvent.__init__(self, desc, priority=priority, use_thread=use_thread)
        self.func = _function_wrapper(func)
        self.args = args
        self.kwargs = kwargs


class ProcessEvent(BaseEvent):
    def __init__(self, desc, cmd, priority=5, use_thread=True):
        BaseEvent.__init__(self, desc, priority=priority, use_thread=use_thread)
        if isinstance(cmd, basestring):
            self.cmd = tuple(shlex.split(cmd))
        else:
            try:
                self.cmd = tuple(cmd)
            except TypeError:
                raise TypeError("cmd must be a string or an iterable command list like shlex.split returns")

    @_function_wrapper
    def func(self):
        with open(os.devnull, "w+b") as DevNull:
            returncode = subprocess.call(self.cmd, stdout=DevNull, stderr=DevNull, cwd=os.path.abspath(os.sep))
        return returncode


if os.name == "posix":
    class PosixProcessEvent(ProcessEvent):
        def __init__(self, desc, cmd, username=None, groupname=None, priority=5, use_thread=True):
            ProcessEvent.__init__(self, desc, cmd, priority=priority, use_thread=use_thread)
            uid = os.getuid()
            gid = os.getgid()
            if username is not None:
                try:
                    p = pwd.getpwnam(username)
                    self.uid = p.pw_uid
                except KeyError:
                    raise KeyError("no user %s found" % username)
                if (uid != 0) and (self.uid != uid):
                    raise OSError("must be superuser to change user/uid to %s/%i" % (username, self.uid))
            else:
                self.uid = uid
            self.username = pwd.getpwuid(self.uid).pw_name
            if groupname is not None:
                try:
                    self.gid = grp.getgrnam(groupname).gr_gid
                except KeyError:
                    raise KeyError("no group %s found" % groupname)
                if (uid != 0) and (self.gid != gid):
                    raise OSError("must be superuser to change groupname/gid to %s/%i" % (groupname, self.gid))
            else:
                self.gid = pwd.getpwnam(self.username).pw_gid

        #if not run by root: run simple ProcessEvent mode because preexec_fn would raise an exception
        #if run by root: allow change of user context
        if os.getuid() == 0:
            @_function_wrapper
            def func(self):
                def ChangeChildUIDandGID():
                    os.umask(stat.S_IWGRP | stat.S_IWOTH)
                    os.setgid(self.gid)
                    os.initgroups(self.username, self.gid)
                    os.setuid(self.uid)
                with open(os.devnull, "w+b") as DevNull:
                    returncode=subprocess.call(self.cmd, preexec_fn=ChangeChildUIDandGID, stdout=DevNull, stderr=DevNull, cwd=os.path.abspath(os.sep))
                return returncode


if __name__ == "__main__":
#if running as main program, not as module
    def detach():
        if os.name == "posix":
            try:
                pid = os.fork()
                if pid > 0:
                    #exit parent process
                    sys.exit(0)
            except OSError as e:
                sys.stderr.write("fork failed: %d (%s)\n" % (e.errno, e.strerror))
                sys.stderr.flush()
                sys.exit(1)
            # Configure the child processes environment
            os.chdir(os.path.abspath(os.sep))
            os.setsid()
            os.umask(stat.S_IWGRP | stat.S_IWOTH)
            sys.stdin.close()
            with open(os.devnull, "w+b") as DevNull:
                sys.stdout = sys.stderr = DevNull
                main()
        elif os.name == "nt":
            pid = subprocess.Popen([sys.executable, sys.argv[0]], creationflags=DETACHED_PROCESS, close_fds=True).pid
            sys.exit(0)
        else:
            sys.exit("no daemon mode available for this platform")

    def main():
        def minute_task():
            print ("minute_task @ %s" % datetime.datetime.now().strftime("%a, %d.%m.%Y %H:%M:%S.%f"))
        def day_task():
            print ("day_task @ %s" % datetime.datetime.now().strftime("%a, %d.%m.%Y %H:%M:%S.%f"))

        cron = Cron()

#Radiomitschnitt von Hoerbuechern
        cron.add_Event(PosixProcessEvent("55 21 * * 7","/usr/local/bin/radiorec record 890RTL 260",username="samba-rsync",groupname="users"))
        cron.add_Event(PosixProcessEvent("55 1 * * 2,3,4,5","/usr/local/bin/radiorec record 890RTL 90",username="samba-rsync",groupname="users"))

        cron.add_Event(PosixProcessEvent("55 23 * * *","/usr/local/bin/radiorec record RICKFUTURE 375",username="samba-rsync",groupname="users"))
        cron.add_Event(PosixProcessEvent("55 5 * * 4","/usr/local/bin/radiorec record RICKFUTURE 190",username="samba-rsync",groupname="users"))

        cron.run()

    parser = argparse.ArgumentParser(description='Cron program written in python. Support for basic cron features. If run by root, it can call commands in any specific user context.')
    group = parser.add_mutually_exclusive_group()
    group.add_argument("-b","--background",dest="daemon",action='store_true',default=False,help="run in background, behave like a daemon")
    group.add_argument("-f","--foreground",dest="daemon",action='store_false',default=False,help="run in foreground, do not run in daemon mode (DEFAULT)")
    p=parser.parse_args()
    if p.daemon:
        detach()
    else:
        main()
mutetella
User
Beiträge: 1695
Registriert: Donnerstag 5. März 2009, 17:10
Kontaktdaten:

Milan hat geschrieben:kann man den 1. Beitrag hier editieren und den Code dort hineinfügen? Ich möchte den Code gerne teilen, ohne dass man x Beiträge durchforsten muss, um zur endgültigen Version zu gelangen...
Schon mal über ein Repository nachgedacht?

mutetella
Entspanne dich und wisse, dass es Zeit für alles gibt. (YogiTea Teebeutel Weisheit ;-) )
Antworten