Zyklische und Azyklischen Programmteil Parallel

Python auf Einplatinencomputer wie Raspberry Pi, Banana Pi / Python für Micro-Controller
rob87
User
Beiträge: 45
Registriert: Donnerstag 17. Oktober 2019, 14:24

rogerb hat geschrieben: Freitag 10. September 2021, 12:26

Code: Alles auswählen

import asyncio
import xbox
from evdev import InputDevice, categorize, ecodes, KeyEvent

Jetzt scheitert es daran das ich evdev nicht für Python 2 installiert bekommen. Hat jemand einen tipp?
"pip3" geht nicht wie im Netz geschrieben...

Antwort:

Code: Alles auswählen

sudo apt-get install python3-evdev
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Python 2 benutzt man nicht mehr. Das ist seit fast 2 Jahren ohne Unterstützung und wird auch bald aus allen Distributionen fliegen.
rob87
User
Beiträge: 45
Registriert: Donnerstag 17. Oktober 2019, 14:24

rogerb hat geschrieben: Freitag 10. September 2021, 12:26 @rob87,

asyncio.gather() führt die Coroutines "read_count" und "blinker_write" quasi paralellel aus und wartet bis beide abgeschlossen sind. Man verwendet gather() nur wenn mehrere Couroutines von der Eventloop ausgeführt werden sollte. Dasist hier der Fall.
Bei dir wurden die Coroutines nacheinander ausgeführt, was natürlich keine Kommunikation über die Queue erlaubt.

Code: Alles auswählen

#...
async def read_cont(dev, eventspeicher_queue):
	async for ev in dev.async_read_loop():
		# Binaer input
		print(repr(ev))
		if ev.type == ecodes.EV_KEY:
			# Input Type , ID und Wert verketten
			queuestring = "K " + ev.code + " " + categorize(ev).keystate
			eventspeicher_queue.put(queuestring)


#...
async def manager(dev):
    inputspeicher = asyncio.Queue()
    await asyncio.gather(blinker_write(inputspeicher), read_cont(dev, inputspeicher))


# Eventlistener auf Bluetoothcontoller anlagen und definieren.
dev = InputDevice("/dev/input/event1")

asyncio.run(manager(dev))

Ich habe den Code Probiert... Leider bekommen ich keine ausgabe. Mit dem Code
Hier ist ja so ein Beispiel:
https://python-evdev.readthedocs.io/en/ ... ng-asyncio
Kommt zumindest eine Ausgabe.

Also schein das Aufrufkonstrukt nicht zu passen. Was ich nicht verstehe sind die einzelnen Elemente .gather() /.run_until_complete() /.get_event_loop() / .async_read_loop().

nach meinem Verständnis:
gather: baut ein Konstrukt von Funktionen das unabhängig davon ob Sie schleifen enthalten "parallel" abgearbeitet werden
run_until_complete: aktiviert eine Funktion und lässt sie bis zum Ende durchlaufen (pause werden für andere Funktionen genutzt)
get_event_loop: Startet eine Eventbasierte Funktion. Zeiträume ohne Event werden für andere Funktionen genutzt
async_read_loop: liest Hardwareeingaben...

aber wie sind die zu verschachteln um die eingangs erwähnte Funktion zu realisieren? (eine Event- und eine Interrupt - gesteuerte Funktion laufen lassen mit konsistenter Datenübergabe?)

Danke!
__deets__
User
Beiträge: 14545
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ich würde dir raten hier https://realpython.com/async-io-python/ anzufangen, um das Grundprinzip zu verstehen. Und dann nochmal rogerb’s Beispiele betrachten, denn die beantworten deine Frage.
rob87
User
Beiträge: 45
Registriert: Donnerstag 17. Oktober 2019, 14:24

__deets__ hat geschrieben: Dienstag 14. September 2021, 21:09 Ich würde dir raten hier https://realpython.com/async-io-python/ anzufangen, um das Grundprinzip zu verstehen. Und dann nochmal rogerb’s Beispiele betrachten, denn die beantworten deine Frage.
Ich habe mir das mal durchgelesen.
get_event_loop() in kombinaton mit run_until_complete ist die alte variante. Neu wäre einfach asyncio.run
gather erstellt einfach eine Aufgabeliste und ist die moderne Form von create_task.


Code: Alles auswählen

