(Verständnis-) Frage zu Coroutines

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

ich habe ein paar (Verständnis-) Fragen zu Coroutines. Die Grundidee dahinter habe ich verstanden (glaube ich...), praktisch habe ich die aber noch nie eingesetzt.

Heute bot sich am die Gelegenheit, diesen Code (der IMHO so wie so nicht sooo toll ist) auf eine Coroutine-basierte Version umzustricken.

Hier mein Code:

Code: Alles auswählen

import csv
from datetime import datetime
from sense_hat import SenseHat
from time import sleep

FILENAME = "tester"
WRITE_FREQUENCY = 3
DELAY=3
TEMP_H=True
TEMP_P=False
HUMIDITY=True
PRESSURE=True

def coroutine(func):
    def start(*args,**kwargs):
        cr = func(*args,**kwargs)
        next(cr)
        return cr
    return start

def get_header():
    header = []
    if TEMP_H:
        header.append('temperature_from_humidity')
    if TEMP_P:
        header.append('temperature_from_pressure')
    if HUMIDITY:
        header.append('humidity')
    if PRESSURE:
        header.append('pressure')
    header.append('date and time')
    return header

def get_sense_data(sense, delay, write_frequency, storage_target):
    data=[]
    while True:
        sense_data = []
        if TEMP_H:
            sense_data.append(sense.get_temperature_from_humidity())
        if TEMP_P:
            sense_data.append(sense.get_temperature_from_pressure())
        if HUMIDITY:
            sense_data.append(sense.get_humidity())
        if PRESSURE:
            sense_data.append(sense.get_pressure())
        sense_data.append(datetime.now())
        data.append(sense_data)
        print('added data')
        if len(data) == write_frequency:
            storage_target.send(data)
            data = []
        sleep(delay)

@coroutine
def save_to_file(writer):
    while True:
        data_to_save = (yield)
        writer.writerows(data_to_save)
        print('Wrote data')

def main():
    sense = SenseHat()
    full_filename = '{}_{}.csv'.format(FILENAME, int(datetime.now().timestamp()))
    try:
        with open(full_filename, 'w', newline='') as f:
            writer = csv.writer(f)
            header = get_header()
            writer.writerow(header)
            saver = save_to_file(writer)
            get_sense_data(sense, DELAY, WRITE_FREQUENCY, saver)
    except KeyboardInterrupt:
        saver.close()
        print('Exited programm')

if __name__ == '__main__':
    main()
Und zum testen ein Dummy-Modul für `sense_hat`:

Code: Alles auswählen

class SenseHat:

    def get_temperature_from_humidity(self):
        return 20.123

    def get_temperature_from_pressure(self):
        return 15.987

    def get_humidity(self):
        return 85

    def get_pressure(self):
        return 1001
Das ganze funktioniert auch, wie es soll.

Was mir nicht klar ist: macht das Sinn (oder Unsinn?), dass so zu implementieren (das man das auch anders, also ohne Coroutine implementieren kann ist mir klar)? Nebenläufigkeit, also dass das Schreiben in die CSV-Datei non-blocking ist, habe ich so doch auch nicht, oder? Und muss man hier die Coroutine mit `close()` schließen oder ist das im gegebenen Falle gleich?

Gruß, noisefloor
BlackJack

@noisefloor: Ich sehe hier keinen Sinn dafür. Man hätte das an der Stelle auch anders lösen können, und das wäre sicher verständlicher. Das man Generatoren als Coroutinen verwenden kann hat sich ja nicht wirklich durchgesetzt und in aktuellen Python 3 Versionen gibt es ``async def`` und ``await`` und das `asyncio`-Modul. `send()` habe ich sehr selten gesehen und `close()` und `throw()` noch überhaupt nicht ausserhalb der Dokumentation. Kann natürlich auch an mir liegen. :-)

