Seite 1 von 1

Verbesserungsvorschläge zum Code?

Verfasst: Donnerstag 17. März 2016, 10:52
von chpo7234
Moin moin,

ich habe ein kleines Programm für die Überwachung eines Netzwerk-Hosts (primär erst mal Printer) geschrieben und würde mir gerne (falls möglich) Verbesserungsvorschläge einholen.

Bild

Das Programm überwacht den Status (Online/Offline) und SNMP-OID's. Bei Status-Änderungen erfolgt eine E-Mail-Benachrichtigung. Die Benachrichtigung bei ausgedruckter Seite ist erst mal test-weise.

Code auf pastebin: http://pastebin.com/FjxuwQUg

Code: Alles auswählen

import datetime
import email.utils
import smtplib
import socket
import subprocess
import sys
import time

from pysnmp.entity.rfc3413.oneliner import cmdgen
from email.mime.text import MIMEText

def sendMail(host, event, old_action, new_action):
    msg = MIMEText(host + " changed his " + event + " from " + old_action + " to " + new_action)
    msg["To"] = email.utils.formataddr(("your@mail.de", "your@mail.de.com"))
    msg["From"] = email.utils.formataddr(("Observer@Printer.de", "Observer@Printer.de"))
    msg["Subject"] = "I've found an event on " + host
    
    server = smtplib.SMTP("localhost")
    server.set_debuglevel(True)
    
    try:
        server.sendmail("Observer@Printer.de", ["your@mail.de.com"], msg.as_string())
    finally:
        server.quit()

class Printer(object):
    
    def __init__(self, name):
        self.updates = 0
        self.update_time = 0
        self.events = ""
        self.num_events = 0
        self.points = ""
        self.ip = socket.gethostbyname(name)
        self.snmp_name = "not available"
        self.dns_name = socket.gethostbyaddr(self.ip)[0].split(".domain")[0]
        self.uptime = "not available"
        self.printed_sites = "not available"
        self.snmp_info1 = "SNMP-Info 1"
        self.snmp_info2 = "SNMP-Info 2"
        self.snmp_info3 = "SNMP-Info 3"
        self.snmp_info4 = "SNMP-Info 4"
        
        self.snmp_info1_status = "not available"
        self.snmp_info2_status = "not available"
        self.snmp_info3_status = "not available"
        self.snmp_info4_status = "not available"
        self.update()

    def update(self):
        try:
            subprocess.check_output("ping -c 1 " + self.ip, shell=True)
            self.status = "Online"
        except Exception:
            self.status = "Offline"
        
        self.updates = self.updates + 1
        self.snmp_name = self.get_snmp_info('1.3.6.1.2.1.1.1.0')
        
        if "No SNMP" in str(self.snmp_name):
            return
        
        seconds = int(self.get_snmp_info('1.3.6.1.2.1.1.3.0'))/100
        self.uptime = str(datetime.timedelta(seconds = int(seconds)))
        self.printed_sites = str(self.get_snmp_info('1.3.6.1.2.1.43.10.2.1.4.1.1'))

        #e.g. schwarzer toner, belichtungseinheit, wartungskit, rollenkit
        self.snmp_info1 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.1') 
        self.snmp_info2 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.2')
        self.snmp_info3 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.3')
        self.snmp_info4 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.4')
        
        #e.g. 3000 6000 4000 5000
        status1 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.1')  
        status2 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.2')
        status3 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.3')
        status4 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.4')
        
        #e.g. 10000 12000 10000 10000
        status1_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.1')
        status2_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.2')
        status3_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.3')
        status4_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.4')
        
        #e.g. 30% 50%, 40%, 50%
        self.snmp_info1_status = float(status1)/float(status1_max)*100
        self.snmp_info2_status = float(status2)/float(status2_max)*100
        self.snmp_info3_status = float(status3)/float(status3_max)*100
        self.snmp_info4_status = float(status4)/float(status4_max)*100
    
    def output(self): 
        subprocess.call("clear")
        print ("==============================================")
        print ("|| Observing " + self.ip + self.points)
        print ("|| Update-Time.....: " + str(self.update_time) + " seconds")
        print ("|| Updates.........: " + str(self.updates))
        print ("==============================================")
        print ("|| Status..........: " + str(self.status))
        print ("|| DNS-Name........: " + str(self.dns_name))
        print ("|| SNMP-Name.......: " + str(self.snmp_name)[:25])
        print ("|| Uptime..........: " + self.uptime)
        print ("|| Printed sites...: " + self.printed_sites)
        print ("|| " + self.snmp_info1 + " -> " + self.snmp_info1_status + " %")
        print ("|| " + self.snmp_info2 + " -> " + self.snmp_info2_status + " %")
        print ("|| " + self.snmp_info3 + " -> " + self.snmp_info3_status + " %")
        print ("|| " + self.snmp_info4 + " -> " + self.snmp_info4_status + " %")
        print ("==============================================")
        print ("|| Events..........: " + str(self.num_events))
        print ("==============================================" + self.events)
        
    def get_snmp_info(self, oid):
        command_generator = cmdgen.CommandGenerator()
        error_indication, error_status, error_index, var_binding = command_generator.getCmd(
            cmdgen.CommunityData('public'),
            cmdgen.UdpTransportTarget((self.ip,161)),
            oid
        )
        
        if error_indication:
            return error_indication
        else:
            if error_status:
                return ('%s at %s' % (              #print instead of return
                    error_status.prettyPring(),
                    error_index and var_binding[int(error_index)-1] or '?'
                    )
                )
            else:
                return var_binding[0][1]
                
    def notify(self, sleep):
        self.update_time = sleep
        
        while(True):
            temp_status = self.status
            temp_printed_sites = self.printed_sites
            temp_snmp_info1_status = self.snmp_info1_status
            temp_snmp_info2_status = self.snmp_info2_status
            temp_snmp_info3_status = self.snmp_info3_status
            temp_snmp_info4_status = self.snmp_info4_status
            
            self.update()
            
            self.check_event(temp_status, self.status)
            self.check_event(temp_printed_sites, self.printed_sites)
            self.check_event(temp_snmp_info1_status, self.snmp_info1_status)
            self.check_event(temp_snmp_info2_status, self.snmp_info2_status)
            self.check_event(temp_snmp_info3_status, self.snmp_info3_status)
            self.check_event(temp_snmp_info4_status, self.snmp_info4_status)

            if len(self.points)>8:
                self.points=""
            else:
                self.points = self.points + "."

            self.output()
            
            time.sleep(sleep)
            
        def check_event(self, host_object, old_status, new_status):
            if old_status != new_status:
                self.events = self.events + "\n|| " + time.asctime(time.localtime(time.time())) + ": " + host_object + " " + old_status + " -> " + new_status
                sendMail(self.dns_name, host_object, old_status, new_status)
                self.num_events = self.num_events + 1
        
        
