Verbesserungsvorschläge zum Code?

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
chpo7234
User
Beiträge: 35
Registriert: Dienstag 29. September 2015, 10:19

Moin moin,

ich habe ein kleines Programm für die Überwachung eines Netzwerk-Hosts (primär erst mal Printer) geschrieben und würde mir gerne (falls möglich) Verbesserungsvorschläge einholen.

Bild

Das Programm überwacht den Status (Online/Offline) und SNMP-OID's. Bei Status-Änderungen erfolgt eine E-Mail-Benachrichtigung. Die Benachrichtigung bei ausgedruckter Seite ist erst mal test-weise.

Code auf pastebin: http://pastebin.com/FjxuwQUg

Code: Alles auswählen

import datetime
import email.utils
import smtplib
import socket
import subprocess
import sys
import time

from pysnmp.entity.rfc3413.oneliner import cmdgen
from email.mime.text import MIMEText

def sendMail(host, event, old_action, new_action):
    msg = MIMEText(host + " changed his " + event + " from " + old_action + " to " + new_action)
    msg["To"] = email.utils.formataddr(("your@mail.de", "your@mail.de.com"))
    msg["From"] = email.utils.formataddr(("Observer@Printer.de", "Observer@Printer.de"))
    msg["Subject"] = "I've found an event on " + host
    
    server = smtplib.SMTP("localhost")
    server.set_debuglevel(True)
    
    try:
        server.sendmail("Observer@Printer.de", ["your@mail.de.com"], msg.as_string())
    finally:
        server.quit()

class Printer(object):
    
    def __init__(self, name):
        self.updates = 0
        self.update_time = 0
        self.events = ""
        self.num_events = 0
        self.points = ""
        self.ip = socket.gethostbyname(name)
        self.snmp_name = "not available"
        self.dns_name = socket.gethostbyaddr(self.ip)[0].split(".domain")[0]
        self.uptime = "not available"
        self.printed_sites = "not available"
        self.snmp_info1 = "SNMP-Info 1"
        self.snmp_info2 = "SNMP-Info 2"
        self.snmp_info3 = "SNMP-Info 3"
        self.snmp_info4 = "SNMP-Info 4"
        
        self.snmp_info1_status = "not available"
        self.snmp_info2_status = "not available"
        self.snmp_info3_status = "not available"
        self.snmp_info4_status = "not available"
        self.update()

    def update(self):
        try:
            subprocess.check_output("ping -c 1 " + self.ip, shell=True)
            self.status = "Online"
        except Exception:
            self.status = "Offline"
        
        self.updates = self.updates + 1
        self.snmp_name = self.get_snmp_info('1.3.6.1.2.1.1.1.0')
        
        if "No SNMP" in str(self.snmp_name):
            return
        
        seconds = int(self.get_snmp_info('1.3.6.1.2.1.1.3.0'))/100
        self.uptime = str(datetime.timedelta(seconds = int(seconds)))
        self.printed_sites = str(self.get_snmp_info('1.3.6.1.2.1.43.10.2.1.4.1.1'))

        #e.g. schwarzer toner, belichtungseinheit, wartungskit, rollenkit
        self.snmp_info1 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.1') 
        self.snmp_info2 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.2')
        self.snmp_info3 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.3')
        self.snmp_info4 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.4')
        
        #e.g. 3000 6000 4000 5000
        status1 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.1')  
        status2 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.2')
        status3 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.3')
        status4 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.4')
        
        #e.g. 10000 12000 10000 10000
        status1_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.1')
        status2_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.2')
        status3_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.3')
        status4_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.4')
        
        #e.g. 30% 50%, 40%, 50%
        self.snmp_info1_status = float(status1)/float(status1_max)*100
        self.snmp_info2_status = float(status2)/float(status2_max)*100
        self.snmp_info3_status = float(status3)/float(status3_max)*100
        self.snmp_info4_status = float(status4)/float(status4_max)*100
    
    def output(self): 
        subprocess.call("clear")
        print ("==============================================")
        print ("|| Observing " + self.ip + self.points)
        print ("|| Update-Time.....: " + str(self.update_time) + " seconds")
        print ("|| Updates.........: " + str(self.updates))
        print ("==============================================")
        print ("|| Status..........: " + str(self.status))
        print ("|| DNS-Name........: " + str(self.dns_name))
        print ("|| SNMP-Name.......: " + str(self.snmp_name)[:25])
        print ("|| Uptime..........: " + self.uptime)
        print ("|| Printed sites...: " + self.printed_sites)
        print ("|| " + self.snmp_info1 + " -> " + self.snmp_info1_status + " %")
        print ("|| " + self.snmp_info2 + " -> " + self.snmp_info2_status + " %")
        print ("|| " + self.snmp_info3 + " -> " + self.snmp_info3_status + " %")
        print ("|| " + self.snmp_info4 + " -> " + self.snmp_info4_status + " %")
        print ("==============================================")
        print ("|| Events..........: " + str(self.num_events))
        print ("==============================================" + self.events)
        
    def get_snmp_info(self, oid):
        command_generator = cmdgen.CommandGenerator()
        error_indication, error_status, error_index, var_binding = command_generator.getCmd(
            cmdgen.CommunityData('public'),
            cmdgen.UdpTransportTarget((self.ip,161)),
            oid
        )
        
        if error_indication:
            return error_indication
        else:
            if error_status:
                return ('%s at %s' % (              #print instead of return
                    error_status.prettyPring(),
                    error_index and var_binding[int(error_index)-1] or '?'
                    )
                )
            else:
                return var_binding[0][1]
                
    def notify(self, sleep):
        self.update_time = sleep
        
        while(True):
            temp_status = self.status
            temp_printed_sites = self.printed_sites
            temp_snmp_info1_status = self.snmp_info1_status
            temp_snmp_info2_status = self.snmp_info2_status
            temp_snmp_info3_status = self.snmp_info3_status
            temp_snmp_info4_status = self.snmp_info4_status
            
            self.update()
            
            self.check_event(temp_status, self.status)
            self.check_event(temp_printed_sites, self.printed_sites)
            self.check_event(temp_snmp_info1_status, self.snmp_info1_status)
            self.check_event(temp_snmp_info2_status, self.snmp_info2_status)
            self.check_event(temp_snmp_info3_status, self.snmp_info3_status)
            self.check_event(temp_snmp_info4_status, self.snmp_info4_status)

            if len(self.points)>8:
                self.points=""
            else:
                self.points = self.points + "."

            self.output()
            
            time.sleep(sleep)
            
        def check_event(self, host_object, old_status, new_status):
            if old_status != new_status:
                self.events = self.events + "\n|| " + time.asctime(time.localtime(time.time())) + ": " + host_object + " " + old_status + " -> " + new_status
                sendMail(self.dns_name, host_object, old_status, new_status)
                self.num_events = self.num_events + 1
        
        
