Python 2 Skript für Python 3 kompatibel machen

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
Antworten
robse
User
Beiträge: 4
Registriert: Dienstag 9. August 2016, 16:21

Hallo alle zusammen,
ich möchte das folgende Python Skript mit Python3 benutzen. Mit Python2 funktioniert dies einwandfrei nur bei der Version 3 kommt beim Ausführen folgende Fehlermeldung:

error: uncaptured python exception, closing channel <__main__.Client connected 192.168.0.31:58773 at 0xb69a7450> (<class 'TypeError'>:Can't convert 'bytes' object to str implicitly [netio_server.py|readwrite|108] [/usr/lib/python3.4/asyncore.py|handle_read_event|442] [netio_server.py|handle_read|46])

Code: Alles auswählen

# -*- encoding: utf-8 -*-
import asyncore
import socket
import select
import RPi.GPIO as GPIO
import subprocess

#Initial GPIO-setup
GPIO.setwarnings(False)
GPIO.cleanup()

# to use Raspberry Pi board pin numbers
GPIO.setmode(GPIO.BCM)

# For LED1 we use pin 4 according BCM pin count
# (see https://projects.drogon.net/raspberry-p ... pins/&#41;
LED1 = 4
GPIO.setup(LED1, GPIO.OUT)

# For Switch input we use pin 30 according BCM pin count
Radio = 30

# set up GPIO input with pull-up control
#   (pull_up_down be PUD_OFF, PUD_UP or PUD_DOWN, default PUD_OFF)
GPIO.setup(30, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)   #GPIO 30 als Input setzen

Port = 1882

class Client(asyncore.dispatcher_with_send):
   def __init__(self, socket=None, pollster=None):
       asyncore.dispatcher_with_send.__init__(self, socket)
       self.data = ''
       if pollster:
           self.pollster = pollster
           pollster.register(self, select.EPOLLIN)

   def handle_close(self):
       if self.pollster:
           self.pollster.unregister(self)

   def handle_read(self):
       receivedData = self.recv(8192)
       if not receivedData:
           self.close()
           return
       receivedData = self.data + receivedData
       while '\n' in receivedData:
           line, receivedData = receivedData.split('\n',1)
           self.handle_command(line)
       self.data = receivedData

   def handle_command(self, line):
       if line == 'LED1 on':
           self.send('on\n')
           subprocess.Popen("sudo python3 radio_on.py", shell=True)
       elif line == 'LED1 off':
           self.send('off\n')
           subprocess.Popen("sudo python3 radio_off.py", shell=True)
       elif line == 'get name':
           mpd_name = subprocess.Popen(['mpc', 'current', '-f', '[%name%]'], stdout=subprocess.PIPE).communicate()[0].decode('utf-8').strip()
           self.send(str(mpd_name) + "\n")
       elif line == 'get title':
           mpd_title = subprocess.Popen(['mpc', 'current', '-f', '[%title%]'], stdout=subprocess.PIPE).communicate()[0].decode('utf-8').strip()
           self.send(str(mpd_title) + "\n")
       elif line == 'get status':
           print ('Input Status:', GPIO.input(Radio))
           if not GPIO.input(Radio):
               self.send('AN on\n')
               print ('Read GPIO 0 result On')
           else:
               self.send('off AUS\n')
               print ('Read GPIO 0 result Off')
       elif line.startswith('set Volume to'):
           value = str(line[14:17])
           print('Eingestellte Lautstärke: ' + value + '%')
           subprocess.Popen('mpc volume ' + value, shell=True, stdout=subprocess.PIPE).communicate()
           self.send('Ok\n')
       elif line == 'get volume':
           mpd_volume = subprocess.Popen(['mpc', 'volume'], stdout=subprocess.PIPE).communicate()[0].decode('utf-8').strip()
           print (mpd_volume)
           self.send(str(mpd_volume[7:10]) + "\n")
       # ende if
       
       else:
           self.send('unbekannter Befehl\n')
           print ('unbekannter Befehl:', line)

class Server(asyncore.dispatcher):
   def __init__(self, listen_to, pollster):
       asyncore.dispatcher.__init__(self)
       self.pollster = pollster
       self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
       self.bind(listen_to)
       self.listen(5)

   def handle_accept(self):
       newSocket, address = self.accept()
       print ("Connected from", address)
       Client(newSocket,self.pollster)

def readwrite(obj, flags):
   try:

       if flags & select.EPOLLIN:
           obj.handle_read_event()
       if flags & select.EPOLLOUT:
           obj.handle_write_event()
       if flags & select.EPOLLPRI:
           obj.handle_expt_event()
       if flags & (select.EPOLLHUP | select.EPOLLERR | select.POLLNVAL):
           obj.handle_close()
   except socket.error as e:
       if e.args[0] not in asyncore._DISCONNECTED:
           obj.handle_error()
       else:
           obj.handle_close()
   except asyncore._reraised_exceptions:
       raise
   except:
       obj.handle_error()


class EPoll(object):
   def __init__(self):
       self.epoll = select.epoll()
       self.fdmap = {}
   def register(self, obj, flags):
       fd = obj.fileno()
       self.epoll.register(fd, flags)
       self.fdmap[fd] = obj
   def unregister(self, obj):
       fd = obj.fileno()
       del self.fdmap[fd]
       self.epoll.unregister(fd)
   def poll(self):
       evt = self.epoll.poll()
       for fd, flags in evt:
           yield self.fdmap[fd], flags


if __name__ == "__main__":
   pollster = EPoll()
   pollster.register(Server(("",Port),pollster), select.EPOLLIN)
   while True:
       evt = pollster.poll()
       for obj, flags in evt:
           readwrite(obj, flags)
Ich hoffe Ihr könnt mir helfen diese ab zu ändern.
Wäre das modul "gevent" besser als asyncore?

Viele Grüße :)
Zuletzt geändert von Anonymous am Dienstag 9. August 2016, 16:54, insgesamt 1-mal geändert.
Grund: Quelltext in Python-Codebox-Tags gesetzt.
BlackJack

