Parsing serial Strings mit PySerial

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
peacepipe
User
Beiträge: 5
Registriert: Mittwoch 1. November 2017, 16:05

Hallo python-Community,

ich habe eine NB-IoT Modul von U-Blox und versuche im seriellen Output nach bestimmten Strings zu parsen, um anschließend weitere serielle Befehle durchzuführen oder zur Analyse. Leider komme ich aktuell nicht weiter und weiß nicht ob es technisch überhaupt geht. :K
Wünschenswert wäre es, wenn die PySerial-Bibliothek es schafft den seriellen Output zu verwerten und direkt zu parsen ODER der serielle Output wird zwischengespeichert und danach untersucht ODER man schiebt den seriellen Output in eine extra Datei und untersucht diese später (das wäre die wohl unschönste Variante).
Vielleicht könntet ihr mir da ein klein wenig auf die Sprünge helfen.

Eingehend sind es diverse AT-Kommandos. Der Output ist dann bspw. "OK".

Beispiel eingehend: AT+NPING=<remote_address>
Beispiel output: +NPINGERR:1

Hier mein aktueller Stand:

Code: Alles auswählen

#!/usr/bin/python

#Libraries
import serial
import io
import time
import sys
import os

#configure serial Interface
ser = serial.Serial()
ser.baudrate = 9600
ser.parity = 'N'
ser.bytesize = 8
ser.stopbits = 1
ser.port = '/dev/ttyUSB0'
#timeout = 1
ser.open()

print 'Enter your commands below.\r\nInsert "exit" to leave the application.'

input=1
while 1 :
    # get keyboard input
    input = raw_input(">> ")
    if input == 'exit':
		ser.close()
		exit()
    else:
        # send the character to the device
        # (note that I happend a \r\n carriage return and line feed to the characters - this is requested by my device)
        ser.write(input + '\r\n')
        out = ''
        # let's wait one second before reading output (let's give device time to answer)
        time.sleep(1)
        while ser.inWaiting() > 0:
            out += ser.read(1)
            ser.write("AT+nping=127.0.0.1")
            if out == "NPINGERR":
            	print "Host not available"
            else:
            	print "Host available"
            
        if out != '':
			print ">>" + out

Sirius3
User
Beiträge: 17738
Registriert: Sonntag 21. Oktober 2012, 17:20

@peacepipe: da stimmt etwas mit Deiner Einrückung nicht. Einrückungen sind in Python sehr wichtig. Eingerückt wird immer mit 4 Leerzeichen pro Ebene, nicht mal 1, 2 oder 3.
Was soll das `input=1`? Dieser Wert wird nie benutzt. In Python gibt es Wahrheitswerte, es sollte also `while True:` heißen. In der inneren while-Schleife sollte wohl nur das `out += ...` stehen. Beim `write` danacht fehlt wohl der Zeilenumbruch. Weiter kann man wohl ohne die passende Hardware nicht helfen.
__deets__
User
Beiträge: 14527
Registriert: Mittwoch 14. Oktober 2015, 14:29

Technisch gehen tut es. Und zwar ohne zwischengespeicherte Dateien. Allerdings kannst du das dem serial-Modul nicht beibiegen, warum auch. Das stellt die reine Kommunikations-Ebene dar. Wenn du da Daten lesen und empfangen hast, hat es seinen Zweck erfuellt.

Alles was darueber dann stattfindet muss man halt programmieren. Da kommts jetzt auf dein Niveau eben dieser Taetigkeit an. Ich persoenlich wuerde mit Zustandsmaschinen und - so du unter Linux/macOS unterwegs bist - asynchroner IO arbeiten. Damit bekommst du mit, wann Daten an der seriellen Schnittstelle anliegen.
peacepipe
User
Beiträge: 5
Registriert: Mittwoch 1. November 2017, 16:05

Zunächst danke für Die Antwort,

also im Grunde geht es mir generell, um die Fähigkeit den seriellen Output, egal von welchem Gerät es auch sein mag, zu interpretieren. Es ist also kein hardwarespezifisches Problem. Ich würde gerne wissen, wie ich die Problemstellung angehen könnte.

@deets: Hast du vielleicht eine Beispiel-Implementierung dafür?


EDIT: Wäre das, das Mittel der Wahl?
http://pyserial-asyncio.readthedocs.io/ ... intro.html
__deets__
User
Beiträge: 14527
Registriert: Mittwoch 14. Oktober 2015, 14:29