def main():
    printer = sys.argv[1]
    
    try:
        socket.gethostbyname(printer)
    except socket.gaierror:
        print(printer + " is not available")
        return
    
    pfle033 = Printer(printer)
    pfle033.notify(3)

if __name__ == main():
    main()
Ist es möglich, mehreren Variablen innerhalb eines Ausdrucks den gleichen Wert zuzuweisen? Der folgende Code (innerhalb der __init__) wirkt für mich so redundant:

Code: Alles auswählen

        self.snmp_name = "not available"
        self.uptime = "not available"
        self.printed_sites = "not available"
        self.snmp_info1_status = "not available"
        self.snmp_info2_status = "not available"
        self.snmp_info3_status = "not available"
        self.snmp_info4_status = "not available"
Auch die Zeilen innerhalb der update()-Methode wirken für mich überladend. Gibt es hier vielleicht auch eine Möglichkeit das code-sparender zu schreiben?

Code: Alles auswählen

        self.snmp_info1 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.1') 
        self.snmp_info2 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.2')
        self.snmp_info3 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.3')
        self.snmp_info4 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.4')
        
        status1 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.1')  
        status2 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.2')
        status3 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.3')
        status4 = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.4')
        
        status1_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.1')
        status2_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.2')
        status3_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.3')
        status4_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.4')
Für die Zukunft habe ich mir folgendes überlegt:
- Event-Logging außerhalb der Konsole (-> .log-Datei)
- Erweiterte Benachrichtigungseinstellungen (z.B. nur Benachrichtigen wenn Wert unterhalb von angegebener Grenze)
- Parallele Überwachung von mehreren Hosts
- Web-Frontend

Wie ist euer Eindruck? Ist die Klassen- und Methodenbildung so sinnvoll? Fällt euch sonst irgendetwas negativ auf?

MfG
Sirius3
User
Beiträge: 17747
Registriert: Sonntag 21. Oktober 2012, 17:20

@chpo7234: die Namensschreibweise ist nicht durchgehend konsistent, es müßte 'send_mail` heißen. Statt Strings mit + zusammenzustückeln format verwenden, Statt Variablen durchzunummerieren, Listen verwenden. Möglich wäre eine Snmp-Info-Klasse, die info-Text und Status zusammenfaßt. check_output mit Liste und ohne shell=True aufrufen. Bei get_snmp_info sollte statt einen Error-String zurückzugeben eine Exception geworfen werden. Statt time.asctime ein datetime-Objekt benutzen. Statt eines event-String lieber eine Liste, dann brauchst Du auch nicht händisch die Anzahl zählen. Die extra Prüfung in main ist überflüssig, da Printer.__init__ mit der selben Exception abbrechen würde.

Code: Alles auswählen

import datetime
import email.utils
import smtplib
import socket
import subprocess
import sys
import time