Was bei Deinem Beispiel vielleicht mehr Sinn machen würde wären unabhängige Coroutinen für die einzelnen Sensoren und eine zum schreiben. Die dann aber die Daten nicht unbedingt per `send()` bekommt, sondern ”traditionell” über eine Queue. Und dann eine Hauptschleife die das ganze antreibt. Schreiben müsste dann nicht-blockierend passieren, ausser es ist sowieso schnell genug. In Deinem Beispiel müsste `get_sense_data()` auch eine Coroutine sein, und die darf dann nicht mit `time.sleep()` blockieren. Diesen Wunsch nach ”schlafen” müsste man über eine Coroutine regeln, die einfach die gegebene Zeit verschläft (ohne die Hauptschleife zu blockieren). `asyncio.sleep()` halt.

Nebenläufig beudeutet übrigens nicht zwingend das Sachen tatsächlich *gleichzeitig* passieren müssen.
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

Danke für's Feedback :-) Das die Funktion `get_sense_data` "zu viel" kann hatte ich mir gedacht. Aber wenn man mit Couroutines noch nie gearbeitet hat, ist das IMHO gedanklich schwierig zu "entwerfen".

Ich habe das ganze mal auf Python >= 3.5 und asyncio umgebaut. Ist auch das 1. Mal, dass ich asyncio benutze:

Code: Alles auswählen

import asyncio
import csv
from datetime import datetime
from sense_hat import SenseHat

FILENAME = "tester"
WRITE_FREQUENCY=3
DELAY=3
TEMP_H=True
TEMP_P=False
HUMIDITY=True
PRESSURE=True

sense = SenseHat()

def get_header():
    header = []
    if TEMP_H:
        header.append('temperature_from_humidity')
    if TEMP_P:
        header.append('temperature_from_pressure')
    if HUMIDITY:
        header.append('humidity')
    if PRESSURE:
        header.append('pressure')
    header.append('date and time')
    return header

def setup_csv(writer):
    header = get_header()
    writer.writerow(header)
    print('setup CSV - done')

async def get_temp_h():
    return sense.get_temperature_from_humidity()

async def get_temp_p():
    return sense.get_temperature_from_pressure()

async def get_humidity():
    return sense.get_humidity()

async def get_pressure():
    return sense.get_pressure()

async def write_to_csv(writer, data_to_save):
    writer.writerows(data_to_save)
    print('wrote to CSV')

async def main():
    full_filename = '{}_{}.csv'.format(FILENAME, int(datetime.now().timestamp()))
    with open(full_filename, 'w', newline='') as f:
        writer = csv.writer(f)
        setup_csv(writer)
        data = []
        while True:
            dataset = []
            if TEMP_H:
                temp_h = await get_temp_h()
                dataset.append(temp_h)
            if TEMP_P:
                temp_h = await get_temp_p()
                dataset.append/(temp_p)
            if HUMIDITY:
                humidity = await get_humidity()
                dataset.append(humidity)
            if PRESSURE:
                pressure = await get_pressure()
                dataset.append(pressure)
            data.append(dataset)
            print('added data')
            if len(data) == WRITE_FREQUENCY:
                await write_to_csv(writer, data)
                data=[]
            await asyncio.sleep(DELAY)            

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
Auch hier gilt: läuft und macht das, was es soll - ob das aber wirklich sinnvoll ist und eine "gute" Lösung weiß ich nicht.

Welche Vorteile hätte es eigentlich, wenn man der `write_to_csv`Funktion die Daten per Queue zukommen lässt statt über das Argument beim Aufruf der Funktion?

Gruß, noisefloor
BlackJack

@noisefloor: Das ist komplett sinnlos weil alles blockiert. Stell Dir den Aufruf einer ``async def``-Funktion mal als starten eines Threads vor und das ``await`` als als `join()` auf den Thread. Du startest hier im Laufe des Programms ganz viele Coroutinen, aber von denen gibt es maximal zwei gleichzeitig, und eine davon schläft grundsätzlich weil sie im ``await`` hängt. Dadurch ist rein gar nichts gewonnen. Du hast da genau den gleichen Programmablauf als wenn Du ganz normale Funktionen aufrufen würdest.
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

