Python 2 Skript für Python 3 kompatibel machen
Verfasst: Dienstag 9. August 2016, 16:27
Hallo alle zusammen,
ich möchte das folgende Python Skript mit Python3 benutzen. Mit Python2 funktioniert dies einwandfrei nur bei der Version 3 kommt beim Ausführen folgende Fehlermeldung:
error: uncaptured python exception, closing channel <__main__.Client connected 192.168.0.31:58773 at 0xb69a7450> (<class 'TypeError'>:Can't convert 'bytes' object to str implicitly [netio_server.py|readwrite|108] [/usr/lib/python3.4/asyncore.py|handle_read_event|442] [netio_server.py|handle_read|46])
Ich hoffe Ihr könnt mir helfen diese ab zu ändern.
Wäre das modul "gevent" besser als asyncore?
Viele Grüße
ich möchte das folgende Python Skript mit Python3 benutzen. Mit Python2 funktioniert dies einwandfrei nur bei der Version 3 kommt beim Ausführen folgende Fehlermeldung:
error: uncaptured python exception, closing channel <__main__.Client connected 192.168.0.31:58773 at 0xb69a7450> (<class 'TypeError'>:Can't convert 'bytes' object to str implicitly [netio_server.py|readwrite|108] [/usr/lib/python3.4/asyncore.py|handle_read_event|442] [netio_server.py|handle_read|46])
Code: Alles auswählen
# -*- encoding: utf-8 -*-
import asyncore
import socket
import select
import RPi.GPIO as GPIO
import subprocess
#Initial GPIO-setup
GPIO.setwarnings(False)
GPIO.cleanup()
# to use Raspberry Pi board pin numbers
GPIO.setmode(GPIO.BCM)
# For LED1 we use pin 4 according BCM pin count
# (see https://projects.drogon.net/raspberry-p ... pins/)
LED1 = 4
GPIO.setup(LED1, GPIO.OUT)
# For Switch input we use pin 30 according BCM pin count
Radio = 30
# set up GPIO input with pull-up control
# (pull_up_down be PUD_OFF, PUD_UP or PUD_DOWN, default PUD_OFF)
GPIO.setup(30, GPIO.IN, pull_up_down=GPIO.PUD_DOWN) #GPIO 30 als Input setzen
Port = 1882
class Client(asyncore.dispatcher_with_send):
def __init__(self, socket=None, pollster=None):
asyncore.dispatcher_with_send.__init__(self, socket)
self.data = ''
if pollster:
self.pollster = pollster
pollster.register(self, select.EPOLLIN)
def handle_close(self):
if self.pollster:
self.pollster.unregister(self)
def handle_read(self):
receivedData = self.recv(8192)
if not receivedData:
self.close()
return
receivedData = self.data + receivedData
while '\n' in receivedData:
line, receivedData = receivedData.split('\n',1)
self.handle_command(line)
self.data = receivedData
def handle_command(self, line):
if line == 'LED1 on':
self.send('on\n')
subprocess.Popen("sudo python3 radio_on.py", shell=True)
elif line == 'LED1 off':
self.send('off\n')
subprocess.Popen("sudo python3 radio_off.py", shell=True)
elif line == 'get name':
mpd_name = subprocess.Popen(['mpc', 'current', '-f', '[%name%]'], stdout=subprocess.PIPE).communicate()[0].decode('utf-8').strip()
self.send(str(mpd_name) + "\n")
elif line == 'get title':
mpd_title = subprocess.Popen(['mpc', 'current', '-f', '[%title%]'], stdout=subprocess.PIPE).communicate()[0].decode('utf-8').strip()
self.send(str(mpd_title) + "\n")
elif line == 'get status':
print ('Input Status:', GPIO.input(Radio))
if not GPIO.input(Radio):
self.send('AN on\n')
print ('Read GPIO 0 result On')
else:
self.send('off AUS\n')
print ('Read GPIO 0 result Off')
elif line.startswith('set Volume to'):
value = str(line[14:17])
print('Eingestellte Lautstärke: ' + value + '%')
subprocess.Popen('mpc volume ' + value, shell=True, stdout=subprocess.PIPE).communicate()
self.send('Ok\n')
elif line == 'get volume':
mpd_volume = subprocess.Popen(['mpc', 'volume'], stdout=subprocess.PIPE).communicate()[0].decode('utf-8').strip()
print (mpd_volume)
self.send(str(mpd_volume[7:10]) + "\n")
# ende if
else:
self.send('unbekannter Befehl\n')
print ('unbekannter Befehl:', line)
class Server(asyncore.dispatcher):
def __init__(self, listen_to, pollster):
asyncore.dispatcher.__init__(self)
self.pollster = pollster
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.bind(listen_to)
self.listen(5)
def handle_accept(self):
newSocket, address = self.accept()
print ("Connected from", address)
Client(newSocket,self.pollster)
def readwrite(obj, flags):
try:
if flags & select.EPOLLIN:
obj.handle_read_event()
if flags & select.EPOLLOUT:
obj.handle_write_event()
if flags & select.EPOLLPRI:
obj.handle_expt_event()
if flags & (select.EPOLLHUP | select.EPOLLERR | select.POLLNVAL):
obj.handle_close()
except socket.error as e:
if e.args[0] not in asyncore._DISCONNECTED:
obj.handle_error()
else:
obj.handle_close()
except asyncore._reraised_exceptions:
raise
except:
obj.handle_error()
class EPoll(object):
def __init__(self):
self.epoll = select.epoll()
self.fdmap = {}
def register(self, obj, flags):
fd = obj.fileno()
self.epoll.register(fd, flags)
self.fdmap[fd] = obj
def unregister(self, obj):
fd = obj.fileno()
del self.fdmap[fd]
self.epoll.unregister(fd)
def poll(self):
evt = self.epoll.poll()
for fd, flags in evt:
yield self.fdmap[fd], flags
if __name__ == "__main__":
pollster = EPoll()
pollster.register(Server(("",Port),pollster), select.EPOLLIN)
while True:
evt = pollster.poll()
for obj, flags in evt:
readwrite(obj, flags)
Wäre das modul "gevent" besser als asyncore?
Viele Grüße