from pysnmp.entity.rfc3413.oneliner import cmdgen
from email.mime.text import MIMEText

def sendMail(host, event, old_action, new_action):
    msg = MIMEText("{0} changed his {1} from {2} to {3}".format(host, event, old_action, new_action))
    msg["To"] = email.utils.formataddr(("your@mail.de", "your@mail.de.com"))
    msg["From"] = email.utils.formataddr(("Observer@Printer.de", "Observer@Printer.de"))
    msg["Subject"] = "I've found an event on {0}".format(host)
    
    server = smtplib.SMTP("localhost")
    server.set_debuglevel(True)
    
    try:
        server.sendmail("Observer@Printer.de", ["your@mail.de.com"], msg.as_string())
    finally:
        server.quit()

class SnmpInfo(object):
    def __init__(self, num):
        self.info = "SNMP-Info {0}".format(num)
        self.status = "not available"
        
    def update(self, printer):
        #e.g. schwarzer toner, belichtungseinheit, wartungskit, rollenkit
        self.info = printer.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.6.1.%d' % num) 
        status = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.9.1.%d' % num)  
        status_max = self.get_snmp_info('.1.3.6.1.2.1.43.11.1.1.8.1.%d' % num)
        self.status = float(status)/float(status_max)

    def __str__(self):
        return "{0} -> {1:%}".format(self.info, self.status)
        
class Printer(object):
    
    def __init__(self, name):
        self.updates = 0
        self.update_time = 0
        self.events = []
        self.points = 0
        self.ip = socket.gethostbyname(name)
        self.snmp_name = "not available"
        self.dns_name = socket.gethostbyaddr(self.ip)[0].split(".domain")[0]
        self.uptime = "not available"
        self.printed_sites = "not available"
        self.snmp_infos = [SnmpInfo(n) for n in range(1,5)]
        self.update()

    def update(self):
        try:
            subprocess.check_output(["ping", "-c", "1", self.ip])
            self.status = "Online"
        except Exception:
            self.status = "Offline"
        
        self.updates += 1
        self.snmp_name = self.get_snmp_info('1.3.6.1.2.1.1.1.0')
        
        if "No SNMP" in self.snmp_name:
            return
        
        seconds = int(self.get_snmp_info('1.3.6.1.2.1.1.3.0'))/100
        self.uptime = datetime.timedelta(seconds=int(seconds))
        self.printed_sites = self.get_snmp_info('1.3.6.1.2.1.43.10.2.1.4.1.1')
        for info in self.snmp_infos:
            info.update(self)
    
    def output(self): 
        subprocess.call(["clear"])
        print("==============================================")
        print("|| Observing {0}{1}".format(self.ip, "."*(self.points%8)))
        print("|| Update-Time.....: {0} seconds".format(self.update_time))
        print("|| Updates.........: {0}".format(self.updates))
        print("==============================================")
        print("|| Status..........: {0}".format(self.status))
        print("|| DNS-Name........: {0}".format(self.dns_name))
        print("|| SNMP-Name.......: {0}".format(self.snmp_name))
        print("|| Uptime..........: {0}".format(self.uptime))
        print("|| Printed sites...: {0}".format(self.printed_sites))
        for info in self.snmp_infos:
            print("|| {0}".format(info)
        print("==============================================")
        print("|| Events..........: {0}".format(len(self.events)))
        print("==============================================")
        print('\n'.join(self.events))
        
    def get_snmp_info(self, oid):
        command_generator = cmdgen.CommandGenerator()
        error_indication, error_status, error_index, var_binding = command_generator.getCmd(
            cmdgen.CommunityData('public'),
            cmdgen.UdpTransportTarget((self.ip,161)),
            oid
        )
        
        if error_indication:
            return error_indication
        else:
            if error_status:
                return ('%s at %s' % (              #print instead of return
                    error_status.prettyPring(),
                    error_index and var_binding[int(error_index)-1] or '?'
                    )
                )
            else:
                return var_binding[0][1]
                
    def notify(self, sleep):
        self.update_time = sleep
        
        while True:
            old_status = self.status
            old_printed_sites = self.printed_sites
            self.old_info = [info.status for info in self.snmp_infos]
            self.update()
            self.check_event(old_status, self.status)
            self.check_event(old_printed_sites, self.printed_sites)
            for old_status, info in self.snmp_infos:
                self.check_event(old_status, info.status)
            self.points += 1
            self.output()
            time.sleep(sleep)
            
        def check_event(self, host_object, old_status, new_status):
            if old_status != new_status:
                self.events.append("|| {0}: {1} {2} -> {3}".format(datetime.datetime.now(), host_object, old_status, new_status))
                sendMail(self.dns_name, host_object, old_status, new_status)

def main():
    printer = sys.argv[1]
    try:
        pfle033 = Printer(printer)
        pfle033.notify(3)
    except socket.gaierror:
        print(printer + " is not available")

if __name__ == main():
    main()
Antworten