@robse: Du musst halt sauber zwischen `str` und `bytes` trennen und an den entsprechenden Stellen umwandeln mit der jeweiligen `encode()`- oder `decode()`-Methode. Je nach Richtung in die gewandelt werden muss.
BlackJack

@robse: Vielleicht noch ein paar Anmerkungen zum Quelltext: Die Einrückung entspricht nicht der Konvention von vier Leerzeichen pro Ebene. Und um binäre Operationen und nach Kommas erhöht ein Leerzeichen die Lesbarkeit. Macht man in Prosatext ja auch so.

Zwischen Funktionsname und der öffnenen Klammer für einen Aufruf gehört hingegen kein Leerzeichen.

Die Import-Zeile für `GPIO` ist umständlich weil man GPIO so zweimal da stehen hat.

Auf Modulebene gehört in der Regel nur Code der Konstanten, Funktionen, und Klassen definiert. Keiner der Effekte hat, und schon gar keiner der irgendwelche Hardware konfiguriert oder gar voraussetzt. Module sollten ohne Effekte importierbar sein. Das braucht man für automatisierte Tests und einige Werkzeuge, beispielsweise für statische Analyse oder Dokumentation, erwarten das so. Ein paar Zeilen vom Programmanfang und alles unter dem ``if __name__ …`` sollten also in einer Funktion verschwinden. Die heisst üblicherweise `main()`.

Konstantennamen werden per Konvention komplett gross geschreiben. `Port` und `Radio` sind schreibweisen für Klassennamen.

`LED1` wird in dem Programm gar nicht wirklich verwendet‽ `Radio` wird definiert, im Code wird dann an einer Stelle aber trotzdem der literale Wert noch einmal verwendet, statt die Konstante zu verwenden. Zudem stimmen die Ausgaben nicht mit dem Code überein. Beim ein- und ausschalten des Radios gibt das Programm aus es würde die LED ein-/ausgeschaltet und bei der Abfrage des Status behauptet es Pin 0 wird geprüft, dabei ist es Pin 30.