ok, Danke für die Erklärung. Das war mir nämlich bis dato nicht klar, wie viel da wirklich "parallel" laufen kann.

Ist das eigentlich irgendwo in Form eines Tutorials oder so in der Python-Doku erklärt? Ich habe bisher nur den Einstiegspunkt (https://docs.python.org/3.5/library/asyncio.html) und "Develop with asycnio" (https://docs.python.org/3.5/library/asy ... syncio-dev) gefunden. Beides ist IMHO aber schwierig zu verstehen, wenn man noch nie damit gearbeitet hat...

Andere Frage: macht asyncio in irgendeiner Form überhaupt für das gegebene Beispiel Sinn, also Lesen von X Sensoren und Schreiben nach Y Lesevorgängen? Oder lagert man das Schreiben besser in einen Thread aus, um das Lesen der Sensoren nicht zu blocken?

Gruß, noisefloor
__deets__
User
Beiträge: 14494
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ob sich asyncio dafuer anbietet haengt hauptsaechlich davon ab, wie die Sensoren angesprochen werden. Wenn die mit vernuenftigen Treibern kommen, in denen man Werte einfach per read auslesen kann, ist asyncio denke ich durchaus sinnvoll. Insbesondere, weil es schon gleich mit einer eigenen Hauptschleife & einem Verstaendnis von Timern daherkommt. Natuerlich kann es nicht zaubern - wenn ein Vorgang X lange braucht, und du aber Y < X Intervalle einhalten willst, dann wirst du um echte parallelisierung nicht herumkommen.

Wenn du hingegen selbst im User-Space mit bit-banging anfangen musst, dann ist das denke ich eher oede. Das waere denke ich in einem eigenen Thread oder gar eigenen Prozess besser aufgehoben. Allerdings koennte man das immer noch per (ggf in-Prozess) pipe an die Hauptschleife weiterleiten, und somit aus Sicht des Programmierers immer noch als asyncio-Ereignis behandeln. Das ist einer meiner wenn-ichs-mal-brauche-Projekte fuer den PI, damit mal dieses selbstgebastelte Ereignis+Timer-gefrickel ein Ende hat.
BlackJack

@noisefloor: Ich versuch's noch mal besser als Vergleich zu Threads zu erklären. Das die ``await coroutine()``-Aufrufe wie Threads sind, stimmt nicht so ganz. `Task`\s sind besser mit Threads zu vergleichen, und davon hast Du in Deinem Programm nur *einen*, nämlich die Coroutine die an `run_until_complete()` übergeben wird. Die Methode macht aus Coroutinen nämlich `Task`\s.

Im Gegensatz zu Threads auf Einprozessor-/Ein-Kern-Systemen, die im Grunde an jeder Stelle angehalten und fortgesetzt werden können, damit andere Threads auch mal dran kommen, kann bei `Task`\s der Wechsel nur an den Stellen passieren, an denen ``await`` ausgeführt wird. Deswegen der Name Coroutinen — das ist kooperative Nebenläufigkeit. Die Tasks/Coroutinen müssen selbst dafür sorgen das andere auch zum Zug kommen.

Du hast da also letztendlich eine „single threaded“- beziehungsweise „single tasked“-Anwendung geschrieben.

Um dort nun eine gewisse Nebenläufigkeit rein zu bringen, brauchst Du zwei Tasks von denen abwechselnd Coroutinen laufen können. Also wie bei Threads die übliche Aufteilung zwischen einem Task der die Messungen durchführt und einen Task der die Daten wegschreibt. Und die müssen miteinander kommunizieren, und das macht man am am einfachsten mit einer `async.Queue` auf der der Schreib-Task beim `get()` ”blockieren” kann. Das heisst wen man das `get()` ausführt, dann geht diese Coroutine erst weiter wenn ein anderer Task etwas in die Queue geschrieben hat.

Ungetestet:

Code: Alles auswählen

# ...

async def write(queue):
    full_filename = '{}_{}.csv'.format(FILENAME, int(datetime.now().timestamp()))
    with open(full_filename, 'w', newline='') as f:
        writer = csv.writer(f)
        setup_csv(writer)
        while True:
            rows = await queue.get()
            try:
                writer.writerows(rows)
            finally:
                queue.task_done()


async def main(queue):
    data = []
    while True:
        dataset = []
        if TEMP_H:
            temp_h = sense.get_temp_h()
            dataset.append(temp_h)
        if TEMP_P:
            temp_h = sense.get_temp_p()
            dataset.append(temp_p)
        if HUMIDITY:
            humidity = sense.get_humidity()
            dataset.append(humidity)
        if PRESSURE:
            pressure = sense.get_pressure()
            dataset.append(pressure)
        data.append(dataset)
        print('added data')
        if len(data) == WRITE_FREQUENCY:
            await queue.put(data)
            data = []
        await asyncio.sleep(DELAY)


queue = asyncio.Queue()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(main(queue), write(queue)))
loop.close()
Hier können jetzt zwei Probleme auftreten, wobei eines davon auf dem Raspi recht wahrscheinlich ist: Genau wie bei GUIs muss der Code der ausserhalb der Hauptschleife ausgeführt wird, immer nur recht kurz laufen, denn der hält immer alles andere auf. Die Laufzeiten von einem zum nächsten ``await`` sollten also kurz sein. Bei den Messungen weiss ich das nicht, aber schreiben auf SD-Karte oder externe USB-Platte kann eventuell (zu) lange blockieren. Das `csv`-Modul kennt aber keine Funktionen oder Methoden die Coroutinen sind. Da muss man mit der synchronen, blockierenden API leben. An der Stelle kann man sich mit Threads (oder gar Prozessen) behelfen. `asyncio` bietet dafür auch schon einen Threadpool und eine Methode auf der Ereignisschleife: `run_in_executor()`.

