Da ich mich zurzeit mit dem ESP32 und MicroPython beschäftige, setze ich uasyncio ein.
Oftmals hat man gar keine andere Möglichkeit, da auf manchen Controllern nur ein bis zwei Threads möglich sind. Der ESP32 müsste schätzungsweise 14 Threads können (abhängig vom Speicher).
Jedenfalls hat man oft das Problem, dass man Sachen nebenläufig machen möchte. Es soll z.B. gleichzeitig eine Bewegung und ein Countdown stattfinden.
Das ist prädestiniert für asyncio.
Das Beispiel erfordert Python 3.11. Da auch die ExceptionGroups mit drin sind, kommt es bei Python 3.10 zu einem Syntaxfehler.
Mal ein Beispiel (heißt nicht, dass du es so umsetzen sollst).
Vieles dort ist unnötig, da auch Code zum Testen vorhanden ist.
Wobei, das kann man auch im Hinterkopf behalten, so kann man einfacher Unittests durchführen, ohne echte CNC.
Dann würde ich das aber mit socat nicht machen, sondern Python dazu nutzen.
(im Modul os kann man pty anlegen, hatte aber keine Lust mich jetzt damit auseinanderzusetzen)
Code: Alles auswählen
from __future__ import annotations
import asyncio
from enum import StrEnum
from random import uniform
from typing import Self
from serial_asyncio import open_serial_connection
PROG = [
"MOVE Z UP",
"MOVE XY",
"MOVE Z DOWN",
"MOVE Z UP",
]
async def handler(sr: asyncio.StreamReader, sw: asyncio.StreamWriter) -> None:
while True:
line = (await sr.readline()).decode("ascii")
print()
print(f"Serial: {line}")
async def countdown(n) -> None:
for count in range(n, 0, -1):
print("\33[2K\r", end="")
print("Countdown", count, end="", flush=True)
await asyncio.sleep(1)
print()
class SpindleState(StrEnum):
CW_ON = "Clockwise on"
OFF = "OFF"
class CNC:
RPM = 6000
def __init__(self):
self._error = False
self._direction = None
self._speed = None
# für test
self._sr: asyncio.StreamReader | None = None
self._sw: asyncio.StreamWriter | None = None
async def open(self):
# verhindern, dass zwei mal geöffnet wird
if self._sr is not None:
return
# zum Testen, da ich keine CNC habe
self.server = await asyncio.start_server(handler, host="127.0.0.1", port=8000)
# nur 3 Versuche den lokalen pty zu starten
for versuch in range(3):
self.socat = await asyncio.create_subprocess_exec(
"socat",
"PTY,link=csTTY1,echo=0,wait-slave",
"TCP:127.0.0.1:8000,retry=5",
)
if self.socat.returncode is not None:
print(f"Versuch {versuch}: SOCAT ist abgestützt")
# Die serielle Verbindung
self._sr, self._sw = await open_serial_connection(url="csTTY1", baudrate=115200)
async def __aenter__(self) -> Self:
"""
Der asynchrone Kontextmanager öffnet den seriellen port und
startet die Aufwärmphase
"""
await self.open()
await self._warmup()
return self
async def __aexit__(self, exc_type, exc_obj, exc_tb) -> None:
"""
Der asynchrone Kontextmanager stoppt beim Verlassen
des Kontexts die Spindel der CNC, sofern die Kommunikation
funktioniert.
"""
await self.spindle_state(SpindleState.OFF)
print("Motor ausgeschaltet")
# csTTY1 terminieren
self.socat.terminate()
self.server.close()
await self.server.wait_close()
async def _warmup(self) -> None:
"""
Aufwärmphase
"""
# Anstatt asyncio.gather soll nun asyncio.TaskGroup verwendet werden
# Die TaskGroup stell sicher, dass darin alle Tasks ausgeführt werden
# und auftretende Fehler werden gesammelt und erhalten.
async with asyncio.TaskGroup() as tg:
tg.create_task(self.spindle_speed())
tg.create_task(self.spindle_state(SpindleState.CW_ON))
# Ab hier sind alle Tasks in der TaskGroup fertig
print("Warmlauf gestartet")
for rpm in (11000, 15000):
await countdown(10)
await self.spindle_speed(rpm)
async def spindle_speed(self, rpm=None) -> int | None:
"""
Hole oder setze Geschwindigkeit der Spindel.
"""
if rpm is None:
return self._speed
if self._error:
raise RuntimeError("Kann Spindelstatus nicht setzen.")
rpm = rpm or self.RPM
await asyncio.sleep(0) # Kontrolle abgeben
print("Setze Geschwindigkeit:", rpm)
if self._sw:
self._sw.write(f"SET SPEED {rpm}\r\n".encode("ascii"))
await self._sw.drain()
self._speed = rpm
return rpm
async def spindle_state(self, direction=None) -> SpindleState | None:
"""
Hole oder setze den Status der Spindel.
"""
if direction is None:
return self._direction
if self._error:
raise RuntimeError("Kann Spindelgeschwindigkeit nicht setzen.")
await asyncio.sleep(0) # Kontrolle abgeben
print("Setze Drehrichtung:", direction)
if self._sw:
self._sw.write(f"SET DIRECTION {direction}\r\n".encode("ascii"))
await self._sw.drain()
self._direction = direction
return direction
async def do_command(self, command: str) -> None:
await asyncio.sleep(0) # Kontrolle abgeben
print("Befehl:", command)
if self._sw:
self._sw.write(f"{command}\r\n".encode("ascii"))
await self._sw.drain()
await asyncio.sleep(uniform(0.5, 5)) # Verzögerung simulieren
def set_error(self):
self._error = True
async def run_programm(cnc: CNC, programm: list[str]) -> None:
for line in programm:
await asyncio.sleep(0) # Kontrolle abgeben
if line == "MOVE Z DOWN":
cnc.set_error()
raise TimeoutError("CNC antwortet nicht mehr.")
await cnc.do_command(line)
async def main():
async with CNC() as cnc:
print("Starte Programm ...")
try:
await run_programm(cnc, PROG)
except* TimeoutError:
print("CNC Antwortet nicht.")
print("Ende")
if __name__ == "__main__":
try:
asyncio.run(main())
except* RuntimeError:
print("Laufzeitfehler")
PS:
https://www.youtube.com/watch?v=02CLD-42VdI
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server