ich habe zwei while-Schleifen, welche ich gerne gleichzeitig laufen lassen würde. Unten der Code
Verfasst: Freitag 14. Oktober 2022, 09:25
async def run(address):
async with BleakClient(address) as bleakclient:
def my_page_handler(data):
pass
print(data)
ret=client.publish("Python/MQTT",str(data))
#time.sleep(10)
#Verbindung mit Bleak Client aufbauen
print("Verbindungsaufbau mit Trainer")
await bleakclient.is_connected()
trainer = TacxTrainerControl(bleakclient)
#trainer.set_specific_trainer_data_page_handler(my_page_handler)
trainer.set_general_fe_data_page_handler(my_page_handler)
await trainer.enable_fec_notifications()
print("Verbunden mit Trainer")
#resistance = on_message
#Takt der Schleife in Sekunden
duration = 1
try:
#Dauerschleife
while 1:
#Methode zum setzen des Wiederstands
await trainer.set_basic_resistance(resistance)
print("set %s"%resistance)
await asyncio.sleep(duration)
break
except KeyboardInterrupt:
pass
await trainer.disable_fec_notifications()
# For in-place text-mode output
logging.basicConfig(level=logging.DEBUG)
# For in-place text-mode output
screen = TextScreen()
def text_format_8_ch(digits, volts):
digits_str = ", ".join([f"{i: 8d}" for i in digits])
volts_str = ", ".join([f"{i: 8.3f}" for i in volts])
text = (" AIN0, AIN1, Winkelsensor, AIN3, "
#" AIN4, AIN5, AIN6, AIN7\n"
f"{digits_str}\n\n"
"Values converted to volts:\n"
f"{volts_str}\n"
)
ret=client.publish("Lenkwinkel",str(volts_str))
return text
POTI = POS_AIN0 | NEG_AINCOM
LDR = POS_AIN1 | NEG_AINCOM
CH2 = POS_AIN2 | NEG_AINCOM
CH3 = POS_AIN3 | NEG_AINCOM
#CH4 = POS_AIN4 | NEG_AINCOM
#CH5 = POS_AIN5 | NEG_AINCOM
#CH6 = POS_AIN6 | NEG_AINCOM
#CH7 = POS_AIN7 | NEG_AINCOM
CH_SEQUENCE = POTI, LDR, CH2, CH3, #CH4, CH5, CH6, CH7
def loop_forever_measurements(ads):
while True:
raw_channels = ads.read_sequence(CH_SEQUENCE)
voltages = [i * ads.v_per_digit for i in raw_channels]
screen.put(text_format_8_ch(raw_channels, voltages))
screen.refresh()
time.sleep(5)
try:
with ADS1256() as ads:
ads.drate = DRATE_100
ads.cal_self()
loop_forever_measurements(ads)
except KeyboardInterrupt:
print("\nUser Exit.\n")
if __name__ == "__main__":
#Debugmeldungen aktivieren
os.environ["PYTHONASYNCIODEBUG"] = str(1)
#MAC Addresse von Trainer
device_address = "XX:XX:XX:XX:XX:XX"
#Starten der asynchronen Schleife mit Verbindung zu Trainer und MQTT Server
loop = asyncio.get_event_loop()
loop.run_until_complete(run(device_address))
"""
x = threading.Thread(target = run, args=(1,))
y = threading.Thread(target = loop_forever_measurements, args=(1,))
x.start()
y.start()
"""
######
Leider springt er nur in eine der beiden while-Schleifen. Ich habe es auch mit multiprocessing versucht und auch da kein Erfolg. Könnte mir jemand ein Beispiel geben wie ich beide while schleifen am besten mit Threads laufen lassen könnte?
async with BleakClient(address) as bleakclient:
def my_page_handler(data):
pass
print(data)
ret=client.publish("Python/MQTT",str(data))
#time.sleep(10)
#Verbindung mit Bleak Client aufbauen
print("Verbindungsaufbau mit Trainer")
await bleakclient.is_connected()
trainer = TacxTrainerControl(bleakclient)
#trainer.set_specific_trainer_data_page_handler(my_page_handler)
trainer.set_general_fe_data_page_handler(my_page_handler)
await trainer.enable_fec_notifications()
print("Verbunden mit Trainer")
#resistance = on_message
#Takt der Schleife in Sekunden
duration = 1
try:
#Dauerschleife
while 1:
#Methode zum setzen des Wiederstands
await trainer.set_basic_resistance(resistance)
print("set %s"%resistance)
await asyncio.sleep(duration)
break
except KeyboardInterrupt:
pass
await trainer.disable_fec_notifications()
# For in-place text-mode output
logging.basicConfig(level=logging.DEBUG)
# For in-place text-mode output
screen = TextScreen()
def text_format_8_ch(digits, volts):
digits_str = ", ".join([f"{i: 8d}" for i in digits])
volts_str = ", ".join([f"{i: 8.3f}" for i in volts])
text = (" AIN0, AIN1, Winkelsensor, AIN3, "
#" AIN4, AIN5, AIN6, AIN7\n"
f"{digits_str}\n\n"
"Values converted to volts:\n"
f"{volts_str}\n"
)
ret=client.publish("Lenkwinkel",str(volts_str))
return text
POTI = POS_AIN0 | NEG_AINCOM
LDR = POS_AIN1 | NEG_AINCOM
CH2 = POS_AIN2 | NEG_AINCOM
CH3 = POS_AIN3 | NEG_AINCOM
#CH4 = POS_AIN4 | NEG_AINCOM
#CH5 = POS_AIN5 | NEG_AINCOM
#CH6 = POS_AIN6 | NEG_AINCOM
#CH7 = POS_AIN7 | NEG_AINCOM
CH_SEQUENCE = POTI, LDR, CH2, CH3, #CH4, CH5, CH6, CH7
def loop_forever_measurements(ads):
while True:
raw_channels = ads.read_sequence(CH_SEQUENCE)
voltages = [i * ads.v_per_digit for i in raw_channels]
screen.put(text_format_8_ch(raw_channels, voltages))
screen.refresh()
time.sleep(5)
try:
with ADS1256() as ads:
ads.drate = DRATE_100
ads.cal_self()
loop_forever_measurements(ads)
except KeyboardInterrupt:
print("\nUser Exit.\n")
if __name__ == "__main__":
#Debugmeldungen aktivieren
os.environ["PYTHONASYNCIODEBUG"] = str(1)
#MAC Addresse von Trainer
device_address = "XX:XX:XX:XX:XX:XX"
#Starten der asynchronen Schleife mit Verbindung zu Trainer und MQTT Server
loop = asyncio.get_event_loop()
loop.run_until_complete(run(device_address))
"""
x = threading.Thread(target = run, args=(1,))
y = threading.Thread(target = loop_forever_measurements, args=(1,))
x.start()
y.start()
"""
######
Leider springt er nur in eine der beiden while-Schleifen. Ich habe es auch mit multiprocessing versucht und auch da kein Erfolg. Könnte mir jemand ein Beispiel geben wie ich beide while schleifen am besten mit Threads laufen lassen könnte?