server.py
Code: Alles auswählen
#!/usr/bin/env python
#coding: utf-8
# Einfacher Stoppable XMLRPC-Server
import time
import threading
import socket; socket.setdefaulttimeout(3) # Timeout auf 3 sec. setzen
from SimpleXMLRPCServer import SimpleXMLRPCServer
from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler
class StoppableXMLRPCServer(SimpleXMLRPCServer, threading.Thread):
def __init__(self, *args, **kwargs):
SimpleXMLRPCServer.__init__(self, *args, **kwargs)
threading.Thread.__init__(self)
def run(self):
# *serve_forever* muss in einem eigenen Thread laufen, damit man es
# unterbrechen kann!
self.serve_forever()
print "Thread beendet"
def stop(self):
self.shutdown()
class XmlrpcHandler(object):
def __init__(self, server):
self._server = server
def get_time(self):
time.sleep(0.2) # nur zum Testen der Threads!
return time.asctime()
def close(self, password):
if password == "12345":
self._stoptimer = threading.Timer(1, self._server.stop)
self._stoptimer.start()
return "Befehl zum Stoppen erteilt"
return "falsches Passwort - Server wird nicht gestoppt"
def main():
server = StoppableXMLRPCServer(("localhost", 50505), logRequests = True)
handler = XmlrpcHandler(server)
server.register_instance(handler)
print "Der XMLRPC-Server horcht auf http://localhost:50505."
print "Er kann mit STRG+C oder STRG+PAUSE beendet werden."
print
server.start()
try:
while server.is_alive():
time.sleep(0.1)
except KeyboardInterrupt:
server.stop()
if __name__ == "__main__":
main()
Dieser Client testet den obigen Server:
client.py
Code: Alles auswählen
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Einfacher XMLRPC-Client zum Testen des "Stoppable XMLRPC-Servers"
import socket; socket.setdefaulttimeout(3) # Timeout auf 3 sec. setzen
import xmlrpclib
def main():
server = xmlrpclib.ServerProxy("http://localhost:50505")
# Testen
for i in xrange(3):
print server.get_time()
# Server stoppen
print server.close("12345")
if __name__ == "__main__":
main()
Gerold
Suchworte: beenden schließen stoppbar stoppen XML-RPC quit close