Einige Kommentare sind überflüssig weil sie nur noch einmal das offensichtliche sagen was da schon im Quelltext steht. Kommentare sollten dem Code gegenüber einen Mehrwert bieten. Faustregel: Nicht kommentieren *was* passiert, das steht schon im Code, sondern *warum* — sofern das nicht offensichtlich ist.

Schlimmer als überflüssige Kommentare sind falsche Kommentare, weil man dann nicht weiss wer Recht hat, der Kommentar oder der Code. Zum Beispiel wenn im Kommentar steht der Pin würde mit einem Pull-Up-Widerstand konfiguriert, der Code dann aber ganz eindeutig das Gegenteil macht.

In `__init__()` und `handle_close()` von `Client` ist ein Fehler: Wenn das Attribut `pollster` nicht gesetzt wird, und das kann in der `__init__()` passieren, dann führt der Versuch das Attribut in `handle_close()` abzufragen zu einer Ausnahme. Man muss das Attribut also in jedem Fall in der `__init__()` einführen. Was sowieso sauberer ist.

Das `shell`-Argument von `Popen` sollte man nicht auf `True` setzen solange man das nicht tatsächlich braucht. Und selbst dann sollte man gut überlegen ob man das *tatsächlich* braucht, oder ob man das nicht anders lösen kann. Bei den Aufrufen im Programm gibt es jedenfalls keinen Grund.

Die beiden ersten Aufrufe von `Popen` haben zudem das Problem das sie Zombie-Prozesse verursachen können, weil der Rückgabecode des externen Prozesses nicht abgefragt wird. Das `subprocess`-Modul hat da auch Funktionen die einem Arbeit abnehmen.

Der Code dürfte entgehen Deiner Behauptung so nicht problemlos unter Python 2 laufen. Der dekodiert die Ausgabe von einigen ``mpc``-Aufrufen zu `unicode` und wendet danach dann einfach `str()` darauf an ohne eine Kodierung anzugeben. An der Stelle wird es krachen wenn in einem Titel etwas ausserhalb von ASCII vorkommt, beispielsweise ein Umlaut.

Die Antworten des Servers auf 'get status' sind komisch. An und Aus auf deutsch und englisch und mal deutsch zuerst und mal englisch, eins komplett gross geschrieben, das andere komplett klein; was soll das? Warum bei 'set Volume to' das Volume gross geschrieben bei 'get volume' dann aber klein? Das ist eine verwirrende API. Die `print()`-Ausgaben sind auch mal auf Deutsch und mal auf Englisch formuliert.

Beim setzen und abfragen der Lautstärke ist der `str()`-Aufruf in Python 2 überflüssig. Und in Python 3 würde ich dafür eher `decode()` verwenden.

Die ``if``\s im ``try``-Block von `readwrite()` schliessen sich gegenseitig aus, also sollte man dort von ``elif`` gebrauch machen.

Ich lande dann ungetestet bei so etwas als Zwischenergebnis:

Code: Alles auswählen

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import asyncore
import select
import socket
import subprocess

from RPi import GPIO

# For LED1 we use pin 4 according BCM pin count
# (see https://projects.drogon.net/raspberry-p ... pins/&#41;
LED1_PIN = 4

# For Switch input we use pin 30 according BCM pin count
RADIO_PIN = 30

PORT = 1882


