Seite 1 von 1

Socketerror Testen / Portscanner

Verfasst: Samstag 14. Mai 2005, 14:38
von globox
Also ich bin momentan dabei ein Portscanner zu scripten, das Problem ist dass wenn der Socket nicht auf den Port connecten kann das ganze Script einbricht und mir die timeout Nachricht zeigt.

Code: Alles auswählen

"""
GloBoX - Portscanner
"""

import sys
import time
from socket import *

usage = '''
GloBoX - Portscanner
----------------
usage: %s [host] [startport] [stopport] [-i]
-i\tShows Infos about open Ports
''' % sys.argv[0]

gbx = '''
GloBoX - Portscanner
---------------
'''

def pscan(host, startport, stopport, info):
    stopport_ = int(stopport)
    startport_ = int(startport)
    
    for currport in range(startport_, stopport_+1):
        print '[-] Beginning scan on %s' % host
        if info == 'off':
            try:
                sock = socket(AF_INET, SOCK_STREAM)
                sock.connect((host, currport))
            except IOError:
                print '[-] Port: %s; Closed!' % currport
            else:
                print '[-] Port: %s; Open!' % currport
        sock.shutdown(SHUT_RDWR)
        time.sleep(1)

def start():
    args = sys.argv[1:]
    if len(args) == 3:
        print gbx
        pscan(sys.argv[1], sys.argv[2], sys.argv[3], 'off')
    elif len(args) == 4 and args[4] == '-i':
        print gbx
        pscan(sys.argv[1], sys.argv[2], sys.argv[3], 'on')
    else:
        print usage

if __name__ == '__main__':
    start()
EDIT (jens): Betreff ergänzt

Verfasst: Samstag 14. Mai 2005, 16:05
von jens
Also irgendwas stimmt mit deiner Zeile "if info == 'off':" nicht ;)

Hier mal meine Variante:

Code: Alles auswählen

"""
GloBoX - Portscanner
"""

import sys
import time
from socket import *

usage = '''
GloBoX - Portscanner
----------------
usage: %s [host] [startport] [stopport] [-i]
-i\tShows Infos about open Ports
''' % sys.argv[0]

gbx = '''
GloBoX - Portscanner
---------------
'''

def pscan(host, startport, stopport, info):
    stopport_ = int(stopport)
    startport_ = int(startport)

    print '[-] Beginning scan on %s' % host

    for currport in range(startport_, stopport_+1):
        print "Test Port: %s ..." % currport,
        try:
            sock = socket(AF_INET, SOCK_STREAM)
            sock.connect((host, currport))
            sock.shutdown(SHUT_RDWR)
        except Exception, e:
            print 'Closed [%s]' % e
        else:
            print 'Open!'

        #~ time.sleep(0.2)

def start():
    args = sys.argv[1:]
    if len(args) == 3:
        print gbx
        pscan(sys.argv[1], sys.argv[2], sys.argv[3], 'off')
    elif len(args) == 4 and args[4] == '-i':
        print gbx
        pscan(sys.argv[1], sys.argv[2], sys.argv[3], 'on')
    else:
        print usage

if __name__ == '__main__':
    #~ start()
    pscan( "localhost", 138,140, "on")

Verfasst: Samstag 14. Mai 2005, 17:18
von globox
Aaah supper, dankeschön

Der Fehler war im except

globox

edit:

Achja habs jetzt so und es funktioniert :)

Code: Alles auswählen

"""
GloBoX - Simple Portscanner
"""

import sys
import time
from socket import *

usage = '''
GloBoX - Simple Portscanner
----------------
usage: port [host] [startport] [stopport]
''' % sys.argv[0]

gbx = '''
GloBoX - Simple Portscanner
---------------
'''   

def pscan(host, startport, stopport):
    stopport_ = int(stopport)
    startport_ = int(startport)

    print '[-] Beginning scan on %s' % host

    for currport in range(startport_, stopport_+1):
        try:
            sock = socket(AF_INET, SOCK_STREAM)
            sock.settimeout(2)
            sock.connect((host, currport))
            sock.shutdown(SHUT_RDWR)
        except Exception, e:
            print '[-] Port: %s; Closed! (%s)' % (currport, e) 
        else:
            print '[-] Port: %s; Open!' % currport

    print '[-] Scanned from %s to %s on %s' % (startport, stopport, host)
                
def start():
    args = sys.argv[1:]
    if len(args) == 3:
        print gbx
        pscan(sys.argv[1], sys.argv[2], sys.argv[3])
    else:
        print usage