def main():
    printer = sys.argv[1]
    
    try:
        socket.gethostbyname(printer)
    except socket.gaierror:
        print(printer + " is not available")
        return
    
    pfle033 = Printer(printer)
    pfle033.notify(3)

if __name__ == main():
    main()
Ist es möglich, mehreren Variablen innerhalb eines Ausdrucks den gleichen Wert zuzuweisen? Der folgende Code (innerhalb der __init__) wirkt für mich so redundant:

Code: Alles auswählen

        self.snmp_name = "not available"
        self.uptime = "not available"
        self.printed_sites = "not available"
        self.snmp_info1_status = "not available"
        self.snmp_info2_status = "not available"
        self.snmp_info3_status = "not available"
        self.snmp_info4_status = "not available"
Auch die Zeilen innerhalb der update()-Methode wirken für mich überladend. Gibt es hier vielleicht auch eine Möglichkeit das code-sparender zu schreiben?

Code: Alles auswählen

        self.snmp_info1 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.1') 
        self.snmp_info2 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.2')
        self.snmp_info3 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.3')
        self.snmp_info4 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.4')
        
        status1 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.1')  
        status2 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.2')
        status3 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.3')
        status4 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.4')
        
        status1_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.1')
        status2_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.2')
        status3_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.3')
        status4_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.4')
Für die Zukunft habe ich mir folgendes überlegt:
- Event-Logging außerhalb der Konsole (-> .log-Datei)
- Erweiterte Benachrichtigungseinstellungen (z.B. nur Benachrichtigen wenn Wert unterhalb von angegebener Grenze)
- Parallele Überwachung von mehreren Hosts
- Web-Frontend

Wie ist euer Eindruck? Ist die Klassen- und Methodenbildung so sinnvoll? Fällt euch sonst irgendetwas negativ auf?

MfG

Re: Verbesserungsvorschläge zum Code?

Verfasst: Mittwoch 30. März 2016, 11:01
von Sirius3
@chpo7234: die Namensschreibweise ist nicht durchgehend konsistent, es müßte 'send_mail` heißen. Statt Strings mit + zusammenzustückeln format verwenden, Statt Variablen durchzunummerieren, Listen verwenden. Möglich wäre eine Snmp-Info-Klasse, die info-Text und Status zusammenfaßt. check_output mit Liste und ohne shell=True aufrufen. Bei get_snmp_info sollte statt einen Error-String zurückzugeben eine Exception geworfen werden. Statt time.asctime ein datetime-Objekt benutzen. Statt eines event-String lieber eine Liste, dann brauchst Du auch nicht händisch die Anzahl zählen. Die extra Prüfung in main ist überflüssig, da Printer.__init__ mit der selben Exception abbrechen würde.

Code: Alles auswählen

import datetime
import email.utils
import smtplib
import socket
import subprocess
import sys
import time