class Client(asyncore.dispatcher_with_send):

    def __init__(self, sock=None, pollster=None):
        asyncore.dispatcher_with_send.__init__(self, sock)
        self.data = ''
        self.pollster = pollster
        if self.pollster:
            pollster.register(self, select.EPOLLIN)

    def handle_close(self):
        if self.pollster:
            self.pollster.unregister(self)

    def handle_read(self):
        received_data = self.recv(8192)
        if not received_data:
            self.close()
            return
        received_data = self.data + received_data
        while '\n' in received_data:
            line, received_data = received_data.split('\n', 1)
            self.handle_command(line)
        self.data = received_data

    def handle_command(self, line):
        if line == 'LED1 on':
            subprocess.check_call(['sudo', 'python3', 'radio_on.py'])
            self.send('on\n')
        elif line == 'LED1 off':
            subprocess.check_call(['sudo', 'python3', 'radio_off.py'])
            self.send('off\n')
        elif line == 'get name':
            mpd_name = subprocess.check_output(
                ['mpc', 'current', '-f', '[%name%]']
            ).strip()
            self.send(mpd_name + '\n')
        elif line == 'get title':
            mpd_title = subprocess.check_output(
                ['mpc', 'current', '-f', '[%title%]']
            ).strip()
            self.send(mpd_title + '\n')
        elif line == 'get status':
            status = not GPIO.input(RADIO_PIN)
            print('Input Status:', status)
            if status:
                print('Read GPIO {0} result On'.format(RADIO_PIN))
                self.send('AN on\n')
            else:
                print('Read GPIO {0} result Off'.format(RADIO_PIN))
                self.send('off AUS\n')
        elif line.startswith('set volume to'):
            value = line[14:17]  # FIXME Magic numbers.
            print('Eingestellte Lautstaerke: {0}%'.format(value))
            subprocess.check_call(['mpc', 'volume', value])
            self.send('ok\n')
        elif line == 'get volume':
            mpd_volume = subprocess.check_output(['mpc', 'volume']).strip()
            print(mpd_volume)
            self.send(mpd_volume[7:10] + '\n')  # FIXME Magic numbers.
        else:
            self.send('unbekannter Befehl\n')
            print('unbekannter Befehl:', line)


class Server(asyncore.dispatcher):

    def __init__(self, listen_to, pollster):
        asyncore.dispatcher.__init__(self)
        self.pollster = pollster
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(listen_to)
        self.listen(5)

    def handle_accept(self):
        sock, address = self.accept()
        print('Connected from', address)
        Client(sock, self.pollster)


def communicate(obj, flags):
    try:
        if flags & select.EPOLLIN:
            obj.handle_read_event()
        elif flags & select.EPOLLOUT:
            obj.handle_write_event()
        elif flags & select.EPOLLPRI:
            obj.handle_expt_event()
        elif flags & (select.EPOLLHUP | select.EPOLLERR | select.POLLNVAL):
            obj.handle_close()
    except socket.error as error:
        if error.args[0] not in asyncore._DISCONNECTED:
            obj.handle_error()
        else:
            obj.handle_close()
    except asyncore._reraised_exceptions:
        raise
    except:
        obj.handle_error()


class EPoll(object):

    def __init__(self):
        self.epoll = select.epoll()
        self.fdmap = {}

    def register(self, obj, flags):
        fd = obj.fileno()
        self.epoll.register(fd, flags)
        self.fdmap[fd] = obj

    def unregister(self, obj):
        fd = obj.fileno()
        del self.fdmap[fd]
        self.epoll.unregister(fd)

    def poll(self):
        for fd, flags in self.epoll.poll():
            yield self.fdmap[fd], flags


def main():
    GPIO.setwarnings(False)  # TODO Why? Fix then instead of ignoring them!
    try:
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(LED1_PIN, GPIO.OUT)
        GPIO.setup(RADIO_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

        pollster = EPoll()
        pollster.register(Server(('', PORT), pollster), select.EPOLLIN)
        while True:
            for obj, flags in pollster.poll():
                communicate(obj, flags)
    finally:
        GPIO.cleanup()


if __name__ == '__main__':
    main()
Die `print()`-Anweisungen würde ich durch `logging` ersetzen.

Eventuell nimmt einem die `asynchat.async_chat`-Klasse ein bisschen Arbeit ab.
robse
User
Beiträge: 4
Registriert: Dienstag 9. August 2016, 16:21

Hallo BlackJack,
die Anmerkungen mit den Magic numbers habe ich nun anders gelöst. Funktionieren tut das Skript unter Python2 korrekt.
Nur unter Python3 bekomme ich es auch nach langem ausprobieren nicht zum laufen. Und was ich genau für asynchat abändern muss, kapier ich auch noch nicht so ganz.
Mit programmieren in Python steh ich noch ganz am Anfang.

Die Leerzeichen werden beim Einfügen des Codes hier falsch angezeigt. Diese habe ich in diesem und dem letzten Skript eingehalten. :)
Ich hoffe du hast Zeit und Lust mir zu einem sauberen Code zu verhelfen. :D

