mehr als ein download zum gleichen zeitpunkt

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
matrixnet
User
Beiträge: 35
Registriert: Donnerstag 21. April 2005, 16:45

hi

ich will das mein client mehrere downloads oder verbindungen aufbaut!
statt immer das nur eins davon geht!

Code: Alles auswählen

"""
SAM based RawServer replacement

Written by duck in 2004 and released into the public domain.
"""

from bisect import insort
import select, socket
import string
from cStringIO import StringIO
from traceback import print_exc, print_stack
from errno import EINTR, EWOULDBLOCK
from time import time, sleep
from random import randrange
import sys

import Logger
import SamBuffer

class SingleSocket:
    def __init__(self, raw_server, sock, handler, logger):
        self.raw_server = raw_server
        # sock is (id, dest)
        self.id = sock[0]
        self.dest = sock[1]
        self.handler = handler
        self.logger = logger
        self.buffer = StringIO()
        self.last_hit = time()
        self.connected = False

    def connect(self):
        self.connected = True

    def close(self):
        """Initiate socket closing."""
        self.raw_server.logger.debug("Sending close to %d", self.id)

        # uncomment to dump the stack to the logs
        #data = StringIO()
        #print_stack(file = data)
        #self.raw_server.logger.debug(data.getvalue())

        self.raw_server._sendClose(self.id)

    def get_ip(self):
        return self.dest

    def shutdown(self, val):
        self.raw_server.logger.debug("Sending shutdown to %d, level %d", self.id, val)
        self.raw_server._sendClose(self.id)

    def is_flushed(self):
        return self.buffer.tell() == 0

    def write(self, s):
        self.raw_server.logger.debug("Writing to buffer of %d, length %d", self.id, len(s))
        self.buffer.write(s)

    def try_write(self):
        if self.connected:
            self.buffer.seek(0)
            while True:
                nmax = 32768
                block = self.buffer.read(nmax)
                if len(block)>0:
                    self.raw_server.logger.debug("Sending data to %d, length %d", self.id, len(block))
                    self.raw_server._sendData(self.id, block)
                else:
                    self.buffer.reset()
                    self.buffer.truncate()
                    return

def default_error_handler(x):
    print x