Ungetestet:

Code: Alles auswählen

async def write(loop, queue):
    full_filename = '{}_{}.csv'.format(FILENAME, int(datetime.now().timestamp()))
    with open(full_filename, 'w', newline='') as f:
        writer = csv.writer(f)
        setup_csv(writer)
        while True:
            rows = await queue.get()
            try:
                await loop.run_in_executor(None, writer.writerows, rows)
            finally:
                queue.task_done()
Falls Messungen auch länger dauern können, kann man sie auch auf diese Weise in Threads laufen lassen. Und statt `None` als erstes Argument, könnte man dort auch eigenen eigenen Thread- oder Prozesspool aus `concurrent.futures` übergeben.

Alternativ könnte man auch überlegen für jedes schreiben von Messungen einen Task zu starten. Im Gegensatz zu Threads sind die ja ”billig” und nicht durch Betriebssystem-Ressourcen begrenzt (Speicher mal ausser acht gelassen), so dass man durchaus 1000 oder mehr davon laufen lassen kann. Da kann man dann aber Probleme mit der Reihenfolge bekommen, die im Beispiel oben durch die Queue erzwungen wird.

Wenn in einer ``async def``-Funktion kein einziges ``await`` steht, dann ist das übrigens ein Warnzeichen, dass man etwas komisches macht, denn so eine Funktion läuft ja immer komplett durch, ohne das die Kontrolle an einen anderen Task abgegeben werden könnte oder das sie von aussen abgebrochen werden könnte.

Neben der API-Dokumentation gibt es noch das/die PEPs zum Thema. Was vielleicht auch beim Verständnis hilft, ist sich jahrelang mit asynchronen Funktionen in JavaScript herum zu schlagen, mit den ganzen Rückruffunktionen, sich dann über `Promise`\s zu feuen, und letztendlich bei ``async``/``await`` fast Freudentränen in den Augen zu haben. ;-) Das ist fast so eine Art Erlösung wenn Node.js 8 im Oktober zur LTS-Version wird und damit die beiden Schlüsselworte zur Verfügung stehen werden.