Code: Alles auswählen

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import asyncore
import select
import socket
import subprocess
import re
from RPi import GPIO

RADIO_PIN = 30

PORT = 1882


class Client(asyncore.dispatcher_with_send):

    def __init__(self, sock=None, pollster=None):
        asyncore.dispatcher_with_send.__init__(self, sock)
        self.data = ''
        self.pollster = pollster
        if self.pollster:
            pollster.register(self, select.EPOLLIN)

    def handle_close(self):
        if self.pollster:
            self.pollster.unregister(self)

    def handle_read(self):
        received_data = self.recv(8192)
        if not received_data:
            self.close()
            return
        received_data = self.data + received_data
        while '\n' in received_data:
            line, received_data = received_data.split('\n', 1)
            self.handle_command(line)
        self.data = received_data

    def handle_command(self, line):
        if line == 'mpd1 reboot':
            subprocess.check_call(['sudo', 'reboot'])
            self.send('reboot\n')
        if line == 'radio on':
            subprocess.check_call(['sudo', 'python3', 'radio_on_mpd.py'])
            self.send('on\n')
        elif line == 'radio off':
            subprocess.check_call(['sudo', 'python3', 'radio_off.py'])
            self.send('off\n')
        elif line == 'get name':
            mpd_name = subprocess.check_output(
                ['mpc', 'current', '-f', '[%name%]']
            ).strip()
            self.send(mpd_name + '\n')
        elif line == 'get title':
            mpd_title = subprocess.check_output(
                ['mpc', 'current', '-f', '[%title%]']
            ).strip()
            self.send(mpd_title + '\n')
        elif line == 'get status':
            status = not GPIO.input(RADIO_PIN)
            if status:
                print('Read GPIO {0} result On'.format(RADIO_PIN))
                self.send('on\n')
            else:
                print('Read GPIO {0} result Off'.format(RADIO_PIN))
                self.send('off\n')
        elif line.startswith('set volume to'):
            value = re.search("[0-9]+", line).group()
            print('Eingestellte Lautstärke: {0}%'.format(value))
            subprocess.check_call(['mpc', 'volume', value])
            self.send('ok\n')
        elif line == 'get volume':
            mpd_volume = subprocess.check_output(['mpc', 'volume']).strip()
            print(mpd_volume)
            self.send(re.search("[0-9]+", mpd_volume).group() + '\n')
        #Radio Sender
        elif line == 'jamfm':
            subprocess.check_call(['mpc', 'play', '1'])
            self.send('JAM FM\n')
        elif line == 'bigfm':
            subprocess.check_call(['mpc', 'play', '2']) 
            self.send('BIG FM\n')
        elif line == 'dasding':
            subprocess.check_call(['mpc', 'play', '3'])
            self.send('DASDING\n')
        elif line == 'energy':
            subprocess.check_call(['mpc', 'play', '4'])
            self.send('Energy\n')
        elif line == 'antenne1':
            subprocess.check_call(['mpc', 'play', '5'])
            self.send('Antenne 1\n')
        else:
            self.send('unbekannter Befehl\n')
            print('unbekannter Befehl:', line)


class Server(asyncore.dispatcher):

    def __init__(self, listen_to, pollster):
        asyncore.dispatcher.__init__(self)
        self.pollster = pollster
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(listen_to)
        self.listen(5)

    def handle_accept(self):
        sock, address = self.accept()
        print('Connected from', address)
        Client(sock, self.pollster)


