ich schreibe gerade eine kleine Anwendung in Python:
Eine Waage ist per Serial-Port an den Ras angeschlossen. Ein Handy soll später per NFC Befehle an den Ras geben, der diese an die Waage weiterleitet, und das Ergebnis per NFC ans Handy zurück gibt (Bsp: <gib mir aktuelles Gewicht>).
Ich habe jetzt bereits die SW für die serielle Abfrage fertig(die NFC-Befehle werden bisher noch gefakt und einfach in die tx-queue geschrieben). Ich habe das ganze als Modul aufgebaut, sodass man es von einem anderen Skript aus einfach aufrufen kann.
Ich würde gerne jemanden der mehr Erfahrung hat mal über den QT schauen lassen, weil ich gerne Tipps für die Implementierung hätte, was man anders &| besser machen kann (ich habe selbst fast keine Programmier-Erfahung und hab die SW einfach nach gut dünken erstellt).
Hier also der QT:
die __init__.py des Packets:
Code: Alles auswählen
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import ser_ras
__version__ = '1.0'
__title__ = "tol"
__description__ = "Python package for communication between toledo-scale, a nfc-device and the raspberry pi"
__author__ = "Work group"
__email__ = "xx@yy"
__licence__ = "yy"
__copyright__ = "yy"
~
Code: Alles auswählen
# tol/ser_ras.py
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import time
import serial
import Queue
import threading
class Ser():
line = '++++++++++++++++++++++++++++++++++++++++++++++++++++++++++'
err = 1
inf = 2
suc = 3
ser_rate = 9600
ser_port = '/dev/ttyUSB0'
ser_timeout = 1
txq_size = 100
rxq_size = 100
t_active = True
starttime = 0
t_max_time = 7
cmd = {
'w_std' : b's\n',
'w_std1' : b's\n',
'w_std2' : b's\n',
}
#-----------------------------------------------------------------------
# Constructor inits queues from arg outside
def __init__(self,txq,rxq):
self.txq = txq
self.rxq = rxq
self.ser = self.ser_init()
# Init the serial-connection
def ser_init(self):
try:
return serial.Serial(Ser.ser_port, Ser.ser_rate, timeout=Ser.ser_timeout)
except:
Ser.m(Ser.err,"Could not set up serial-port")
# My own <print> implementation
def m(self, type, m):
if type == Ser.err:
print("---\t" + m)
elif type == 2:
print("###\t" + m)
elif type == 3:
print("+++\t" + m)
else:
Ser.m(Ser.err, "Wrong message type.")
#-----------------------------------------------------------------------
# This is the serial-worker. It checks tx-queue to pass commands to the scale and puts the answer to the rx-queue
def ser_worker(self):
Ser.m(self, Ser.suc, "Serial-Worker-Thread started successfully")
# While loop to end the threads automatically
while Ser.t_active and (time.time()-Ser.starttime < Ser.t_max_time):
try:
# Get command from the tx-queue
req = self.txq.get(timeout=Ser.ser_timeout)
# Write the command to the scale(on serial)
self.ser.write(req)
# Read the answer from the scale(serial)
res = self.ser.readline()
# Put the result to rx-queue
self.rxq.put(res)
Ser.m(self, Ser.inf, res)
except:
pass
Ser.m(self, Ser.suc, "Serial-Worker-Thread stopped")
exit
# This is the NFC-worker. It just puts the same cmd to the tx-queue as long there is no real connection and app to/on a mobile
def nfc_worker(self):
Ser.m(self, Ser.suc, "NFC-Worker-Thread started successfully")
while Ser.t_active and (time.time()-Ser.starttime < Ser.t_max_time):
# Write the "give me current weight" cmd to tx-queue and wait 1s
self.txq.put(Ser.cmd['w_std'])
time.sleep(1)
Ser.m(self, Ser.suc, "NFC-Worker-Thread stopped")
exit
#-----------------------------------------------------------------------
def main(self):
Ser.starttime = time.time()
# Initialise the serial- & worker-thread
self.ser_wor = threading.Thread(target=self.ser_worker, name='t1', args=[])
self.nfc_wor = threading.Thread(target=self.nfc_worker, name='t2', args=[])
# Start the threads
self.ser_wor.start()
self.nfc_wor.start()
# Wait for threads to be finish
self.ser_wor.join()
self.nfc_wor.join()
# Clsoe serial-connection
self.ser.close()
return
#-----------------------------------------------------------------------
Code: Alles auswählen
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import time
import serial
import Queue
import threading
import tol
txq = Queue.Queue()
rxq = Queue.Queue()
i = tol.ser_ras.Ser(txq,rxq)
i.main()