Einige erinnern sich vllt. noch, ich hatte um die Weihnachtszeit mal ein völlig schlechten Python Code von meinem kodi addon gezeigt.
Nun bin ich mal wieder dabei, den code umzuschreiben.
Aktuell geht es um eine Klasse die Serial Daten lies und entsprechend Telegramme ausgibt.
Funktion is folgt:
Es wird die Serial-Device geöffnet und zeitgleich in einem Thread geprüft, ob der Datenbus frei ist.
Dann kann man den Thread zum lesen starten.
Man kann nun ununterbrochen sich die telegramme mit der get funktion holen und entsprechend weiterverarbeiten.
Senden kann man mit der write funktion, die von unterschiedlichsten Threads aufgerufen wird.
Entsprechend muss ich zwischen high und low prio nachrichten unterscheiden, und verhindern dass keine Kollisionen entstehen.
Könnt ihr mal drüber schauen, was ich verbessern kann? Sofern es möglich ist bitte nicht gleich den ganzen Code umschreiben. Oder zumindest eine Erklärung dazu, damit ich es als Anfänger verstehe.
Code: Alles auswählen
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import serial, time
from threading import Thread
from copy import deepcopy
def log(message, log_level=1):
print '%s' % message
def set_kodi_prop(key, value):
log('%s - %s' % (key, value))
class ibusFace(object):
def __init__(self, devPath):
try:
self.SDEV = serial.Serial(devPath,
baudrate=9600,
bytesize=serial.EIGHTBITS,
parity=serial.PARITY_EVEN,
stopbits=serial.STOPBITS_ONE)
self.init_success = True
except:
self.init_success = False
return
self.read_buffer = []
self.cancel_all = False # set True to cancel to close the serial device
self.read_access = False # set False to trim packet_buffer
self.dont_send = False # is True to access high prio answer messages
self.writing = False # is True while writing
self.wait_highprio_writing = False # let wait a low prio message
self.ctsCnt = 0 # free bus counter
self.msgerrcnt = 0 # counter for read errors
set_kodi_prop('msgerr_cnt', '0')
time.sleep(0.2)
self.SDEV.setRTS(False)
self.SDEV.flushInput()
# log('SERIAL INPUT BUFFER: ' + str(self.SDEV.inWaiting())) # just for information
log('IBUS: Initialized')
# Reading Thread
self.cancel_read_thread = False
self.read_thread = Thread(target=self._reading)
self.read_thread.setDaemon(True)
# Free Bus Thread
self.cancel_cts_thread = False
self.cts_thread = Thread(target=self._check_free_bus)
self.cts_thread.setDaemon(True)
self.cts_thread.start()
def _reading(self):
self.read_access = True
while not self.cancel_read_thread:
self._read_char()
time.sleep(0.001)
def _check_free_bus(self):
self.ctsCnt = 0
while not self.cancel_cts_thread:
try:
if self.SDEV.getCTS():
self.ctsCnt += 1
else:
self.ctsCnt = 0
time.sleep(0.001)
except:
time.sleep(0.001)
def wait_free_bus(self, waiting=17, timeout=1000.0):
while self.ctsCnt < waiting:
if self.cancel_cts_thread:
return False
if not timeout > 0.0:
return False
timeout -= 0.5
time.sleep(0.0005)
return True
def start_reading(self):
self.read_thread.start()
def flush_input(self):
self.SDEV.flushInput()
def set_rts(self, rts_state):
self.SDEV.setRTS(rts_state)
log('IBUS: SET RTS: ' + str(rts_state))
def _read_char(self):
try:
while self.SDEV.inWaiting() > 0:
if self.cancel_read_thread:
return
if self.read_access:
self.read_buffer.append('%02X' % ord(self.SDEV.read()))
except:
log('IBUS: Hit a serialException', 3)
return
def _cut_packet(self, range=1):
self.read_access = False
time.sleep(0.003)
self.read_buffer = self.read_buffer[range:]
self.read_access = True
def get_bus_packet(self):
packet = {'src': None,
'len': None,
'dst': None,
'dat': [],
'xor': None}
try:
dataLen = int(self.read_buffer[1], 16)
except:
return
if dataLen < 3 or dataLen > 37:
log('IBUS: Length of +37 found (%s), %s' % (dataLen, self.read_buffer), 2)
self._cut_packet()
self.msgerrcnt += 1
set_kodi_prop('msgerr_cnt', '%s' % self.msgerrcnt)
return
else:
buffer_len = len(self.read_buffer)
if buffer_len < 5 or buffer_len < dataLen + 2:
return
message = self.read_buffer[:dataLen + 2]
if self._is_packet_ok(message):
packet['src'] = message[0]
packet['len'] = message[1]
packet['dst'] = message[2]
packet['dat'] = message[3:dataLen + 1]
packet['xor'] = message[-1]
self._cut_packet(dataLen + 2)
log('IBUS-READ: %s' % ' '.join(message), 5)
return packet
log('IBUS: Corrupt packet found (%s), %s' % (dataLen, self.read_buffer), 2)
self._cut_packet()
return
def write_bus_packet(self, src, dst, data, highprio=False):
while self.dont_send:
time.sleep(0.001)
self.dont_send = highprio
if not highprio:
while self.wait_highprio_writing:
time.sleep(0.001)
self.wait_highprio_writing = True
length = '%02X' % (2 + len(data))
packet = [src, length, dst]
full_packet_as_string = ' '.join(deepcopy([src, length, dst, ' '.join(data)]))
for p in data:
packet.append(p)
for i in range(len(packet)):
packet[i] = int('0x%s' % packet[i], 16)
chk = self._get_checksum(packet)
lastInd = len(packet) - 1
packet[lastInd] = chk
if highprio:
while True:
if self._write_high_packet(packet):
break
# log('IBUS-SENT: TRY AGAIN', 2)
time.sleep(0.001)
log('IBUS-SENT: %s CK SUCCESS' % full_packet_as_string, 2)
else:
while True:
if self._write_low_packet(packet):
break
# log('IBUS-SENT: TRY AGAIN', 2)
time.sleep(0.001)
log('IBUS-SENT: %s CK SUCCESS' % full_packet_as_string, 2)
if highprio:
self.dont_send = False
self.wait_highprio_writing = False
def _write_low_packet(self, packet):
if self.writing:
return False
data = ''.join((chr(p) for p in packet))
self.writing = True
if not self.cancel_all:
if self.wait_free_bus():
if not self.dont_send:
self.ctsCnt = 0
self.SDEV.write(data)
self.ctsCnt = 0
self.SDEV.flush()
self.ctsCnt = 0
self.writing = False
return True
else:
self.writing = False
return False
else:
self.writing = False
return False
def _write_high_packet(self, packet):
if self.writing:
return False
data = ''.join((chr(p) for p in packet))
self.writing = True
if not self.cancel_all:
if self.wait_free_bus():
self.ctsCnt = 0
self.SDEV.write(data)
self.ctsCnt = 0
self.SDEV.flush()
self.ctsCnt = 0
self.writing = False
return True
else:
self.writing = False
return False
def _get_checksum(self, packet):
chk = 0
packet.append(0)
for p in packet:
chk ^= p
return chk
def _is_packet_ok(self, message):
message_tmp = deepcopy(message)
for i in range(len(message_tmp)):
message_tmp[i] = int('0x%s' % message_tmp[i], 16)
chk = self._get_checksum(message_tmp[:-1])
if chk == message_tmp[-1]:
return True
else:
return False
def write_hex_string(self, hexstring):
hexstring_tmp = hexstring.split(' ')
src = hexstring_tmp[0]
dst = hexstring_tmp[2]
datend = len(hexstring_tmp) - 1
data = hexstring_tmp[3:datend]
self.write_bus_packet(src, dst, data)
def close(self):
self.cancel_read_thread = True
self.cancel_cts_thread = True
self.cancel_all = True
time.sleep(600)
self.SDEV.close()
log('IBUS: Closed')