def communicate(obj, flags):
    try:
        if flags & select.EPOLLIN:
            obj.handle_read_event()
        elif flags & select.EPOLLOUT:
            obj.handle_write_event()
        elif flags & select.EPOLLPRI:
            obj.handle_expt_event()
        elif flags & (select.EPOLLHUP | select.EPOLLERR | select.POLLNVAL):
            obj.handle_close()
    except socket.error as error:
        if error.args[0] not in asyncore._DISCONNECTED:
            obj.handle_error()
        else:
            obj.handle_close()
    except asyncore._reraised_exceptions:
        raise
    except:
        obj.handle_error()


class EPoll(object):

    def __init__(self):
        self.epoll = select.epoll()
        self.fdmap = {}

    def register(self, obj, flags):
        fd = obj.fileno()
        self.epoll.register(fd, flags)
        self.fdmap[fd] = obj

    def unregister(self, obj):
        fd = obj.fileno()
        del self.fdmap[fd]
        self.epoll.unregister(fd)

    def poll(self):
        for fd, flags in self.epoll.poll():
            yield self.fdmap[fd], flags


def main():
    print('Skript wird ausgeführt')
    GPIO.setwarnings(False)  # TODO Why? Fix then instead of ignoring them!
    try:
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(RADIO_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

        pollster = EPoll()
        pollster.register(Server(('', PORT), pollster), select.EPOLLIN)
        while True:
            for obj, flags in pollster.poll():
                communicate(obj, flags)
    finally:
        GPIO.cleanup()


if __name__ == '__main__':
    main()
Zuletzt geändert von Anonymous am Freitag 12. August 2016, 14:49, insgesamt 1-mal geändert.
Grund: Quelltext in Python-Codebox-Tags gesetzt.
robse
User
Beiträge: 4
Registriert: Dienstag 9. August 2016, 16:21

robse hat geschrieben:Hallo BlackJack,
die Anmerkungen mit den Magic numbers habe ich nun anders gelöst. Funktionieren tut das Skript unter Python2 korrekt.
Nur unter Python3 bekomme ich es auch nach langem ausprobieren nicht zum laufen. Und was ich genau für asynchat abändern muss, kapier ich auch noch nicht so ganz.
Mit programmieren in Python steh ich noch ganz am Anfang.

Die Leerzeichen werden beim Einfügen des Codes hier falsch angezeigt. Diese habe ich in diesem und dem letzten Skript eingehalten. :)
Ich hoffe du hast Zeit und Lust mir zu einem sauberen Code zu verhelfen. :D

Code: Alles auswählen

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
from __future__ import absolute_import, division, print_function
import asyncore
import select
import socket
import subprocess
import re
from RPi import GPIO

RADIO_PIN = 30

PORT = 1882