class SamServer:
    def __init__(self, doneflag, bind, timeout_check_interval, timeout,
                 sam_bridge, tunnel_depth, tunnel_number, length_variance,
                 noisy = True, errorfunc = default_error_handler,
                 maxconnects = 55):
        # default sam config
        self.samhost = sam_bridge.split(':')[0]
        self.samport = int(sam_bridge.split(':')[1])
        self.version = '1.0'
        self.tunnel_depth = tunnel_depth
        self.tunnel_number = tunnel_number
        self.length_variance = length_variance

        self.connected = False
        self.handshaked = False
        self.bound = False

        if not bind:
            self.sessionname = 'I2P-BT-%d' % randrange(2 ** 30)
        else:
            self.sessionname = bind
        self.mydest = None

        self.samBuffer = SamBuffer.SamBuffer()
        self.outBuffer = []

        self.timeout_check_interval = timeout_check_interval
        self.timeout = timeout
        self.single_sockets = {}
        self.dead_from_write = []
        self.doneflag = doneflag
        self.noisy = noisy
        self.errorfunc = errorfunc
        self.maxconnects = maxconnects
        self.funcs = []
        self.unscheduled_tasks = []

        self.maxId = 0
        self._connect()
        self.add_task(self.scan_for_timeouts, timeout_check_interval)

    def _connect(self):
        """Connect to SAM."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(0)
        try:
            self.sock.connect_ex((self.samhost, self.samport))
        except socket.error:
            raise
        except Exception, e:
            raise socket.error(str(e))
        self.handshaked = False

    def _disconnect(self):
        """Close the SAM connection."""
        sleep(0.01)
        self.sock.close()
        self.connected = False

    def _recv(self):
        """Read data from SAM and add it to the buffer."""
        data = self.sock.recv(100000)
        self.logger.debug("Receiving data from SAM, length %d", len(data))
        if data == '':
            self._disconnect()
        else:
            self.samBuffer.add(data)
                

    def _send(self, message):
        """Add a message to the out buffer."""
        self.outBuffer.append(message)

    def _sendHandshake(self):
        self._send("HELLO VERSION MIN=%s MAX=%s\n" % (self.version, self.version))
        self.wantCommand = True

    def _sendClose(self, id):
        self._send("STREAM CLOSE ID=%d\n" % id)

    def _sendConnect(self, id, dest):
        self._send("STREAM CONNECT ID=%d DESTINATION=%s\n" % (id, dest))

    def _sendData(self, id, msg):
        self._send("STREAM SEND ID=%d SIZE=%d\n%s" % (id, len(msg), msg))

    def _sendBuffer(self):
        """Now really send the buffer"""
        try:
            while self.outBuffer != []:
                self.logger.debug("Sending data to SAM, length %d", len(self.outBuffer[0]))
                amount = self.sock.send(self.outBuffer[0])
                if amount != len(self.outBuffer[0]):
                    if amount != 0:
                        self.outBuffer[0] = self.outBuffer[0][amount:]
                    break
                del self.outBuffer[0]
        except socket.error, e: 
            code, msg = e
            if code != EWOULDBLOCK:
                self.logger.error("SAM connection error %d: %s", code, msg)
                return

    def _on_message(self, msg, kwargs, data):
        """Call the proper _on_* handler"""
        #print msg, kwargs
        name = '_on_' + msg.upper().replace(' ', '_')
        getattr(self, name)(kwargs, data)

    def _on_HELLO_REPLY(self, kwargs, data):
        self.handshaked = True
        
        tunnel_parameters = ' inbound.nickname=%s outbound.nickname=%s' % (self.sessionname, self.sessionname)
        tunnel_parameters += ' inbound.length=%s' % self.tunnel_depth
        tunnel_parameters += ' outbound.length=%s' % self.tunnel_depth
        if self.tunnel_number:
            tunnel_parameters += ' inbound.quantity=%s' % self.tunnel_number
            tunnel_parameters += ' outbound.quantity=%s' % self.tunnel_number
        tunnel_parameters += ' inbound.lengthVariance=%s' % self.length_variance
        tunnel_parameters += ' outbound.lengthVariance=%s' % self.length_variance
            
        self._send("SESSION CREATE STYLE=STREAM DESTINATION=%s DIRECTION=BOTH%s\n"
            % (self.sessionname, tunnel_parameters))

    def _on_SESSION_STATUS(self, kwargs, data):
        if not self.bound:
            self.bound = True
            # Look up your destination
            self._send("NAMING LOOKUP NAME=ME\n")

    def _on_NAMING_REPLY(self, kwargs, data):
        if not self.mydest and kwargs['RESULT'] == 'OK' and kwargs['NAME'] == 'ME':
            self.mydest = kwargs['VALUE'] + '.i2p'
        else:
            print "Huh, what did we ask?"

    def _on_STREAM_CONNECTED(self, kwargs, data):
        id = int(kwargs['ID'])
        dest= kwargs['DESTINATION']
        self.logger.debug("Inbound connection at %d from %s", id, dest)
        s = SingleSocket(self, (id, dest), self.handler, self.logger)
        s.connect()
        self.single_sockets[id] = s
        self.handler.external_connection_made(s)

    def _on_STREAM_RECEIVED(self, kwargs, data):
        id = int(kwargs['ID'])
        if self.single_sockets.has_key(id):
            self.logger.debug("Received data for %d, length %d", id, len(data))
            s = self.single_sockets.get(id)
            s.last_hit = time()
            s.handler.data_came_in(s, data)
        else:
            self.logger.debug("Connection already went away (stream received) for %d", id)

    def _on_STREAM_CLOSED(self, kwargs, data):
        id = int(kwargs['ID'])
        if self.single_sockets.has_key(id):
            self.logger.debug("Connection with %d closed", id)
            s = self.single_sockets[id]
            self._close_socket(s)
        else:
            self.logger.debug("Connection already went away (stream closed) for %d", id)

    def _on_STREAM_STATUS(self, kwargs, data):
        id = int(kwargs['ID'])
        if self.single_sockets.has_key(id):
            s = self.single_sockets.get(id)
            if kwargs['RESULT'] == 'OK':
                self.logger.debug("Outbound connection to %d established", id)
                s.connect()
            else:
                self.logger.debug("Outbound connection to %d failed %s", id, kwargs['RESULT'])
                self._close_socket(s)
        else:
            self.logger.debug("Connection already went away (stream status) for %d", id)

    def add_task(self, func, delay):
        self.unscheduled_tasks.append((func, delay))

    def scan_for_timeouts(self):
        self.add_task(self.scan_for_timeouts, self.timeout_check_interval)
        t = time() - self.timeout
        tokill = []
        for s in self.single_sockets.values():
            if s.last_hit < t:
                tokill.append(s)
        for k in tokill:
            self.logger.debug("Connection with %d closing due to timeout", k.id)
            k.close()

    def bind(self, port = '', reuse = False):
        """Specify the sessionname return the destination. blocks evily"""

        self.logger = Logger.Logger(self.sessionname)
        self.logger.info("Binding to '%s'", self.sessionname)

        while self.mydest == None:
            self.loopOnce()

        self.logger.info("Bound on %s", self.mydest)
        return self.mydest

    def start_connection(self, dns, handler = None):
        """Connect to a remote location. Returns a SingleSocket"""
        if handler == None:
            handler = self.handler
        self.maxId += 1
        # ignore the port
        dest = dns[0]
        # Remove Azureus style .i2p
        if len(dest) >= 256 and dest[-4:] == '.i2p':
            dest = dest[:-4]
        self.logger.debug("Connecting on %d to %s", self.maxId, dest)
        self._sendConnect(self.maxId, dest)
        s = SingleSocket(self, (self.maxId, dest), self.handler, self.logger)
        self.single_sockets[self.maxId] = s
        s.connect()
        return s

    def loopOnce(self, timeout=1.0):
        """Do a single poll loop"""
        sock = self.sock

        # check if there is data
        if self.outBuffer != [] or not self.connected:
            r = [sock]
            w = [sock]
            e = [sock]
        else:
            r = [sock]
            w = []
            e = [sock]

        # see http://mail.python.org/pipermail/python-dev/2000-October/009671.html
        while True:
            try:
                (R, W, E) = select.select(r, w, e, timeout)
            except select.error, v:
                if v[0] != EINTR:
                    raise
            else:
                break

        if sock in R:
            # receive incoming data
            self._recv()
        if sock in W:
            # initiate handshake upon connection
            if not self.connected:
                self.connected = True
                self._sendHandshake()
            # sending buffer to SAM
            self._sendBuffer()
        if sock in E:
            self.logger.error("SAM connection error")

        # handle messages
        for m, a, d in self.samBuffer.getMessages():
            self._on_message(m, a, d)

        # look if some socket has something to write
        for s in self.single_sockets.values():
            if s.connected and not s.is_flushed():
                s.try_write()
                if s.is_flushed():
                    s.handler.connection_flushed(s)

    def pop_unscheduled(self):
        try:
            while True:
                (func, delay) = self.unscheduled_tasks.pop()
                insort(self.funcs, (time() + delay, func))
        except IndexError:
            pass

    def listen_forever(self, handler):
        self.handler = handler
        try:
            while not self.doneflag.isSet():
                try:
                    self.pop_unscheduled()
                    if len(self.funcs) == 0:
                        period = 2 ** 30
                    else:
                        period = self.funcs[0][0] - time()
                    if period < 0:
                        period = 0
        
                    # network stuff
                    self.loopOnce(period)

                    # handle events
                    while len(self.funcs) > 0 and self.funcs[0][0] <= time():
                        garbage, func = self.funcs[0]
                        del self.funcs[0]
                        func()
                    #self._close_dead()
                    #self.handle_events(events)
                    #if self.doneflag.isSet():
                    #    return
                    #self._close_dead()
                except KeyboardInterrupt:
                    print_exc()
                    return

        finally:
#            for ss in self.single_sockets.values():
#                ss.close()
            self._disconnect()

    def _close_dead(self):
        while len(self.dead_from_write) > 0:
            old = self.dead_from_write
            self.dead_from_write = []
            for s in old:
                self._close_socket(s)

    def _close_socket(self, s):
        if s:
            id = s.id
            del self.single_sockets[id]
            s.connected = False

            # uncomment to dump the stack to the logs
            #data = StringIO()
            #print_stack(file = data)
            #self.logger.debug(data.getvalue())

            s.handler.connection_lost(s)
        else:
            self.logger.debug("Socket already went away (close_socket)")

class DummyHandler:
    def __init__(self):
        pass


def main():
    from threading import Event
    fa = Event()
    es = SamServer(fa, None, 100, 100)
    handler = DummyHandler()
    dest = es.bind(2134)
    print dest
    es.listen_forever(handler)

if __name__ == '__main__':
    main()
Clython
User
Beiträge: 151
Registriert: Samstag 21. August 2004, 13:58
Wohnort: Schweiz, BE-2500

Das solltest du mit threads hinkriegen...

Hmm, on second thought: redundanter Kommentar...
Clython
User
Beiträge: 151
Registriert: Samstag 21. August 2004, 13:58
Wohnort: Schweiz, BE-2500

Soweit ich das sehe, muss du dafür jedem Download einen eigenen Thread zuteilen und diesen beenden, sobald der Download beendet ist.

Schau die mal die Dokumentation von Thread an. Ich brauche das leider nie...
matrixnet
User
Beiträge: 35
Registriert: Donnerstag 21. April 2005, 16:45

danke werde mal schauen was ich machen kann!
BlackJack

Das `threading` Modul ist etwas mehr "high-level".
Gast

problem gelöst!

danke für die infos
Antworten