Das untenstehende Beispiel ist eine Implementierung fuer einen Laserscanner, der ueber eine serielle Schnittstelle angesprochen wird.
Dort verwende ich aus technischen Gruende threading fuer das lesen statt async io, aber das sollte sich nicht allzu schwer umbauen lassen.
Natuerlich ist das spezifisch fuer das SCIP2.0 protocol das dieser Scanner spricht. Deine AT-Kommandos haben eine andere Struktur (nur eine statt zwei newlines). Das beantwortet dann IMHO auch die Frage wie generalisierbar das ist: geht so. Braucht man die Erfahrung mit mehreren Devices, um zu sehen, was man wo wie extrahieren kann.
Edit: die readloop_started Logik ist natuerlich etwas knoedelig, besser waere eine Condition.
Code: Alles auswählen
import os
import sys
import logging
import threading
import time
from functools import wraps
import serial
from math import cos, sin, pi
logger = logging.getLogger(__name__)
def world_coordinates(m, x=0, y=0):
res = []
for i in xrange(len(m[0])):
w, d = m[0][i], m[1][i]
if d > 0:
res.append((cos(w) * d + x, sin(w) * d + y))
return res
def validate_checksum(data):
checksum = data[-1]
data = data[:-1]
a = (sum(ord(c) for c in data) & 0x3f) + 0x30
return chr(a) == checksum
def four_c_enc(v):
return c_enc(v, 4)
def c_enc(v, count):
chars = []
for _ in xrange(count):
a = v & 0x3f
chars.append(chr(a + 0x30))
v = v >> 6
return "".join(reversed(chars))
def four_c_dec(data):
assert len(data) == 4
return c_dec(data)
def c_dec(data):
v = 0
for c in data:
v <<= 6
v += ord(c) - 0x30
return v
def assertion_safe(func):
@wraps(func)
def _d(*args, **kwargs):
try:
return func(*args, **kwargs)
except AssertionError:
pass
return _d
class InvalidMeasurementException(Exception):
pass
class Scanner(object):
"""
Implementation for the URG 04LX UG01
"""
RESET, ON, OFF, ERROR = "reset", "on", "off", "error"
FIRST_STEP = 44
LAST_STEP = 725
CENTER_STEP = 385
PITCH_ANGLE = 0.36 # resolution of rays
CALIBRATION_FACTOR = 0.001 # scanner delivers data in mm, we need m
SPEED = 4000000 # serial USB speed
def __init__(self, port, rotation=0.0, position=(0, 0)):
self.position = position
self.state = None
self.answers = []
self.known_commands = {}
self.read_chunk = ""
self.error_code = None
self.measurement = None
self.echo = False
self.reader = None
self.then = time.time()
self.conn = serial.Serial(port, self.SPEED)
self.listeners = []
self.idle_listeners = []
self.setup_measurement_matrix(
rotation,
self.FIRST_STEP,
self.CENTER_STEP,
self.LAST_STEP,
self.PITCH_ANGLE,
)
for name in dir(self):
if name.startswith("cmd_"):
command = name[4:].upper()
self.known_commands[command] = getattr(self, name)
self.start_reading()
self.reset()
def add_listener(self, listener):
self.listeners.append(listener)
def add_idle_listener(self, listener):
self.idle_listeners.append(listener)
def setup_measurement_matrix(
self,
rotation,
first_step,
center_step,
last_step,
pitch_angle,
):
"""
Create the polar coordinate matrix for each measurement
"""
m = self.measurement = [
[0 for _ in xrange(last_step - first_step + 1)]
for _ in xrange(2)]
rot_rad = rotation / 180.0 * pi
for step in xrange(first_step, last_step + 1):
m[0][step - first_step] = (
(step - center_step) * pitch_angle) \
/ 180.0 * pi + rot_rad
def start_reading(self):
readloop_started = []
def readloop():
readloop_started.append(True)
while True:
self.read()
self.reader = threading.Thread(target=readloop)
self.reader.setDaemon(True)
self.reader.start()
while not readloop_started:
pass
logger.info("readloop started")
def read(self):
blocksize = self.conn.inWaiting()
if not blocksize:
for listener in self.idle_listeners:
listener()
time.sleep(.01)
return
block = self.conn.read(blocksize)
#logger.debug("read block with %i bytes", len(block))
self.read_chunk += block
if "\n\n" not in self.read_chunk:
return
parts = self.read_chunk.split("\n\n")
# the last few bytes are from a following command,
# or "", so reassign to chunk
self.read_chunk = parts[-1]
for command_candidate in parts[:-1]:
self.process_command(command_candidate)
def process_command(self, command):
prefix = command[:2]
if prefix not in self.known_commands:
self.report_malformed_command(command)
else:
#logger.debug("processing command from scanner: %s", prefix)
self.known_commands[prefix](command)
@assertion_safe
def process_measurment_lines(self, lines):
numbers = []
for line in lines:
if not validate_checksum(line):
raise InvalidMeasurementException(line)
numbers.append(line[:-1]) # cut of checksum
encoded_data = "".join(numbers)
assert (len(encoded_data) % 3) == 0
measurements = []
for offset in xrange(0, len(encoded_data), 3):
measurements.append(c_dec(encoded_data[offset:offset + 3]))
assert len(measurements) == self.LAST_STEP - self.FIRST_STEP + 1, \
(len(measurements), self.LAST_STEP - self.FIRST_STEP + 1)
# update only the distances
mview = self.measurement[1]
for i, m in enumerate(measurements):
if m > 19:
mview[i] = m * self.CALIBRATION_FACTOR
else:
mview[i] = -1.0
world_coords = world_coordinates(self.measurement, *self.position)
elapsed = time.time() - self.then
self.then += elapsed
#logger.debug("measurement_updated, %.2f", 1.0 / elapsed)
for listener in self.listeners:
listener(world_coords)
def send(self, message):
data = "%s\n" % message
#logger.debug("sending message %r", message)
self.conn.write(data)
# Here come the command-implementations
def cmd_rs(self, _command):
self.state = self.RESET
def cmd_bm(self, command):
_prefix, rest = command.split("\n", 1)
if not validate_checksum(rest):
self.report_malformed_command(command)
return
status = rest[:2]
if status == "00" or status == "02":
self.state = self.ON
else:
self.state = self.ERROR
def cmd_qt(self, command):
_prefix, rest = command.split("\n", 1)
if rest != "00P":
self.report_malformed_command(command)
return
self.state = self.OFF
def cmd_ss(self, command):
_prefix, rest = command.split("\n", 1)
if rest != "00P":
self.state = self.ERROR
return
def cmd_gd(self, command):
_start_step = four_c_dec(command[2:6])
_end_step = four_c_dec(command[6:10])
_cluster_count = command[10:12]
rest = command.split("\n", 1)[1]
error = rest[:3]
if not validate_checksum(error):
self.report_malformed_command(command)
if error[:2] != "00":
self.state = self.ERROR
self.error_code = error[:2]
return
# we have a proper result
lines = rest.split("\n")[2:] # cut off error & timestamp
self.process_measurment_lines(lines)
def cmd_md(self, command):
lines = command.split("\n")
error = lines[1]
# this happens the first time after a MD command is received
if error == "00P":
self.state = self.ON
return
if error != "99b":
self.state = self.ERROR
return
# skip over header, error, timestamp
try:
self.process_measurment_lines(lines[3:])
except InvalidMeasurementException:
self.report_malformed_command("MD failed") #command)
def wait_state(self, state, timeout):
then = time.time() + timeout
while state != self.state and time.time() < then:
time.sleep(.1)
if state != self.state:
logger.warn(
"Waited for state %s but am in %s, timed out",
state,
self.state,
)
@classmethod
def report_malformed_command(cls, command):
logger.error("Malformed command: %r", command[:20])
def reset(self):
self.send("RS")
def hispeed(self):
self.send("SS750000")
def laser_on(self):
self.send("BM")
def laser_off(self):
self.send("QT")
def stop(self):
self.reset()
def measure(self):
count = 0
while self.state != self.RESET:
count +=1
time.sleep(.1)
logger.warn("Not yet in RESET state, waiting")
if (count % 100) == 0:
logger.warn("Re-issuing reset-command")
self.reset()
msg = "MD%04i%04i%02i000" % (self.FIRST_STEP, self.LAST_STEP, 0)
self.send(msg)
@classmethod
def locate_scanner_device(cls):
basedir = "/dev/serial/by-id"
candidates = [
candidate
for candidate in os.listdir(basedir)
if "URG" in candidate
]
assert len(candidates) == 1
return os.path.join(basedir, candidates[0])
def main():
logging.basicConfig(
stream=sys.stderr,
level=logging.DEBUG,
)
device_file = Scanner.locate_scanner_device()
logger.info("device: %s", device_file)
scanner = Scanner(device_file, None, {})
while scanner.state != Scanner.RESET:
pass
logger.info("scanner is reset, aquiring measurement")
scanner.measure()
while scanner.state != Scanner.ON:
if scanner.state == Scanner.ERROR:
raise Exception("Error")
#print "waiting for ON"
count = 0
then = time.time()
while True or count < 2 and scanner.state == Scanner.ON:
time.sleep(.1)
continue
updated, _, _ = scanner.current_measurement
if updated:
elapsed = time.time() - then
then += elapsed
logger.info("new measurement, rate: %.2f", 1.0 / elapsed)
count += 1
print "turning scanner off"
if scanner.state == Scanner.ERROR:
print "error: ", scanner.error_code
scanner.laser_off()
while scanner.state == Scanner.ON:
pass
print "resetting"
scanner.reset()
while scanner.state != Scanner.RESET:
pass
if __name__ == "__main__":
main()