if __name__ == '__main__':
    start()

Verfasst: Montag 8. August 2005, 12:54
von jens
Wäre nur die Frage, ob man nicht per Thread mehrere Port "gleichzeitig" Testen könnte um die Sache zu beschleunigen!

Verfasst: Montag 8. August 2005, 13:49
von jens
Hab hier noch eingebaut, das Informationen zum aktuellen Port angezeigt werden:

Code: Alles auswählen

#!/usr/bin/python
# -*- coding: UTF-8 -*-

# http://www.python-forum.de/viewtopic.php?t=3262

import sys, socket, urllib, sgmllib

portinfo_url = "http://www.grc.com/port_%s.htm"

class PortInfo(sgmllib.SGMLParser):
    """
    Holt sich informationen über den Port bei grc.com
    Fischt sich die Info's aus der HTML-Seite. Wenn die
    Seite umgebaut wurde, wird's wahrscheinlich nicht mehr
    richtig funktionieren!
    """
    def __init__(self, verbose = 0):
        sgmllib.SGMLParser.__init__(self, verbose)
        self.next_is_Purpose = False
        self.portinfo = "[Not found!]"

    def handle_data( self, data ):
        data = data.strip()

        if self.next_is_Purpose:
            self.portinfo = data
            self.next_is_Purpose = False
        elif data == "Purpose:":
            self.next_is_Purpose = True

    def get_info( self ):
        return self.portinfo


class portscanner:
    def __init__( self, host, start_port=1, end_port=65535 ):
        self.host = host
        self.start_port = start_port
        self.end_port = end_port
        self.portscan()

    def portscan( self ):
        print "--> Beginning scan on %s Port %s - %s" % (
            self.host, self.start_port, self.end_port
        )
        print "-"*80

        for currport in range( self.start_port, self.end_port+1 ):
            self.scan( self.host, currport )

    def scan( self, host, port ):
        print "Port:%s (%-40s)..." % (
            port, self.portinfo( port )
        ),
        try:
            sock = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout(1)
            sock.connect( (host, port) )
            sock.shutdown( socket.SHUT_RDWR )
        except Exception, e:
            print "Closed [%s]" % e
        else:
            print "Open!"
            sock.close()

    def portinfo( self, port ):
        url = urllib.urlopen( portinfo_url % port )
        data = url.read()
        url.close()

        parser = PortInfo()
        parser.feed(data)
        return parser.get_info()

if __name__ == '__main__':
    #~ portscanner( "localhost", 138,140 )
    portscanner( "82.105.57.129", 70,90 )
Schön wäre natürlich das man die Portinformationen in eine Datei fest speichert, oder einen anderen Dienst im Internet findet, wo man einfacher an die Daten kommt...
Die Beispiel IP-Adresse hatte ich aus einer aktuellen eBay-Pishing-SPAM-Mail 8)

Verfasst: Montag 8. August 2005, 14:58
von Leonidas
jens hat geschrieben:Schön wäre natürlich das man die Portinformationen in eine Datei fest speichert, oder einen anderen Dienst im Internet findet, wo man einfacher an die Daten kommt...
Was spricht gegen IANA? Die Datei ist auch teilweise in /etc/services zu finden.

Verfasst: Montag 8. August 2005, 15:51
von Joghurt
Und was spricht gegen Nmap?

SCNR :wink:

Verfasst: Montag 8. August 2005, 19:44
von jens
Hab mal weiter gebastelt... Weil Spass macht! :lol:

Es werden nun die iana.org Seite verwendet. Diese wird in einer Datei zwischen gespeichert...

Code: Alles auswählen

#!/usr/bin/python
# -*- coding: UTF-8 -*-

# http://www.python-forum.de/viewtopic.php?t=3262

import sys, socket, urllib, time
import zipfile, zlib

zlib.Z_DEFAULT_COMPRESSION = 9