import asyncio
import random

async def read_controller():# controller input lesen und bildschirmausgabe
    i = 0
    while i <= 10:
        p = random.randint(0, 10)
        await asyncio.sleep(p*0.1)
        print(str(i) + "controller schläft für " + str(p))
        i= i + 1


async def blinker():#Blinklicht simulieren
    letzt_zustand = False
    while True:
        if letzt_zustand:
            print("on")
            letzt_zustand=False
        else:
            print("off")
            letzt_zustand=True
        await asyncio.sleep(1)

async def main():
    await asyncio.gather(read_controller(), blinker())

asyncio.run(main())

Heute abend probiere ich das mal mit dem inputevent:

Code: Alles auswählen

from evdev import InputDevice, categorize, ecodes, KeyEvent
import asyncio
import random

async def read_controller():# controller input lesen und bildschirmausgabe
	gamepad = InputDevice('/dev/input/event0')
		for event in gamepad.async_read_loop():
			print(event.code)
		
async def blinker():#Blinklicht simulieren
	letzt_zustand = False
	while True:
		if letzt_zustand:
			print("on")
			letzt_zustand=False
		else:
			print("off")
			letzt_zustand=True
		await asyncio.sleep(1)
		
async def main():
	await asyncio.gather(read_controller(), blinker())
	
asyncio.run(main())

rob87
User
Beiträge: 45
Registriert: Donnerstag 17. Oktober 2019, 14:24

Hiho,

geiler Scheiss, jetzt gehts hier der Code:

Code: Alles auswählen

#Controllerobjekt 

A=0.0
B=0.0

def InputKEY(id,val): #input kann 0.0 oder 1.0 sein
	global A,B
	if id==304:
		A=val
	elif id==305:
		B=val
Teil 2 ("main")

Code: Alles auswählen

from evdev import InputDevice, categorize, ecodes, KeyEvent
import asyncio
import Xbox_V2 as controller
gamepad = InputDevice('/dev/input/event0')

async def read_controller(eventspeicher_queue):# controller input lesen und bildschirmausgabe
	async for event in gamepad.async_read_loop():
		if event.type == 1:
			await eventspeicher_queue.put(event.value)
	
async def blinker(eventspeicher_queue):#Blinklicht simulieren
	letzt_zustand = False
	while True:
		while not eventspeicher_queue.empty():
			controller.InputKEY(304,eventspeicher_queue.get_nowait())
		
		if controller.A == 1:
			if letzt_zustand:
				print("on")
				letzt_zustand=False
			else:
				print("off")
				letzt_zustand=True
			await asyncio.sleep(0.9)
		await asyncio.sleep(0.1)
		
async def main():
	eventspeicher = asyncio.Queue()
	await asyncio.gather(read_controller(eventspeicher), blinker(eventspeicher))
	
asyncio.run(main())
rogerb
User
Beiträge: 878
Registriert: Dienstag 26. November 2019, 23:24

@rob87,

das mit dem Controller object ist etwas merkwürdig. Man sollte in keinem Fall globale Variablen dafür verwenden.
Auch wenn du A und B vorgesehen hast, kann es ja nur A geben. (Sind das Buttons?)
Ich habe es nicht getestet, aber das mit dem Controller würde ich in einem Modul unterbringen:

Code: Alles auswählen

import asyncio
from evdev import InputDevice, categorize, ecodes, KeyEvent

gamepad = InputDevice("/dev/input/event0")


def toggle_blinker_state(state):
    if state == "on":
        return "off"
    else:
        return "on"


async def read_controller(eventspeicher_queue):  # controller input lesen und bildschirmausgabe
    async for event in gamepad.async_read_loop():
        if event.type == 1:
            await eventspeicher_queue.put(event.value)


async def blinker(eventspeicher_queue):  # Blinklicht simulieren
    letzt_zustand = "off"
    while True:
        event_value = eventspeicher_queue.get() # warte bis etwas in die queue gekommen ist

        if event_value == 1:
            letzt_zustand == toggle_blinker_state(letzt_zustand)
            print(letzt_zustand)
            await asyncio.sleep(0.9)

        await asyncio.sleep(0.1)


async def main():
    eventspeicher = asyncio.Queue()
    await asyncio.gather(read_controller(eventspeicher), blinker(eventspeicher))


asyncio.run(main())
Antworten