Das untenstehende Beispiel ist eine Implementierung fuer einen Laserscanner, der ueber eine serielle Schnittstelle angesprochen wird.

Dort verwende ich aus technischen Gruende threading fuer das lesen statt async io, aber das sollte sich nicht allzu schwer umbauen lassen.

Natuerlich ist das spezifisch fuer das SCIP2.0 protocol das dieser Scanner spricht. Deine AT-Kommandos haben eine andere Struktur (nur eine statt zwei newlines). Das beantwortet dann IMHO auch die Frage wie generalisierbar das ist: geht so. Braucht man die Erfahrung mit mehreren Devices, um zu sehen, was man wo wie extrahieren kann.

Edit: die readloop_started Logik ist natuerlich etwas knoedelig, besser waere eine Condition.

Code: Alles auswählen

import os
import sys
import logging
import threading
import time
from functools import wraps

import serial
from math import cos, sin, pi

logger = logging.getLogger(__name__)


def world_coordinates(m, x=0, y=0):
    res = []
    for i in xrange(len(m[0])):
        w, d = m[0][i], m[1][i]
        if d > 0:
            res.append((cos(w) * d + x, sin(w) * d + y))
    return res


def validate_checksum(data):
    checksum = data[-1]
    data = data[:-1]
    a = (sum(ord(c) for c in data) & 0x3f) + 0x30
    return chr(a) == checksum


def four_c_enc(v):
    return c_enc(v, 4)


def c_enc(v, count):
    chars = []
    for _ in xrange(count):
        a = v & 0x3f
        chars.append(chr(a + 0x30))
        v = v >> 6
    return "".join(reversed(chars))


def four_c_dec(data):
    assert len(data) == 4
    return c_dec(data)


def c_dec(data):
    v = 0
    for c in data:
        v <<= 6
        v += ord(c) - 0x30
    return v