class iana_port_info:
    """
    Stell Port-Informationen bereit.
    Diese werden von iana.org gelesen und in einer Cache-Datei
    gespeichert. Beim nächsten Aufruf werden die Daten direkt
    aus der Cache-Datei genutzt.
    """

    url         = "http://www.iana.org/assignments/port-numbers"
    cachefile   = "portscanner_data.zip"

    def __init__( self, verbose = True ):
        self.verbose = verbose
        self.side_data = self._get_side_data()
        self.port_data = {}

        # Daten in Dict schreiben
        self.prepare_data()

    def prepare_data( self ):
        """
        Die Rohdaten von iana.org parsen
        """
        for line in self.side_data.splitlines():
            line = line.split()
            try:
                if not "/tcp" in line[1]:
                    continue

                self._insert_port_info( line )
            except IndexError:
                pass

    def _insert_port_info( self, data ):
        """
        Daten aufteilen und in Dict speichern
        """
        try:
            port_number = int( data[1].rsplit("/")[0] )
        except ValueError:
            return

        keyword = data[0]
        description = " ".join( data[2:] )

        self.port_data[ port_number ] = "%s, %s" % (
            keyword, description
        )

    def _get_side_data( self ):
        """
        Port Daten lesen. Entweder aus Datei oder direkt von
        iana.org, wenn die Datei nicht existiert.
        """
        try:
            return self._read_side_data()
        except IOError:
            # Aus Datei konnte nicht gelesen werden, also
            # holen wir uns die Daten frisch von iana.org
            data = self._iana_data_from_url()
            # Speicher das ganze ab, für nächstes mal
            self._save_side_data( data )
            return data

    def _iana_data_from_url( self ):
        """
        Port Info-Seite von iana.org holen
        """
        if self.verbose: print "connect to '%s'..." % self.url,
        c = urllib.urlopen( self.url )
        if self.verbose: print "OK"
        if self.verbose: print "download portinfo...",
        data = c.read()
        if self.verbose: print "OK"
        c.close()
        return data

    def _save_side_data( self, data ):
        """ Port-Seite in Cache Datei schreiben """
        print "Write Port-Data in ZIP-File...",
        ziparchiv = zipfile.ZipFile( self.cachefile, "w", zipfile.ZIP_DEFLATED)
        ziparchiv.writestr( "port-numbers.txt", data)
        ziparchiv.close()
        print "OK"

    def _read_side_data( self ):
        """ Port-Seite aus Cache Datei lesen """
        ziparchiv = zipfile.ZipFile( self.cachefile, "r" )
        data = ziparchiv.read( "port-numbers.txt")
        ziparchiv.close()
        return data

    def known_ports( self ):
        """ Liefert Liste aller bekannten Ports zurück """
        return sorted( self.port_data.keys() )

    def iteritems( self ):
        """ Hilfreich zum Auflisten der Port-Informationen """
        return sorted( self.port_data.iteritems() )

    def __getitem__( self, port ):
        """ Liefert Informationen zum angegebenen port """
        try:
            return self.port_data[port]
        except KeyError:
            return "unknow port"

    def __len__( self ):
        return len(self.port_data)

#~ t = iana_port_info()
#~ for port, info in t.iteritems():
    #~ print port, "-", info
#~ sys.exit()

class portscanner:

    # Timeout in Sek. nachdem ein Port als zu erkannt werden soll
    timeout = 1

    # Auf wievielfachen Wert soll der timeout gesenkt werden, wenn
    # ein Port offen gescannt wurde
    buffer_multiply = 2

    def __init__( self ):
        self.iana_port_info = iana_port_info()
        print len(self.iana_port_info),"known ports"

    def scan_range( self, host, start_port=1, end_port=65535 ):
        """
        Scannt eine Port-Bereich durch.
        """
        print "\n--> Beginning scan on %s Port %s - %s" % (
            host, start_port, end_port
        )
        print "-"*80

        for port in xrange( start_port, end_port+1 ):
            self.scan_port( host, port )

    def scan_known_ports( self, host, well_know=True ):
        """
        Scannt nur Ports die in der iana.org Liste auftauchen.
        Ist well_know == True werden nur Ports gescannt die als
        Keyword in der iana.org Liste kein '#'-Zeichen haben
        """
        print "\n--> scan know ports on %s" % host

        for port in self.iana_port_info.known_ports():
            if well_know and self.iana_port_info[port].startswith("#"):
                continue
            self.scan_port( host, port )

    def scan_port( self, host, port ):
        """
        Scannt einen port
        """
        print "Port:%5s (%-40s)..." % (
            port, self.iana_port_info[port]
        ),
        start_time = time.time()
        try:
            sock = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout( self.timeout )
            sock.connect( (host, port) )
            sock.shutdown( socket.SHUT_RDWR )
            scan_time = time.time() - start_time
        except Exception, e:
            print "Closed [%s]" % e
        else:
            print "Open! (%.0fms, %.0fms)" % (
                scan_time*100, self.timeout*100
            )
            sock.close()

            if scan_time < self.timeout:
                # Server reagierte schneller, als der aktuelle Timeout
                self.timeout = scan_time * self.buffer_multiply