Ich denke wegen JavaScript empfinde ich Python's `asyncio` als so ”nutzlos”, weil es in Python Sockets und Unterprozesse gibt, die man asynchron bedienen kann, aber ansonsten die API-Welt in Python blockierend ist. Es gibt nur wenige, ausgewählte Module die `asyncio` ”können”. Selbst für normale, lokale Dateien muss man schon etwas externes installieren. Während man in JavaScript von asynchronen APIs nur so umzingelt ist und eine Ereignisschleife die das alles antreibt sozusagen die Grundlage für alles ist.
jerch
User
Beiträge: 1669
Registriert: Mittwoch 4. März 2009, 14:19

Ich bin gespannt, wie sich die async-Sache im Javascript-Land auswirken wird. Bisher lief Code in den JS-Engines ja streng synchron singlethreaded (die Ausführung von JS-Code selbst blockiert), und die async-Funktionalität wird durch die zusätzliche native Standard-Ereignisschleife "untergeschoben" (natürlich u.a. unter Benutzung von Threads ;) ). Der Schritt, dies in die Engines selbst einzuführen und direkt ohne Bloatcode annotieren zu können, war imho lange überfällig.
Und trotzdem - echten Threadsupport fände ich wichtiger als die async Sache, aber wird wohl nie kommen (weder für Javascript noch für CPython).
BlackJack

@jerch: Ändert sich durch ``async`` in JavaScript denn grundsätzlich etwas ausser am Quelltext den man damit schreiben kann? Ich hatte bis jetzt den Eindruck, dass das hauptsächlich syntaktischer Zucker ist, so ähnlich wie bei Python, wo man sich das ja auch in Python 2 mit Generatorfunktionen nachbasteln kann. Siehe `trollius`-Package. Für JavaScript gibt es ja auch Compiler die JavaScript mit ``async``/``await`` in JavaScript übersetzen, welches das mit Hilfe von Coroutinen umsetzt, also beispielsweise mit dem `co`-Package.
jerch
User
Beiträge: 1669
Registriert: Mittwoch 4. März 2009, 14:19

@BlackJack: Das stimmt schon, Javascript kann damit nicht mehr, allerdings finde den Umstand, dass man mit async/await wieder althergebrachte Kontrollstrukturen nutzen kann und sich der Code wieder "synchroner" formulieren lässt, ein dickes Plus. Das ist mit der Callbackfummelei oder rein promise-basiert undenkbar. Und Generatoren sind auch in Javascript nett, werden aber noch kaum genutzt und sind auch noch nicht richtig angekommen (die Engines haben z.T. etliche Probleme beim Debuggen).
Und um nochmal auf meine Randnotiz zurückzukommen, mit dem async/await wäre auch eine echte Nebenläufigkeit in Javascript mit relativ simpler API denkbar. Leider auf Seite der Engine mit Interpreter/JIT-Zustand derzeit undenkbar.
__deets__
User
Beiträge: 14494
Registriert: Mittwoch 14. Oktober 2015, 14:29

Ohne damit gearbeitet zu haben - aber es gibt doch WebWorker. Und da die ebenfalls eine Ereignisschleife vorraussetzen sollten die doch auch ganz gut mit dem Paradigma asynchroner Programmierung funktionieren - oder uebersehe ich da etwas?
BlackJack

@jerch: Jau, *dickes* Plus. Wie gesagt, mir standen die Freudentränen in den Augen. :-)

Ich weiss noch wie ich das erste mal so zum testen eine kleine Python-Konsolenanwendung portiert hatte, die ein kleines Frage-/Antwort-Spielchen mit dem Benutzer veranstaltete und wie stark ich da umdenken musste. Ich hatte dann letztendlich eine Bibliothek installiert die dabei geholfen hat. Promises waren dann eine wahre Freude und jetzt ``async``/``await`` ist fast schon zu schön um wahr zu sein. :-D

Generatoren habe ich schon genutzt, das kann aber natürlich auch daran liegen das ich die von Python her gewohnt bin.