from pysnmp.entity.rfc3413.oneliner import cmdgen
from email.mime.text import MIMEText

def sendMail(host, event, old_action, new_action):
    msg = MIMEText("{0} changed his {1} from {2} to {3}".format(host, event, old_action, new_action))
    msg["To"] = email.utils.formataddr(("your@mail.de", "your@mail.de.com"))
    msg["From"] = email.utils.formataddr(("Observer@Printer.de", "Observer@Printer.de"))
    msg["Subject"] = "I've found an event on {0}".format(host)
    
    server = smtplib.SMTP("localhost")
    server.set_debuglevel(True)
    
    try:
        server.sendmail("Observer@Printer.de", ["your@mail.de.com"], msg.as_string())
    finally:
        server.quit()

class SnmpInfo(object):
    def __init__(self, num):
        self.info = "SNMP-Info {0}".format(num)
        self.status = "not available"
        
    def update(self, printer):
        #e.g. schwarzer toner, belichtungseinheit, wartungskit, rollenkit
        self.info = printer.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.%d' % num) 
        status = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.%d' % num)  
        status_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.%d' % num)
        self.status = float(status)/float(status_max)

    def __str__(self):
        return "{0} -> {1:%}".format(self.info, self.status)
        
class Printer(object):
    
    def __init__(self, name):
        self.updates = 0
        self.update_time = 0
        self.events = []
        self.points = 0
        self.ip = socket.gethostbyname(name)
        self.snmp_name = "not available"
        self.dns_name = socket.gethostbyaddr(self.ip)[0].split(".domain")[0]
        self.uptime = "not available"
        self.printed_sites = "not available"
        self.snmp_infos = [SnmpInfo(n) for n in range(1,5)]
        self.update()

    def update(self):
        try:
            subprocess.check_output(["ping", "-c", "1", self.ip])
            self.status = "Online"
        except Exception:
            self.status = "Offline"
        
        self.updates += 1
        self.snmp_name = self.get_snmp_info('1.3.6.1.2.1.1.1.0')
        
        if "No SNMP" in self.snmp_name:
            return
        
        seconds = int(self.get_snmp_info('1.3.6.1.2.1.1.3.0'))/100
        self.uptime = datetime.timedelta(seconds=int(seconds))
        self.printed_sites = self.get_snmp_info('1.3.6.1.2.1.43.10.2.1.4.1.1')
        for info in self.snmp_infos:
            info.update(self)
    
    def output(self): 
        subprocess.call(["clear"])
        print("==============================================")
        print("|| Observing {0}{1}".format(self.ip, "."*(self.points%8)))
        print("|| Update-Time.....: {0} seconds".format(self.update_time))
        print("|| Updates.........: {0}".format(self.updates))
        print("==============================================")
        print("|| Status..........: {0}".format(self.status))
        print("|| DNS-Name........: {0}".format(self.dns_name))
        print("|| SNMP-Name.......: {0}".format(self.snmp_name))
        print("|| Uptime..........: {0}".format(self.uptime))
        print("|| Printed sites...: {0}".format(self.printed_sites))
        for info in self.snmp_infos:
            print("|| {0}".format(info)
        print("==============================================")
        print("|| Events..........: {0}".format(len(self.events)))
        print("==============================================")
        print('\n'.join(self.events))
        
    def get_snmp_info(self, oid):
        command_generator = cmdgen.CommandGenerator()
        error_indication, error_status, error_index, var_binding = command_generator.getCmd(
            cmdgen.CommunityData('public'),
            cmdgen.UdpTransportTarget((self.ip,161)),
            oid
        )
        
        if error_indication:
            return error_indication
        else:
            if error_status:
                return ('%s at %s' % (              #print instead of return
                    error_status.prettyPring(),
                    error_index and var_binding[int(error_index)-1] or '?'
                    )
                )
            else:
                return var_binding[0][1]
                
    def notify(self, sleep):
        self.update_time = sleep
        
        while True:
            old_status = self.status
            old_printed_sites = self.printed_sites
            self.old_info = [info.status for info in self.snmp_infos]
            self.update()
            self.check_event(old_status, self.status)
            self.check_event(old_printed_sites, self.printed_sites)
            for old_status, info in self.snmp_infos:
                self.check_event(old_status, info.status)
            self.points += 1
            self.output()
            time.sleep(sleep)
            
        def check_event(self, host_object, old_status, new_status):
            if old_status != new_status:
                self.events.append("|| {0}: {1} {2} -> {3}".format(datetime.datetime.now(), host_object, old_status, new_status))
                sendMail(self.dns_name, host_object, old_status, new_status)

def main():
    printer = sys.argv[1]
    try:
        pfle033 = Printer(printer)
        pfle033.notify(3)
    except socket.gaierror:
        print(printer + " is not available")

if __name__ == main():
    main()