Threads und nicht blockierende Sockets

Sockets, TCP/IP, (XML-)RPC und ähnliche Themen gehören in dieses Forum
Antworten
antaeus
User
Beiträge: 48
Registriert: Dienstag 19. September 2006, 10:10

Donnerstag 21. September 2006, 19:36

Hallo,
ich habe mich vorhin an Threads und nicht blockierende Sockets gewagt und komme nicht mehr weiter.

Ziel ist, dass ich einen Echo-Server habe, der mit mehreren Clienten gleichzeitig klarkommt....

Ich habe eine Klasse für den Server. Die macht nicht viel, außer einen Serversocket aufzumachen und auf Connections zu warten. Bei einer neuen Verbindung wir ein "Handler-Thread" erzeugt und der soll sich um die Kundschaft kümmern.

Der Handler wird auch erzeugt und funktioniert soweit auch wie erwartet. Nur funktioniert es nicht, dass der Server sich um mehrere Clients gleichzeitig kümmert. Die neuen Verbindungen werden so lange auf Halde gelegt, bis die momentan aktuelle Verbindung beendet ist.

Also habe ich ein setblocking(0) für den erzeugten Client-Socket in den Code geschrieben, aber wenn ich versuche zum Server eine Verbindung aufzubauen bekomme ich folgenden Fehler:

Traceback (most recent call last):
File "/home/x/workspace/tES/src/main.py", line 3, in ?
echo = echoServer.echoServer()
File "/home/x/workspace/tES/src/echoServer.py", line 33, in __init__
curHandler = handler.handler(curClientSocket, curClientAddr)
File "/home/x/workspace/tES/src/handler.py", line 11, in __init__
self.handle()
File "/home/x/workspace/tES/src/handler.py", line 18, in handle
except self.socket.error:
AttributeError: '_socketobject' object has no attribute 'error'

Wie muss ich denn den Code umbauen, damit ich den multithreaded Echo-Server hinbekomme?

Vielen Dank für Tipps!

Code: Alles auswählen

### server.py ###
import socket as sk
import handler

class echoServer(object):
    
    IP = "localhost"
    PORT = 8080
    BOUND = False

    def __init__(self):
        self.socket = sk.socket(sk.AF_INET, sk.SOCK_STREAM)
        
        # first bind to socket
        
        while not self.BOUND:
            try:
                self.socket.bind((self.IP, self.PORT))
                self.BOUND = True
                self.tell("Bound to port "+str(self.PORT))
            except sk.error:
                self.PORT = self.PORT + 1
                self.tell("Can't bind, trying next port " + str(self.PORT))
                
        # now we're bound, run server loop
                
        if self.BOUND:
            while True:
                self.tell("Listening")
                self.socket.listen(5)           
                (curClientSocket, curClientAddr) = self.socket.accept()
                
                # spawn new Handler thread
                curHandler = handler.handler(curClientSocket, curClientAddr)
                curHandler.start()
        else:
            self.tell("Error, exiting!")
            
    def tell(self, text):
        print "Server:",text   



### handler.py ###
from threading import Thread

class handler(Thread):
    BUFLEN = 1024
    
    def __init__(self, socket, addr):
        Thread.__init__(self)
        self.socket = socket
        self.addr = addr
        self.tell("Socket for Client " + str(self.addr[0]) + ":" + str(self.addr[1])+ " created")
        self.handle()
        
    def handle(self):
        while True:
            #self.socket.setblocking(0) <------------- PROBLEM!
            data = self.socket.recv(self.BUFLEN)          
            
            if not data:
                self.tell("Connection to host closed")
                break
            elif data == "QUIT\r\n":
                self.tell("Closing connection")
                break
            else:
                self.tell(data)
                self.socket.send(">"+data)
        
        self.socket.close()

    def tell(self, text):
        print "Handler:",text                


Benutzeravatar
Rebecca
User
Beiträge: 1662
Registriert: Freitag 3. Februar 2006, 12:28
Wohnort: DN, Heimat: HB
Kontaktdaten:

Freitag 22. September 2006, 07:48

Die Fehlermeldung, die du gepostet hast, passt nicht zum Code.

Die Fehlermeldung besagt, dass du bei einem except-Statement in der Methode handle einen Fehler machst. In dem geposteten Code gibt's aber kein except-Statement in handle(). Und zwar beschwert sich Python, dass du auf ein nicht vorhandenes Attribut self.socket.error zugreifst...

Die Fehlermeldungen sollte man durchaus genau lesen.
rayo
User
Beiträge: 773
Registriert: Mittwoch 5. November 2003, 18:06
Wohnort: Schweiz
Kontaktdaten:

Freitag 22. September 2006, 09:28

Hi

Zusätzlich macht du einen Fehler beim Thread erstellen.

Das stimmt hier nocht ... jedoch dann im __init__ vom handler machst du einen Fehler.

Code: Alles auswählen

curHandler = handler.handler(curClientSocket, curClientAddr)
curHandler.start()
Hier darf self.handle() nicht stehen, sonst wird der Mainthread blockiert, weil in der ___init__ noch kein neuer Thread läuft. Du musst die self.handle() Methode in die run() Methode packen vom Thread (die wird ausgeführt wenn du curHandler.start() machst)

Code: Alles auswählen

def __init__(self, socket, addr):
        Thread.__init__(self)
        self.socket = socket
        self.addr = addr
        self.tell("Socket for Client " + str(self.addr[0]) + ":" + str(self.addr[1])+ " created")
        self.handle()

Etwa so sollte der Thread aussehen

Code: Alles auswählen

class handler(Thread):
    BUFLEN = 1024
    
    def __init__(self, socket, addr):
        Thread.__init__(self)
        self.socket = socket
        self.addr = addr
        self.tell("Socket for Client " + str(self.addr[0]) + ":" + str(self.addr[1])+ " created")
        
    def handle(self):
        while True:
            #self.socket.setblocking(0) <------------- PROBLEM!
            data = self.socket.recv(self.BUFLEN)          
            
            if not data:
                self.tell("Connection to host closed")
                break
            elif data == "QUIT\r\n":
                self.tell("Closing connection")
                break
            else:
                self.tell(data)
                self.socket.send(">"+data)
        
        self.socket.close()

    def run(self):
        self.tell('new thread running')
        self.handle()

    def tell(self, text):
        print "Handler:",text        
Gruss
antaeus
User
Beiträge: 48
Registriert: Dienstag 19. September 2006, 10:10

Freitag 29. September 2006, 09:40

... ich hatte leider in den letzten Tagen keine Zeit, daher erst jetzt "Danke" für eure Antworten!

@ Rebecca: Du hast recht. Da habe ich tatsächlich irgendeinen einen falschen Fehler :D gepostet. Sorry!

@ rayo: Danke, dein Tipp hat gewirkt...

Eine Frage habe ich aber noch bzgl. des (nicht)blockierenden Sockets. Wenn ich versuche den Socket als nicht blockierend zu setzen, raucht mein Programm ab, sobald eine Verbindung aufgebaut wird. Das ist der Output des Programms
Server: Bound to port 8080
Server: Listening
Handler: Socket for Client 127.0.0.1:38470 created
Server: Listening
Exception in thread Thread-1:
Traceback (most recent call last):
File "/usr/lib/python2.4/threading.py", line 442, in __bootstrap
self.run()
File "/home/DrScheme/workspace/tES/src/handler.py", line 14, in run
self.handle()
File "/home/DrScheme/workspace/tES/src/handler.py", line 19, in handle
data = self.socket.recv(self.BUFLEN)
error: (11, 'Resource temporarily unavailable')

Handler: New thread running
Der Code ist noch immer der selbe wie im ersten Posting (abgesehen von den Änderungen, die rayo vorgeschlagen hat). An was liegt denn das?
rayo
User
Beiträge: 773
Registriert: Mittwoch 5. November 2003, 18:06
Wohnort: Schweiz
Kontaktdaten:

Freitag 29. September 2006, 10:29

Hi

Wenn du nicht blockierende Sockets machts, musst du mit dem Modul select arbeiten. Da der Socket z.B. bei einem .connect nicht blockiert und der connect ist noch nicht fertig wird ein recv mit einem Fehler quittiert.

select schaut eine Liste von sockets an und gibt die Sockets, welche bereit zum lesen/schreiben sind zurück.

Gruss
antaeus
User
Beiträge: 48
Registriert: Dienstag 19. September 2006, 10:10

Freitag 29. September 2006, 12:06

Ah, okay! Ich dachte bisher, dass select mehr oder weniger "optional" sei. So kam es jedenfalls in dem Tutorial rüber, das ich gelesen habe. Ich werde es versuchen.

Danke
antaeus
User
Beiträge: 48
Registriert: Dienstag 19. September 2006, 10:10

Samstag 7. Oktober 2006, 12:35

Ich möchte zwei Instanzen der folgenden Klasse gleichzeitig ausführen lassen:

Code: Alles auswählen

import socket as sk
from threading import Thread

class Server(Thread):
    
    def __init__(self, name, ownAddr):
        
        # name is the name of the server
        self.name = name     
        # initialize Threading
        Thread.__init__(self)      
        # size of buffer
        self.bufsize = 1024
        # at the beginning socket is not bound
        self.bound = False
        # ownAddr is the IP and Port number where the server socket is bound to
        self.ownAddr = ownAddr
            
    def run(self):
        self.makeSocket()
        
    def makeSocket(self):
        try:
            self.socket = sk.socket(sk.AF_INET, sk.SOCK_DGRAM)
            self.tell("Socket created")
            self.start()
        except sk.error:
            self.tell("ERROR creating socket")       
            
    def start(self):
        
        self.tell("Now starting...")
        
        running = False
        try: 
            self.socket.bind(self.ownAddr)
            self.tell("Socket bound...")
            running = True
        except sk.error:
            self.tell("ERROR binding to" + self.addrToString(self.ownAddr))   
        
        while running:
            try:
                (data, addr) = self.socket.recvfrom(self.bufsize)
                self.tell("New connection" + self.addrToString(addr))
                self.tell("Got data: " + data)
                if not data:
                    self.tell("Connection lost")
                    running = False
            except sk.error:
                self.tell("ERROR reading data from socket")
                running = False

        self.socket.close()
        self.tell("Socket closed")
           
    def addrToString(self, addr): 
        return " " + addr[0] + ":" + str(addr[1])
            
    def tell(self, text):
        print self.name + ": " + text
        
servera = Server("serverA", ("", 8888))
servera.run()

serverb = Server("serverB", ("", 8889))
serverb.run()
Problem: Es wird nur Server A ausgeführt.
Ich verstehe nicht so recht, an was das liegt. Ich erstelle für jeden Server einen eigenen Thread. Darin kann der Socket so lange blockieren, wie er lustig ist, das dürfte den anderen Thread doch nicht behelligen?! Oder liege ich da falsch?

Oder ist in dem Code ein anderer Fehler, den ich nicht sehe?

Grüße!
A.
rayo
User
Beiträge: 773
Registriert: Mittwoch 5. November 2003, 18:06
Wohnort: Schweiz
Kontaktdaten:

Samstag 7. Oktober 2006, 13:03

Hi

Benenne deine start-Methode bitte anders (start ist reserviert für den Thread). Ich würde listen vorschlagen.

Dann beim Thread erstellen führst du nicht die run-Methode aus sondern servera.start() (nicht deine start sondern das ist Thread.start, darum umbenennen)

Der Thread führt die run-Methode dann selbst aus.

Gruss
antaeus
User
Beiträge: 48
Registriert: Dienstag 19. September 2006, 10:10

Sonntag 8. Oktober 2006, 18:22

Kopf --> Tisch! *aua*

Immer diese verdammten Kleinigkeiten, die man vom prinzip her weiß, aber trorzdem falsch macht!!
Antworten