Ich will, dass mein Client mehrere Downloads oder Verbindungen gleichzeitig aufbaut,
statt dass immer nur eine davon geht!
Code: Alles auswählen
"""
SAM based RawServer replacement
Written by duck in 2004 and released into the public domain.
"""
from bisect import insort
import select, socket
import string
from cStringIO import StringIO
from traceback import print_exc, print_stack
from errno import EINTR, EWOULDBLOCK
from time import time, sleep
from random import randrange
import sys
import Logger
import SamBuffer
class SingleSocket:
    """A single SAM stream, identified by a numeric id and a destination.

    Outgoing data is collected in an in-memory buffer by write() and is
    handed to the owning server (raw_server) in chunks by try_write().
    Closing is delegated to the server, which sends the SAM close message.
    """

    def __init__(self, raw_server, sock, handler, logger):
        """Store the owning server, the stream identity and the handler.

        sock is an (id, dest) pair; only its first two items are read.
        """
        self.raw_server = raw_server
        self.id, self.dest = sock[0], sock[1]
        self.handler = handler
        self.logger = logger
        # Pending outgoing data; drained and emptied by try_write().
        self.buffer = StringIO()
        # Timestamp of creation; refreshed by the server on incoming data.
        self.last_hit = time()
        self.connected = False

    def connect(self):
        """Mark the stream as connected so try_write() may send data."""
        self.connected = True

    def close(self):
        """Initiate socket closing."""
        server = self.raw_server
        server.logger.debug("Sending close to %d", self.id)
        # To dump the stack to the logs, print_stack() into a StringIO
        # here and log its getvalue().
        server._sendClose(self.id)

    def get_ip(self):
        """Return the destination of this stream."""
        return self.dest

    def shutdown(self, val):
        """Log the requested shutdown level and send a close for the stream."""
        server = self.raw_server
        server.logger.debug("Sending shutdown to %d, level %d", self.id, val)
        server._sendClose(self.id)

    def is_flushed(self):
        """Return True when the buffer position is 0, i.e. nothing is pending."""
        return not self.buffer.tell()

    def write(self, s):
        """Append s to the outgoing buffer; nothing is sent yet."""
        self.raw_server.logger.debug("Writing to buffer of %d, length %d", self.id, len(s))
        self.buffer.write(s)

    def try_write(self):
        """Pass the buffered data to the server in chunks, then empty the buffer.

        Does nothing while the stream is not marked connected.
        """
        if not self.connected:
            return
        chunk_size = 32768
        self.buffer.seek(0)
        chunk = self.buffer.read(chunk_size)
        while len(chunk) > 0:
            self.raw_server.logger.debug("Sending data to %d, length %d", self.id, len(chunk))
            self.raw_server._sendData(self.id, chunk)
            chunk = self.buffer.read(chunk_size)
        # Everything was handed over: rewind and drop the buffer content.
        self.buffer.seek(0)
        self.buffer.truncate()
def default_error_handler(x):
    """Default error callback: print the given value to standard output."""
    print(x)