def assertion_safe(func):
    @wraps(func)
    def _d(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except AssertionError:
            pass
    return _d


class InvalidMeasurementException(Exception):
    pass



class Scanner(object):
    """
    Implementation for the URG 04LX UG01
    """

    RESET, ON, OFF, ERROR = "reset", "on", "off", "error"


    FIRST_STEP = 44
    LAST_STEP = 725
    CENTER_STEP = 385
    PITCH_ANGLE = 0.36 # resolution of rays
    CALIBRATION_FACTOR = 0.001 # scanner delivers data in mm, we need m

    SPEED = 4000000 # serial USB speed

    def __init__(self, port, rotation=0.0, position=(0, 0)):
        self.position = position
        self.state = None
        self.answers = []
        self.known_commands = {}
        self.read_chunk = ""
        self.error_code = None
        self.measurement = None
        self.echo = False
        self.reader = None
        self.then = time.time()
        self.conn = serial.Serial(port, self.SPEED)
        self.listeners = []
        self.idle_listeners = []

        self.setup_measurement_matrix(
            rotation,
            self.FIRST_STEP,
            self.CENTER_STEP,
            self.LAST_STEP,
            self.PITCH_ANGLE,
            )


        for name in dir(self):
            if name.startswith("cmd_"):
                command = name[4:].upper()
                self.known_commands[command] = getattr(self, name)

        self.start_reading()
        self.reset()


    def add_listener(self, listener):
        self.listeners.append(listener)


    def add_idle_listener(self, listener):
        self.idle_listeners.append(listener)


    def setup_measurement_matrix(
            self,
            rotation,
            first_step,
            center_step,
            last_step,
            pitch_angle,
            ):
        """
        Create the polar coordinate matrix for each measurement
        """
        m = self.measurement = [
            [0 for _ in xrange(last_step - first_step + 1)]
            for _ in xrange(2)]


        rot_rad = rotation / 180.0 * pi

        for step in xrange(first_step, last_step + 1):
            m[0][step - first_step] = (
                (step - center_step) * pitch_angle) \
                / 180.0 * pi + rot_rad


    def start_reading(self):
        readloop_started = []
        def readloop():
            readloop_started.append(True)
            while True:
                self.read()

        self.reader = threading.Thread(target=readloop)
        self.reader.setDaemon(True)
        self.reader.start()
        while not readloop_started:
            pass
        logger.info("readloop started")


    def read(self):
        blocksize = self.conn.inWaiting()
        if not blocksize:
            for listener in self.idle_listeners:
                listener()
            time.sleep(.01)
            return
        block = self.conn.read(blocksize)
        #logger.debug("read block with %i bytes", len(block))
        self.read_chunk += block

        if "\n\n" not in self.read_chunk:
            return

        parts = self.read_chunk.split("\n\n")
        # the last few bytes are from a following command,
        # or "", so  reassign to chunk
        self.read_chunk = parts[-1]
        for command_candidate in parts[:-1]:
            self.process_command(command_candidate)


    def process_command(self, command):
        prefix = command[:2]
        if prefix not in self.known_commands:
            self.report_malformed_command(command)
        else:
            #logger.debug("processing command from scanner: %s", prefix)
            self.known_commands[prefix](command)


    @assertion_safe
    def process_measurment_lines(self, lines):
        numbers = []

        for line in lines:
            if not validate_checksum(line):
                raise InvalidMeasurementException(line)

            numbers.append(line[:-1]) # cut of checksum

        encoded_data = "".join(numbers)

        assert (len(encoded_data) % 3) == 0

        measurements = []

        for offset in xrange(0, len(encoded_data), 3):
            measurements.append(c_dec(encoded_data[offset:offset + 3]))

        assert len(measurements) == self.LAST_STEP - self.FIRST_STEP + 1, \
          (len(measurements), self.LAST_STEP - self.FIRST_STEP + 1)


        # update only the distances
        mview = self.measurement[1]
        for i, m in enumerate(measurements):
            if m > 19:
                mview[i] = m * self.CALIBRATION_FACTOR
            else:
                mview[i] = -1.0

        world_coords = world_coordinates(self.measurement, *self.position)
        elapsed = time.time() - self.then
        self.then += elapsed
        #logger.debug("measurement_updated, %.2f", 1.0 / elapsed)
        for listener in self.listeners:
            listener(world_coords)


    def send(self, message):
        data = "%s\n" % message
        #logger.debug("sending message %r", message)
        self.conn.write(data)


    # Here come the command-implementations

    def cmd_rs(self, _command):
        self.state = self.RESET


    def cmd_bm(self, command):
        _prefix, rest = command.split("\n", 1)
        if not validate_checksum(rest):
            self.report_malformed_command(command)
            return

        status = rest[:2]

        if status == "00" or status == "02":
            self.state = self.ON
        else:
            self.state = self.ERROR


    def cmd_qt(self, command):
        _prefix, rest = command.split("\n", 1)
        if rest != "00P":
            self.report_malformed_command(command)
            return

        self.state = self.OFF


    def cmd_ss(self, command):
        _prefix, rest = command.split("\n", 1)
        if rest != "00P":
            self.state = self.ERROR
            return


    def cmd_gd(self, command):
        _start_step = four_c_dec(command[2:6])
        _end_step = four_c_dec(command[6:10])
        _cluster_count = command[10:12]

        rest = command.split("\n", 1)[1]
        error = rest[:3]

        if not validate_checksum(error):
            self.report_malformed_command(command)

        if error[:2] != "00":
            self.state = self.ERROR
            self.error_code = error[:2]
            return

        # we have a proper result
        lines = rest.split("\n")[2:] # cut off error & timestamp
        self.process_measurment_lines(lines)


    def cmd_md(self, command):
        lines = command.split("\n")
        error = lines[1]
        # this happens the first time after a MD command is received
        if error == "00P":
            self.state = self.ON
            return

        if error != "99b":
            self.state = self.ERROR
            return

        # skip over header, error, timestamp
        try:
            self.process_measurment_lines(lines[3:])
        except InvalidMeasurementException:
            self.report_malformed_command("MD failed") #command)


    def wait_state(self, state, timeout):
        then = time.time() + timeout
        while state != self.state and time.time() < then:
            time.sleep(.1)

        if state != self.state:
            logger.warn(
                "Waited for state %s but am in %s, timed out",
                state,
                self.state,
                )


    @classmethod
    def report_malformed_command(cls, command):
        logger.error("Malformed command: %r", command[:20])


    def reset(self):
        self.send("RS")


    def hispeed(self):
        self.send("SS750000")


    def laser_on(self):
        self.send("BM")


    def laser_off(self):
        self.send("QT")


    def stop(self):
        self.reset()


    def measure(self):
        count = 0
        while self.state != self.RESET:
            count +=1
            time.sleep(.1)
            logger.warn("Not yet in RESET state, waiting")
            if (count % 100) == 0:
                logger.warn("Re-issuing reset-command")
                self.reset()
        msg = "MD%04i%04i%02i000" % (self.FIRST_STEP, self.LAST_STEP, 0)
        self.send(msg)


    @classmethod
    def locate_scanner_device(cls):
        basedir = "/dev/serial/by-id"
        candidates = [
            candidate
            for candidate in os.listdir(basedir)
            if "URG" in candidate
            ]
        assert len(candidates) == 1
        return os.path.join(basedir, candidates[0])


def main():
    logging.basicConfig(
        stream=sys.stderr,
        level=logging.DEBUG,
        )
    device_file = Scanner.locate_scanner_device()
    logger.info("device: %s", device_file)
    scanner = Scanner(device_file, None, {})

    while scanner.state != Scanner.RESET:
        pass
    logger.info("scanner is reset, aquiring measurement")
    scanner.measure()

    while scanner.state != Scanner.ON:
        if scanner.state == Scanner.ERROR:
            raise Exception("Error")
        #print "waiting for ON"

    count = 0

    then = time.time()
    while True or count < 2 and scanner.state == Scanner.ON:
        time.sleep(.1)
        continue
        updated, _, _ = scanner.current_measurement
        if updated:
            elapsed = time.time() - then
            then += elapsed
            logger.info("new measurement, rate: %.2f", 1.0 / elapsed)
            count += 1

    print "turning scanner off"

    if scanner.state == Scanner.ERROR:
        print "error: ", scanner.error_code

    scanner.laser_off()

    while scanner.state == Scanner.ON:
        pass

    print "resetting"

    scanner.reset()


    while scanner.state != Scanner.RESET:
        pass


if __name__ == "__main__":
    main()
__deets__
User
Beiträge: 14527
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ja, diese pyserial-asyncio sieht ganz gut aus.
peacepipe
User
Beiträge: 5
Registriert: Mittwoch 1. November 2017, 16:05

Ich habe mir mal den Code von Pyserial-Asyncio angesehen. Konkret weiß ich dennoch nicht, wie ich nach dem seriellen Input (serial.write = "AT-Kommando") direkt im Anschluss den Serial Output hole und weiter mit dem Python Script bearbeite. Die Website gibt nicht viel Informationen dazu.

Aufgefallen ist mir, dass das Asyncio-Modul nur Bytes statt Strings als Input verwerten kann. Ich müsste also zusätzlich für jeden Input+Output, diesen noch ins richtige Format bringen. Ich finde das ziemlich umständlich mit dieser Lösung, da ich vorab wissen will, welche Strings ich verwerten kann. Würde ich das mit der .decode('ascii')-Funktion lösen könnte?
Feststellen konnte ich bisher, dass ein bestimmter Byte-Code ausgespuckt wurde, ich diesen bereits mit einer IF-Abfrage prüfen und ein Event auslösen konnte.

Code: Alles auswählen

#!/usr/bin/python

import asyncio
import serial_asyncio
import sys
import codecs

class Output(asyncio.Protocol):
	def connection_made(self, transport):
		self.transport = transport
		print('port opened', transport)
		transport.serial.rts = True  # You can manipulate Serial object via transport
		transport.write(b'Hello World!\n')  # Write serial data via transport
		
	def data_received(self, data):
		print('data received', repr(data))
       # print(repr(data.decode('utf-8')))
	#	data.decode('utf-8')
		if b'\n' in data:
			print('newline')
	#		self.transport.close()
		if b'REBOOT_CAUSE_UNK' in data:
			print('parsed!')
			self.transport.write(b'AT+NUESTATS')
       
           
	def connection_lost(self, exc):
		print('port closed')
		self.transport.loop.stop()
	
	def pause_writing(self):
		print('pause writing')
		print(self.transport.get_write_buffer_size())
	
	def resume_writing(self):
		print(self.transport.get_write_buffer_size())
		print('resume writing')

loop = asyncio.get_event_loop()
coro = serial_asyncio.create_serial_connection(loop, Output, '/dev/ttyUSB0', baudrate=9600)
loop.run_until_complete(coro)
loop.run_forever()
loop.close()
Es wäre super hilfreich wenn jemand den Code aus http://pyserial-asyncio.readthedocs.io/ ... intro.html kommentieren könnte. Ich habe wohl ein Verständnisproblem.
Gibt es sonst noch andere Lösungswege? Ein Kollege meinte, ich sollte mit Threads arbeiten... ABER wie? :K

@deets: Danke für dein Programmcode aber der ist echt ein wenig zu viel für mich als Anfänger.

VG
PeacePipe
__deets__
User
Beiträge: 14527
Registriert: Mittwoch 14. Oktober 2015, 14:29

Da eine serielle Schnittstelle nur Bytes liefert verstehe ich dein Problem nicht. Ob du mit ascii dekodieren kannst, hängt von deinen Daten ab. Und mir fehlt bei deinem Post bezüglich asyncio die Frage. Was ist dir unklar?

Mein Code enthält viel Anwendungsspezifisches. Das kannst du ignorieren. Um den Kern und seine Komplexität wirst du dich aber nicht drücken können. So oder so ähnlich muss man halt arbeiten. Und er nutzt übrigens Threads.
Benutzeravatar
snafu
User
Beiträge: 6738
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

peacepipe hat geschrieben:Aufgefallen ist mir, dass das Asyncio-Modul nur Bytes statt Strings als Input verwerten kann. Ich müsste also zusätzlich für jeden Input+Output, diesen noch ins richtige Format bringen. Ich finde das ziemlich umständlich mit dieser Lösung, da ich vorab wissen will, welche Strings ich verwerten kann. Würde ich das mit der .decode('ascii')-Funktion lösen könnte?
Ein magisches Raten der Kodierung ist prinzipiell unmöglich. Wenn man das in jedem Fall sicher bestimmen könnte, dann kann man diesen Zwischenschritt ja gleich weglassen. Es hat schon seinen Sinn, warum das eingeführt wurde. Du kannst oft ASCII oder UTF-8 annehmen, aber das stimmt wie gesagt nicht immer.

Falls du innerhalb von Bytes suchen willst:

Code: Alles auswählen

b'suchwort' in dein_input
peacepipe
User
Beiträge: 5
Registriert: Mittwoch 1. November 2017, 16:05

Danke für eure Antworten.

Wenn ich das Gerät direkt über Putty oder minicom verbinde, kann ich ja auch normale ascii-Zeichen an das Gerät senden und die kommen auch zurück. Natürlich weiß ich nun nicht wie diese Tools intern funktionieren. Ist es denn so, dass die Zeichen intern in Bytes umwandeln? Dann würde das natürlich Sinn ergeben.


@Snafu: Anmerkung zu deinem Code-Schnipsel:
b'suchwort' in dein_input
also wenn ich zum Beispiel nach Buxdehude suchen würde im Bytecode und wenn es gefunden wird soll "Yippie" ausgeben werden, dann würde es so aussehen?

Code: Alles auswählen

if b'Buxdehude' in Outputdata:
   print('Yippie', b'Buxdehude')
Ich muss aber immer bestimmen, dass dieser Output auch generiert wird. Wie finde ich denn heraus, welche Kodierung verwendet wird? Im Datenblatt des Herstellers? Ich muss einfach gestehen, dass serielle Schnittstellen nicht alltäglich von mir verwendet werden und ich da ein wenig Schwierigkeiten habe :(

VG
peacepipe
Benutzeravatar
snafu
User
Beiträge: 6738
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Hat die Ausgabe denn irgendwelche "komisch" dargestellten Zeichen? Falls nicht, dann wird ASCII wohl klar gehen. Dann kannst du es einfach wie normalen Text behandeln. Ansonsten musst du dir eben Infos zur Kodierung des Geräts besorgen. Google könnte hier auch hilfreich sein.

Und schau doch auch mal ob es eine passende Bibliothek gibt, um die Ausgabe in eine brauchbare Struktur zu bringen. Denn das wäre ja normalerweise der nächste Schritt nachdem die Kodierung geklärt ist. Unter Umständen muss man sich dafür auf andere Sprachen einlassen, falls es für Python nichts gibt.
__deets__
User
Beiträge: 14527
Registriert: Mittwoch 14. Oktober 2015, 14:29

Mit seriellen Schnittstellen hat das erstmal nichts zu tun. Alle Daten die übertragen werden sind notwendigerweise Bytes. Ob das per serieller Schnittstelle oder Festplatte oder Netzwerk geht ist unerheblich. Was diese Bytes bedeuten muss man wissen. Das heutzutage ASCII für Texte ohne Sonderzeichen der minimalstandard ist, erweckt den Eindruck es wäre anders. Ist es aber nicht.

Wenn du mit Byte-strings (b“etwas“) auskommst für dein Problem, musst du nicht dekodieren.

Und minicom übernimmt übrigens die Terminaleinstellungen (LC_*) für das Encoding. Je nachdem wie die eingestellt sind, wandelt sich ein Tastendruck auf ö in ein oder mehrere Bytes um, die dem encoding entsprechen.
Antworten