if __name__ == '__main__':
    scanner = portscanner()

    host = "82.105.57.129"

    #~ scanner.scan_range( host, 1,65535 )

    scanner.scan_known_ports( host )
EDIT: Weil die Portliste so groß ist, hab ich es nun in eine ZIP-Datei gepackt ;)

EDIT2: Weil die Scanzeiten so lang sind, hab ich am Timeout etwas rumgeschraubt... Sobald ein Port als offen erkannt wurde, wird der Timeout runtergesetzt.
Bei dem SPAM-TEST-Server ist der Port 22 und 25 offen. Danach wird es spürbar schneller ;)

Verfasst: Dienstag 9. August 2005, 09:52
von jens
So, nun ist aber gut mit Spielerei:

Code: Alles auswählen

#!/usr/bin/python
# -*- coding: UTF-8 -*-

"""
Scan know Ports on a Host

usage:

    portscanne.py [host]

Info: http://www.python-forum.de/viewtopic.php?t=3262
"""

import sys, socket, urllib, time
import zipfile, zlib

zlib.Z_DEFAULT_COMPRESSION = 9


class iana_port_info:
    """
    Stell Port-Informationen bereit.
    Diese werden von iana.org gelesen und in einer Cache-Datei
    gespeichert. Beim nächsten Aufruf werden die Daten direkt
    aus der Cache-Datei genutzt.
    """

    url         = "http://www.iana.org/assignments/port-numbers"
    cachefile   = "portscanner_data.zip"

    def __init__( self, verbose = True ):
        self.verbose = verbose
        self.side_data = self._get_side_data()
        self.port_data = {}

        # Daten in Dict schreiben
        self.prepare_data()

    def prepare_data( self ):
        """
        Die Rohdaten von iana.org parsen
        """
        for line in self.side_data.splitlines():
            line = line.split()
            try:
                if not "/tcp" in line[1]:
                    continue

                self._insert_port_info( line )
            except IndexError:
                pass

    def _insert_port_info( self, data ):
        """
        Daten aufteilen und in Dict speichern
        """
        try:
            port_number = int( data[1].rsplit("/")[0] )
        except ValueError:
            return

        keyword = data[0]
        description = " ".join( data[2:] )

        self.port_data[ port_number ] = "%s, %s" % (
            keyword, description
        )

    def _get_side_data( self ):
        """
        Port Daten lesen. Entweder aus Datei oder direkt von
        iana.org, wenn die Datei nicht existiert.
        """
        try:
            return self._read_side_data()
        except IOError:
            # Aus Datei konnte nicht gelesen werden, also
            # holen wir uns die Daten frisch von iana.org
            data = self._iana_data_from_url()
            # Speicher das ganze ab, für nächstes mal
            self._save_side_data( data )
            return data

    def _iana_data_from_url( self ):
        """
        Port Info-Seite von iana.org holen
        """
        if self.verbose: print "connect to '%s'..." % self.url,
        c = urllib.urlopen( self.url )
        if self.verbose: print "OK"
        if self.verbose: print "download portinfo...",
        data = c.read()
        if self.verbose: print "OK"
        c.close()
        return data

    def _save_side_data( self, data ):
        """ Port-Seite in Cache Datei schreiben """
        print "Write Port-Data in ZIP-File...",
        ziparchiv = zipfile.ZipFile( self.cachefile, "w", zipfile.ZIP_DEFLATED)
        ziparchiv.writestr( "port-numbers.txt", data)
        ziparchiv.close()
        print "OK"

    def _read_side_data( self ):
        """ Port-Seite aus Cache Datei lesen """
        ziparchiv = zipfile.ZipFile( self.cachefile, "r" )
        data = ziparchiv.read( "port-numbers.txt")
        ziparchiv.close()
        return data

    def known_ports( self ):
        """ Liefert Liste aller bekannten Ports zurück """
        return sorted( self.port_data.keys() )

    def iteritems( self ):
        """ Hilfreich zum Auflisten der Port-Informationen """
        return sorted( self.port_data.iteritems() )

    def __getitem__( self, port ):
        """ Liefert Informationen zum angegebenen port """
        try:
            return self.port_data[port]
        except KeyError:
            return "unknow port"

    def __len__( self ):
        return len(self.port_data)

#~ t = iana_port_info()
#~ for port, info in t.iteritems():
    #~ print port, "-", info
#~ sys.exit()