class SamServer:
    """RawServer replacement that multiplexes streams over one SAM socket.

    A single non-blocking TCP connection to the SAM bridge carries all
    streams. Each stream is represented by a SingleSocket kept in
    single_sockets, keyed by its numeric SAM stream id. Incoming SAM
    messages are dispatched to the matching _on_* method.

    Note: self.logger is created by bind() and self.handler is set by
    listen_forever(); most methods rely on both being present.
    """

    def __init__(self, doneflag, bind, timeout_check_interval, timeout,
                 sam_bridge, tunnel_depth, tunnel_number, length_variance,
                 noisy = True, errorfunc = default_error_handler,
                 maxconnects = 55):
        """Store the configuration and start connecting to the SAM bridge.

        doneflag -- object with isSet(); listen_forever() stops once set
        bind -- session name; a random 'I2P-BT-<n>' name is used if falsy
        timeout_check_interval -- delay between scan_for_timeouts() runs
        timeout -- idle time after which a stream is closed
        sam_bridge -- 'host:port' of the SAM bridge
        tunnel_depth, tunnel_number, length_variance -- values sent as
            tunnel options in SESSION CREATE
        noisy, errorfunc, maxconnects -- stored only; not used in this class
        """
        # default sam config
        bridge_parts = sam_bridge.split(':')
        self.samhost = bridge_parts[0]
        self.samport = int(bridge_parts[1])
        self.version = '1.0'
        self.tunnel_depth = tunnel_depth
        self.tunnel_number = tunnel_number
        self.length_variance = length_variance

        self.connected = False
        self.handshaked = False
        self.bound = False
        if not bind:
            self.sessionname = 'I2P-BT-%d' % randrange(2 ** 30)
        else:
            self.sessionname = bind
        self.mydest = None

        self.samBuffer = SamBuffer.SamBuffer()
        self.outBuffer = []

        self.timeout_check_interval = timeout_check_interval
        self.timeout = timeout
        self.single_sockets = {}
        self.dead_from_write = []
        self.doneflag = doneflag
        self.noisy = noisy
        self.errorfunc = errorfunc
        self.maxconnects = maxconnects
        self.funcs = []
        self.unscheduled_tasks = []
        self.maxId = 0

        self._connect()
        self.add_task(self.scan_for_timeouts, timeout_check_interval)

    def _connect(self):
        """Connect to SAM."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.setblocking(0)
        try:
            # Non-blocking connect: completion is detected in loopOnce()
            # when the socket becomes writable.
            self.sock.connect_ex((self.samhost, self.samport))
        except socket.error:
            raise
        except Exception as e:
            raise socket.error(str(e))
        self.handshaked = False

    def _disconnect(self):
        """Close the SAM connection."""
        sleep(0.01)
        self.sock.close()
        self.connected = False

    def _recv(self):
        """Read data from SAM and add it to the buffer."""
        data = self.sock.recv(100000)
        self.logger.debug("Receiving data from SAM, length %d", len(data))
        if data == '':
            # An empty read means the bridge closed the connection.
            self._disconnect()
        else:
            self.samBuffer.add(data)

    def _send(self, message):
        """Add a message to the out buffer."""
        self.outBuffer.append(message)

    def _sendHandshake(self):
        """Queue the SAM HELLO message."""
        self._send("HELLO VERSION MIN=%s MAX=%s\n" % (self.version, self.version))
        self.wantCommand = True

    def _sendClose(self, id):
        """Queue a STREAM CLOSE for the given stream id."""
        self._send("STREAM CLOSE ID=%d\n" % id)

    def _sendConnect(self, id, dest):
        """Queue a STREAM CONNECT for the given stream id and destination."""
        self._send("STREAM CONNECT ID=%d DESTINATION=%s\n" % (id, dest))

    def _sendData(self, id, msg):
        """Queue a STREAM SEND carrying msg for the given stream id."""
        self._send("STREAM SEND ID=%d SIZE=%d\n%s" % (id, len(msg), msg))

    def _sendBuffer(self):
        """Now really send the buffer"""
        try:
            while self.outBuffer != []:
                head = self.outBuffer[0]
                self.logger.debug("Sending data to SAM, length %d", len(head))
                amount = self.sock.send(head)
                if amount != len(head):
                    # Partial send: keep the unsent tail and retry later.
                    if amount != 0:
                        self.outBuffer[0] = head[amount:]
                    break
                del self.outBuffer[0]
        except socket.error as e:
            code, msg = e.args
            if code != EWOULDBLOCK:
                self.logger.error("SAM connection error %d: %s", code, msg)
            return

    def _on_message(self, msg, kwargs, data):
        """Call the proper _on_* handler.

        Message types without a matching handler are logged and ignored
        instead of aborting the poll loop with an AttributeError.
        """
        name = '_on_' + msg.upper().replace(' ', '_')
        callback = getattr(self, name, None)
        if callback is None:
            self.logger.error("Ignoring unhandled SAM message '%s'", msg)
            return
        callback(kwargs, data)

    def _on_HELLO_REPLY(self, kwargs, data):
        """Handshake answered: create the SAM stream session."""
        self.handshaked = True
        tunnel_parameters = ' inbound.nickname=%s outbound.nickname=%s' % (self.sessionname, self.sessionname)
        tunnel_parameters += ' inbound.length=%s' % self.tunnel_depth
        tunnel_parameters += ' outbound.length=%s' % self.tunnel_depth
        if self.tunnel_number:
            tunnel_parameters += ' inbound.quantity=%s' % self.tunnel_number
            tunnel_parameters += ' outbound.quantity=%s' % self.tunnel_number
        tunnel_parameters += ' inbound.lengthVariance=%s' % self.length_variance
        tunnel_parameters += ' outbound.lengthVariance=%s' % self.length_variance
        self._send("SESSION CREATE STYLE=STREAM DESTINATION=%s DIRECTION=BOTH%s\n"
                   % (self.sessionname, tunnel_parameters))

    def _on_SESSION_STATUS(self, kwargs, data):
        """On the first session status, ask SAM for our own destination."""
        if not self.bound:
            self.bound = True
            # Look up your destination
            self._send("NAMING LOOKUP NAME=ME\n")

    def _on_NAMING_REPLY(self, kwargs, data):
        """Store our own destination from the reply to the NAME=ME lookup."""
        if not self.mydest and kwargs['RESULT'] == 'OK' and kwargs['NAME'] == 'ME':
            self.mydest = kwargs['VALUE'] + '.i2p'
        else:
            print("Huh, what did we ask?")

    def _on_STREAM_CONNECTED(self, kwargs, data):
        """Register an inbound stream and announce it to the handler."""
        stream_id = int(kwargs['ID'])
        dest = kwargs['DESTINATION']
        self.logger.debug("Inbound connection at %d from %s", stream_id, dest)
        s = SingleSocket(self, (stream_id, dest), self.handler, self.logger)
        s.connect()
        self.single_sockets[stream_id] = s
        self.handler.external_connection_made(s)

    def _on_STREAM_RECEIVED(self, kwargs, data):
        """Deliver received payload to the handler of the matching stream."""
        stream_id = int(kwargs['ID'])
        s = self.single_sockets.get(stream_id)
        if s is not None:
            self.logger.debug("Received data for %d, length %d", stream_id, len(data))
            s.last_hit = time()
            s.handler.data_came_in(s, data)
        else:
            self.logger.debug("Connection already went away (stream received) for %d", stream_id)

    def _on_STREAM_CLOSED(self, kwargs, data):
        """Drop the stream that SAM reported as closed."""
        stream_id = int(kwargs['ID'])
        s = self.single_sockets.get(stream_id)
        if s is not None:
            self.logger.debug("Connection with %d closed", stream_id)
            self._close_socket(s)
        else:
            self.logger.debug("Connection already went away (stream closed) for %d", stream_id)

    def _on_STREAM_STATUS(self, kwargs, data):
        """Result of an outbound connect: mark connected on OK, else drop."""
        stream_id = int(kwargs['ID'])
        s = self.single_sockets.get(stream_id)
        if s is not None:
            if kwargs['RESULT'] == 'OK':
                self.logger.debug("Outbound connection to %d established", stream_id)
                s.connect()
            else:
                self.logger.debug("Outbound connection to %d failed %s", stream_id, kwargs['RESULT'])
                self._close_socket(s)
        else:
            self.logger.debug("Connection already went away (stream status) for %d", stream_id)

    def add_task(self, func, delay):
        """Schedule func to run after delay; picked up by pop_unscheduled()."""
        self.unscheduled_tasks.append((func, delay))

    def scan_for_timeouts(self):
        """Close every stream idle for longer than timeout; reschedules itself."""
        self.add_task(self.scan_for_timeouts, self.timeout_check_interval)
        t = time() - self.timeout
        tokill = [s for s in list(self.single_sockets.values()) if s.last_hit < t]
        for k in tokill:
            self.logger.debug("Connection with %d closing due to timeout", k.id)
            k.close()

    def bind(self, port = '', reuse = False):
        """Specify the sessionname return the destination. blocks evily

        Creates the logger, then polls until SAM has reported our own
        destination. port and reuse are accepted but not used.
        """
        self.logger = Logger.Logger(self.sessionname)
        self.logger.info("Binding to '%s'", self.sessionname)
        while self.mydest is None:
            self.loopOnce()
        self.logger.info("Bound on %s", self.mydest)
        return self.mydest

    def start_connection(self, dns, handler = None):
        """Connect to a remote location. Returns a SingleSocket

        dns is a (destination, port) pair; the port is ignored. The given
        handler is attached to the new stream, falling back to the server
        default handler when it is None.

        The stream is not marked connected here: data written to it stays
        buffered until SAM answers with STREAM STATUS RESULT=OK (see
        _on_STREAM_STATUS), so nothing is sent on a stream that is not yet
        established.
        """
        if handler is None:
            handler = self.handler
        self.maxId += 1
        stream_id = self.maxId
        # ignore the port
        dest = dns[0]
        # Remove Azureus style .i2p
        if len(dest) >= 256 and dest[-4:] == '.i2p':
            dest = dest[:-4]
        self.logger.debug("Connecting on %d to %s", stream_id, dest)
        self._sendConnect(stream_id, dest)
        # Use the resolved handler so a caller-supplied one is honoured.
        s = SingleSocket(self, (stream_id, dest), handler, self.logger)
        self.single_sockets[stream_id] = s
        return s

    def loopOnce(self, timeout=1.0):
        """Do a single poll loop"""
        sock = self.sock
        r = [sock]
        e = [sock]
        # Only wait for writability when there is something to send or
        # the connect to SAM has not completed yet.
        if self.outBuffer != [] or not self.connected:
            w = [sock]
        else:
            w = []

        # Retry select when interrupted by a signal, see
        # http://mail.python.org/pipermail/python-dev/2000-October/009671.html
        while True:
            try:
                (R, W, E) = select.select(r, w, e, timeout)
            except select.error as v:
                if v.args[0] != EINTR:
                    raise
            else:
                break

        if sock in R:
            # receive incoming data
            self._recv()
        if sock in W:
            # initiate handshake upon connection
            if not self.connected:
                self.connected = True
                self._sendHandshake()
            # sending buffer to SAM
            self._sendBuffer()
        if sock in E:
            self.logger.error("SAM connection error")

        # handle messages
        for m, a, d in self.samBuffer.getMessages():
            self._on_message(m, a, d)

        # look if some socket has something to write; iterate over a copy
        # because handler callbacks may add or remove streams
        for s in list(self.single_sockets.values()):
            if s.connected and not s.is_flushed():
                s.try_write()
                if s.is_flushed():
                    s.handler.connection_flushed(s)

    def pop_unscheduled(self):
        """Move all newly added tasks into the time-sorted funcs list."""
        try:
            while True:
                (func, delay) = self.unscheduled_tasks.pop()
                insort(self.funcs, (time() + delay, func))
        except IndexError:
            pass

    def listen_forever(self, handler):
        """Run the poll loop and due tasks until doneflag is set.

        handler becomes the default handler for streams. The SAM
        connection is closed on exit; KeyboardInterrupt ends the loop.
        """
        self.handler = handler
        try:
            while not self.doneflag.isSet():
                try:
                    self.pop_unscheduled()
                    # Wait no longer than until the next task is due.
                    if len(self.funcs) == 0:
                        period = 2 ** 30
                    else:
                        period = self.funcs[0][0] - time()
                    if period < 0:
                        period = 0
                    # network stuff
                    self.loopOnce(period)
                    # handle events
                    while len(self.funcs) > 0 and self.funcs[0][0] <= time():
                        garbage, func = self.funcs[0]
                        del self.funcs[0]
                        func()
                except KeyboardInterrupt:
                    print_exc()
                    return
        finally:
            self._disconnect()

    def _close_dead(self):
        """Close every socket queued in dead_from_write."""
        while len(self.dead_from_write) > 0:
            old = self.dead_from_write
            self.dead_from_write = []
            for s in old:
                self._close_socket(s)

    def _close_socket(self, s):
        """Unregister s, mark it disconnected and notify its handler.

        A socket that is falsy or no longer registered is only logged, so
        closing the same stream twice does not raise KeyError or report
        connection_lost a second time.
        """
        if not s or self.single_sockets.pop(s.id, None) is None:
            self.logger.debug("Socket already went away (close_socket)")
            return
        s.connected = False
        s.handler.connection_lost(s)
class DummyHandler:
    """Handler that ignores every event.

    Provides the four callbacks SamServer invokes on a handler, so the
    server can run with it without raising AttributeError.
    """

    def __init__(self):
        pass

    def external_connection_made(self, s):
        """Called for a new inbound stream; ignored."""
        pass

    def data_came_in(self, s, data):
        """Called with data received on a stream; ignored."""
        pass

    def connection_lost(self, s):
        """Called when a stream was closed; ignored."""
        pass

    def connection_flushed(self, s):
        """Called when a stream's outgoing buffer was emptied; ignored."""
        pass
def main():
    """Demo: create a SamServer, print its destination and serve forever.

    The SAM bridge address and tunnel options below are example values
    (local bridge on the standard SAM port); adjust them to your router.
    """
    from threading import Event
    fa = Event()
    # SamServer requires sam_bridge, tunnel_depth, tunnel_number and
    # length_variance; leaving them out raises TypeError.
    es = SamServer(fa, None, 100, 100, '127.0.0.1:7656', 2, 2, 0)
    handler = DummyHandler()
    dest = es.bind(2134)
    print(dest)
    es.listen_forever(handler)


if __name__ == '__main__':
    main()