class Client(asyncore.dispatcher_with_send):

    def __init__(self, sock=None, pollster=None):
        asyncore.dispatcher_with_send.__init__(self, sock)
        self.data = ''
        self.pollster = pollster
        if self.pollster:
            pollster.register(self, select.EPOLLIN)

    def handle_close(self):
        if self.pollster:
            self.pollster.unregister(self)

    def handle_read(self):
        received_data = self.recv(8192)
        if not received_data:
            self.close()
            return
        received_data = self.data + received_data
        while '\n' in received_data:
            line, received_data = received_data.split('\n', 1)
            self.handle_command(line)
        self.data = received_data

    def handle_command(self, line):
        if line == 'mpd1 reboot':
            subprocess.check_call(['sudo', 'reboot'])
            self.send('reboot\n')
        if line == 'radio on':
            subprocess.check_call(['sudo', 'python3', 'radio_on_mpd.py'])
            self.send('on\n')
        elif line == 'radio off':
            subprocess.check_call(['sudo', 'python3', 'radio_off.py'])
            self.send('off\n')
        elif line == 'get name':
            mpd_name = subprocess.check_output(
                ['mpc', 'current', '-f', '[%name%]']
            ).strip()
            self.send(mpd_name + '\n')
        elif line == 'get title':
            mpd_title = subprocess.check_output(
                ['mpc', 'current', '-f', '[%title%]']
            ).strip()
            self.send(mpd_title + '\n')
        elif line == 'get status':
            status = not GPIO.input(RADIO_PIN)
            if status:
                print('Read GPIO {0} result On'.format(RADIO_PIN))
                self.send('on\n')
            else:
                print('Read GPIO {0} result Off'.format(RADIO_PIN))
                self.send('off\n')
        elif line.startswith('set volume to'):
            value = re.search("[0-9]+", line).group()
            print('Eingestellte Lautstärke: {0}%'.format(value))
            subprocess.check_call(['mpc', 'volume', value])
            self.send('ok\n')
        elif line == 'get volume':
            mpd_volume = subprocess.check_output(['mpc', 'volume']).strip()
            print(mpd_volume)
            self.send(re.search("[0-9]+", mpd_volume).group() + '\n')
        #Radio Sender
        elif line == 'jamfm':
            subprocess.check_call(['mpc', 'play', '1'])
            self.send('JAM FM\n')
        elif line == 'bigfm':
            subprocess.check_call(['mpc', 'play', '2']) 
            self.send('BIG FM\n')
        elif line == 'dasding':
            subprocess.check_call(['mpc', 'play', '3'])
            self.send('DASDING\n')
        elif line == 'energy':
            subprocess.check_call(['mpc', 'play', '4'])
            self.send('Energy\n')
        elif line == 'antenne1':
            subprocess.check_call(['mpc', 'play', '5'])
            self.send('Antenne 1\n')
        else:
            self.send('unbekannter Befehl\n')
            print('unbekannter Befehl:', line)


class Server(asyncore.dispatcher):

    def __init__(self, listen_to, pollster):
        asyncore.dispatcher.__init__(self)
        self.pollster = pollster
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.bind(listen_to)
        self.listen(5)

    def handle_accept(self):
        sock, address = self.accept()
        print('Connected from', address)
        Client(sock, self.pollster)


def communicate(obj, flags):
    try:
        if flags & select.EPOLLIN:
            obj.handle_read_event()
        elif flags & select.EPOLLOUT:
            obj.handle_write_event()
        elif flags & select.EPOLLPRI:
            obj.handle_expt_event()
        elif flags & (select.EPOLLHUP | select.EPOLLERR | select.POLLNVAL):
            obj.handle_close()
    except socket.error as error:
        if error.args[0] not in asyncore._DISCONNECTED:
            obj.handle_error()
        else:
            obj.handle_close()
    except asyncore._reraised_exceptions:
        raise
    except:
        obj.handle_error()


class EPoll(object):

    def __init__(self):
        self.epoll = select.epoll()
        self.fdmap = {}

    def register(self, obj, flags):
        fd = obj.fileno()
        self.epoll.register(fd, flags)
        self.fdmap[fd] = obj

    def unregister(self, obj):
        fd = obj.fileno()
        del self.fdmap[fd]
        self.epoll.unregister(fd)

    def poll(self):
        for fd, flags in self.epoll.poll():
            yield self.fdmap[fd], flags


def main():
    print('Skript wird ausgeführt')
    GPIO.setwarnings(False)  # TODO Why? Fix then instead of ignoring them!
    try:
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(RADIO_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)

        pollster = EPoll()
        pollster.register(Server(('', PORT), pollster), select.EPOLLIN)
        while True:
            for obj, flags in pollster.poll():
                communicate(obj, flags)
    finally:
        GPIO.cleanup()


if __name__ == '__main__':
    main()
Oder ist es besser die Magic numbers durch

Zeile 69

Code: Alles auswählen

value = float(line)
Zeile 76

Code: Alles auswählen

self.send(float(mpd_volume) + '\n')
zu ersetzen. Funktionieren tut es.
Antworten