class portscanner:

    # Timeout in Sek. nachdem ein Port als zu erkannt werden soll
    timeout = 1

    # Auf wievielfachen Wert soll der timeout gesenkt werden, wenn
    # ein Port offen gescannt wurde
    buffer_multiply = 1.75

    # Mindest Timeout-Wert
    min_timeout = 0.01

    def __init__( self, host ):
        self.host = host

        self.iana_port_info = iana_port_info()
        print "\n%s known ports" % len(self.iana_port_info)

        self.print_IP_info()

        self.set_timeout()

        self.total_starttime = time.time()

    def scan_range( self, start_port=1, end_port=65536 ):
        """
        Scannt eine Port-Bereich durch.
        """
        print "Beginning scan, Port %s - %s" % (
            start_port, end_port
        )
        print "-"*80

        self.total_ports = end_port+1 - start_port

        self.index = 0
        for port in xrange( start_port, end_port+1 ):
            self.index += 1
            self.scan_port( self.host, port )

    def print_IP_info( self ):
        server_name = socket.getfqdn( self.host )
        print "%s - %s" % ( self.host, server_name )


    def scan_known_ports( self, well_know=True ):
        """
        Scannt nur Ports die in der iana.org Liste auftauchen.
        Ist well_know == True werden nur Ports gescannt die als
        Keyword in der iana.org Liste kein '#'-Zeichen haben
        """
        print "scan know ports"
        print "-"*80

        self.total_ports = len(self.iana_port_info)

        self.index = 0
        for port in self.iana_port_info.known_ports():
            self.index += 1
            if well_know and self.iana_port_info[port].startswith("#"):
                continue
            self.scan_port( self.host, port )


    def set_timeout( self ):
        sys.stdout.write( "Set timeout" )
        often_open_ports = (80,21,22,25,53,110,143)
        total_time = 0
        for port in often_open_ports:
            sys.stdout.write(".")
            start_time = time.time()
            if self._scan( self.host, port ) == True:
                scan_time = time.time() - start_time
                self.new_timeout( scan_time )

        print "timeout: %.1fms" % (self.timeout*100)

    def scan_port( self, host, port ):
        """
        Scannt einen port
        """
        info_line = "%5s %-52s" % ( port, self.iana_port_info[port][:52] )

        percent = "%.1f%%" % (float(self.index) / self.total_ports * 100)

        elapsed = (time.time()-self.total_starttime)        # Vergangene Zeit
        estimated = elapsed / self.index * self.total_ports # Geschätzte Zeit

        time_info = "%.1f/%.1fmin" % (elapsed/60, estimated/60)

        sys.stdout.write(
            "%s %s %s" % (info_line, percent, time_info)
        )
        start_time = time.time()
        port_status = self._scan( host, port )
        sys.stdout.write("\r")
        scan_time = time.time() - start_time

        if port_status != True:
            # Port ist geschlossen
            return

        print "%s Open! (%.1fms, %.1fms)" % (
            info_line, scan_time*100, self.timeout*100
        )
        self.new_timeout( scan_time )

    def new_timeout( self, scan_time ):
        new_timeout = scan_time * self.buffer_multiply

        if new_timeout < self.min_timeout:
            self.timeout = self.min_timeout
        else:
            self.timeout = new_timeout

    def _scan( self, host, port ):
        try:
            sock = socket.socket( socket.AF_INET, socket.SOCK_STREAM)
            sock.settimeout( self.timeout )
            sock.connect( (host, port) )
            sock.shutdown( socket.SHUT_RDWR )
        except Exception, e:
            return False
        else:
            sock.close()
            return True

if __name__ == '__main__':
    try:
        host = sys.argv[1]
    except IndexError:
        print __doc__
        sys.exit()

    #~ host = "82.105.57.129"
    scanner = portscanner( host )

    #~ scanner.scan_range( 1, 65535 )

    scanner.scan_known_ports()
Nun ist die Ausgabe auf der Consoler sinnvoller, man hat nun eine Fortschrittanzeige mit Prozent und Zeitangabe. Außerdem sieht man nun besser, welche Ports offen sind, denn nur diese Zeilen bleiben am Ende stehen.

EDIT: Ok, noch eine Änderung... Nun kann man die Adresse als Parameter angeben.
Außerdem wird nun der Timeout am Anfang gesetzt anhand von oft offenen Ports. Dann startet der Scann viel schneller...

Verfasst: Mittwoch 10. August 2005, 17:57
von jens
Hab gerade einen "multithreaded portscanner" gefunden: http://aspn.activestate.com/ASPN/Cookbo ... ipe/286240