@__deets__: WebWorker sind echt asynchron, aber auch ziemlich „heavy“. Die meisten Browser realisieren die als Prozesse. Ich habe beim Client bisher noch nichts machen müssen/wollen was derartige Geschütze gerechtfertigt hätte.
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

@BlackJack: vielen vielen Dank für die sehr ausführliche Antwort! Die ersetzt zumindest in Teilen die fehlende Grundlagen-Doku, die ich weiter oben angesprochen habe.

Den Code zum SenseHat Daten speichern hatte ich "nur" als Anlass genommen, mich mit dem Thema zu beschäftigen. Ich habe da kein "real world" Problem. Was in der Tat in der realen Welt beim obigen Beispiel ein Problem weil blocking sein könnte ist das Schreiben der Daten, wenn die WRITE_FREQUENCY hoch ist. Wobei es bei PyPi glaube ich auch auf asyncio-basierende Module gibt, die non-blocking File I/O ermöglichen.

Für mich persönlich als Hobbyprogrammierer sehe ich auch ehrlich gesagt nicht, wo ich asyncio in Zukunft mal nutzen müsste / könnten. Aber immerhin ist jetzt mein Grundverständnis ein bisschen besser geworden :-)

Gruß, noisefloor

P.S.: Falls mal jemand den Code tatsächlich mit dem SenseHat nutzen will - die Abfrage der Werte für Temperatur, Druck etc. ist schnell, IMHO ist da keine spürbare Verzögerung zwischen Aufruf der Funktion und der Rückgabe des Werts. Von daher blockiert das Lesen der Sensor IMHO nicht, so dass der obige Code auch für "real world" Problem ok wäre.
jerch
User
Beiträge: 1669
Registriert: Mittwoch 4. März 2009, 14:19

Hier eine schöne Demo von David Beazley dazu https://www.youtube.com/watch?v=MCs5OvhV9S4
Er ist imho auch der heimliche GIL Papst oder Antipode? ;)
BlackJack

@noisefloor: Ich habe mir das Video, welches jerch verlinkt hat, jetzt mal angeschaut und fand es recht interessant. Und auch ein wenig ernüchternd was die Leistungsfähigkeit von `asyncio` angeht wenn man dann irgendwo doch wieder Threads oder Prozesse starten muss. Aber interessant für Dich wäre vielleicht der Code den er da live schreibt, wo man sehen kann wie die Coroutinen mit Generatoren grundsätzlich funktionieren.
Benutzeravatar
noisefloor
User
Beiträge: 3843
Registriert: Mittwoch 17. Oktober 2007, 21:40
Wohnort: WW
Kontaktdaten:

Hallo,

hatte mir das Video die Tage auch mal angeschaut. Das ist schon ziemlich gut, sowohl inhaltlich als auch vom Vortragsstil. Hat mir jedenfalls beim Verständnis geholfen.

Was ich allgemein beim Thema asyncio & Co schade finde ist, dass die allermeisten Beispiele, inkl. denen in der Python-Doku, entweder total flach (`print('foo')`) sind oder ein paralleler Download von irgendwelchen Internetseiten gemacht wird. Letzteres mag zwar ein gutes Beispiel sein, weil das Warten auf Netzwerk I/O im Vergleich zu Laufzeit des Programms lang ist, aber so ein paar andere "real world" Probleme als Beispiel wäre IMHO auch mal gut.

Gruß, noisefloor
BlackJack

@noisefloor: Ich denke mal *das* reale Problem was man mit `asyncio` löst, ist tausende Socketverbindungen gleichzeitig zu bedienen. Nicht unbedingt ein Problem das viele Leute täglich haben. :-)
sting
User
Beiträge: 5
Registriert: Mittwoch 27. Februar 2019, 21:09
Kontaktdaten:

@BlackJack
Du gibst sehr gute Erklärungen ab! Vielen Dank, das Nachlesen hier hat mir bei einer entscheidenden Kleinigkeit sehr geholfen!
Antworten