os.pipe()s, read/write und select

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
gurke111
User
Beiträge: 28
Registriert: Freitag 26. Oktober 2007, 22:55

Hallo!

Ich brauche eine Interthreadkommunikation zwischen einen Worker-Thread (A) und einem Urwid-GUI-Thread (B) (siehe http://www.python-forum.de/topic-19633.html ).

Die Kommunikation geht in zwei Richtungen:

(B) -> (A)
Um Befehle, die am GUI eingegeben werden, an den Worker-Thread weiterzuleiten, werde ich Queue.Queue nehmen.

(A) -> (B)
Logging, stdout und stderr des Worker-Threads müssen für die Darstellung im GUI zum Urwid-Thread transportiert werden. Hier bietet sich denke ich eine os.pipe() - Lösung an (für andere Vorschläge bin ich natürlich offen :-) ) Nehmen wir also für die folgenden Fragen einfach an, dass ich auf der schreibenden Seite einer os.pipe() regelmäßig irgendwelche Strings mit os.write() reinstopfe. Jetzt habe ich zwei Fragen zum weiteren Vorgehen:


[1] Was bedeutet es, wenn select() den lesenden FD returned?


Urwid hat eine SelectEventLoop, die ich im Zusammenhang mit dem lesenden Ende der Pipe benutzen möchte. Das Ganze basiert auf select.select() und es wird eine callback-Funktion losgetreten, wenn ein filedeskriptor.. und hier weiß ich nicht weiter. Was heißt es, wenn der lesende FD an select() übergeben wird und returned wird? Insbesondere: Sind dann Daten da oder nicht? Was wird der nächste os.read()-Aufruf bringen? Kann os.read() blockierend sein?

Was ich hierzu gelesen habe:
In der Pythondocs Intro zu select:
It cannot be used on regular files to determine whether a file has grown since it was last read.
Das ist ja eigentlich genau das, was ich machen will. Nur: ich habe ja keine "regular files", sondern ne pipe. Was sagt mir das jetzt?

Trundle in http://www.python-forum.de/topic-18022,15.html:
Jetzt habe ich eben geschrieben, dass `select()` dir zurückgibt, ob man von der Pipe lesen kann, nicht aber, ob Daten vorliegen.
Wenn das generell so stimmt, ist das Mist für mich; andererseits stelle ich mir dann die Frage, was mit dem FD los sein muss, damit select() ihn nicht returned. Wann passiert das? Und ich stelle mir die Frage, warum urwid eine SelectEventLoop auf select() basierend hat, wenn select() nicht aussagt, ob Daten vorliegen oder nicht.

Wenn select mir nicht sagt, ob Daten vorliegen -- gibt es eine andere Methode?

Irgend ein Typ bei Stackoverflow ( http://stackoverflow.com/questions/3238 ... -in-python ):
This code
select.select([sys.stdin], [], [], 1.0)
does exactly what I want on Linux
mit der Frage: "How to find out if there is data to be read from stdin[..]?"
Das macht mir also wieder Hoffnung, dass select.select() und damit die SelectEventLoop das ist, was ich brauche.



[2] Inwiefern ist os.pipe().read()/write() eigentlich thread safe?


Was passiert, wenn ein lesender und schreibender Zugriff beider Threads gleichzeitig auf die selbe pipe erfolgt? Ich denke, dass Pipes vom Prinzip her threadsafe sein sollten, aber eine Bestätigung dessen habe ich irgendwie nicht gefunden bisher..
BlackJack

@gurke111: Ich würde es in die Gegenrichtung auch erst einmal mit einer Queue versuchen. Eventuell die Wartezeit für `Screen.get_input()` ein wenig heruntersetzen, falls die halbe Sekunde zu lang ist.

Wenn `select()` einen Dateidescriptor zurück gibt, dann liegen Daten an oder etwas anderes wichtiges ist passiert, zum Beispiel ein Fehler ist aufgetreten oder jemand hat die Datei geschlossen.

Das Zitat aus der Doku ist hier irrelevant, weil Du bei einer Pipe weder eine reguläre Datei hast, noch sich deren Grösse ändern kann. Pipes haben keine Grösse.

Was mit dem Kommentar gemeint ist, ist dass man nicht das Unix-Programm ``tail`` damit nachbauen kann. Also reguläre Datei öffnen, den Dateizeiger mit `seek()` ans Ende platzieren und dann mit `select()` darauf warten, ob ein anderer Schreibvorgang Daten an das Ende der Datei anhängt, funktioniert nicht. Das ist aber auch ein ganz anderer Anwendungsfall als Deiner.
gurke111
User
Beiträge: 28
Registriert: Freitag 26. Oktober 2007, 22:55

BlackJack hat geschrieben: Ich würde es in die Gegenrichtung auch erst einmal mit einer Queue versuchen.
Darüber habe ich auch nachgedacht:

Ich habe mit Urwid zwei prinzipiell mögliche Ansätze:
- 0.9.8.4: eigene Mainloop
- dev tip: MainLoop mit SelectEventLoop

Ich habe mich erstmal für den letzteren entschieden und da würde ich gerne von urwid's eingebauter Fähigkeit profitieren, Dateideskriptoren per select() überwachen zu können. Entscheidende Frage: Wenn ich von meinem Programm her garantiere, dass nie mehr als ein Thread auf das selbe Ende einer os.pipe() zugreift, jedoch zwei Threads eine os.pipe() gleichzeitig lesen und beschreiben, handele ich mir dann Schwierigkeiten ein oder nicht? Ist die Antwort "Nein", so ist dieser Ansatz einfacher als mit Queue(s):

Wenn ich für den Output des Workerthreads Queue nehme, muss ich den logger erstmal auf ne Queue umbiegen. Und evtl. noch stderr (das weiß ich noch nicht so genau). Und wieder auf den "eigene Mainloop"-Approach zurückgreifen, in dem ich dann diese Queue selbst polle.
Wenn `select()` einen Dateidescriptor zurück gibt, dann liegen Daten an oder etwas anderes wichtiges ist passiert, zum Beispiel ein Fehler ist aufgetreten oder jemand hat die Datei geschlossen.
Danke für die Klarstellung
Was mit dem Kommentar gemeint ist, ist dass man nicht das Unix-Programm ``tail`` damit nachbauen kann. Also reguläre Datei öffnen, den Dateizeiger mit `seek()` ans Ende platzieren und dann mit `select()` darauf warten, ob ein anderer Schreibvorgang Daten an das Ende der Datei anhängt, funktioniert nicht. Das ist aber auch ein ganz anderer Anwendungsfall als Deiner.
Aber Trundle bezog sich auch auf eine os.pipe()... er hat also meiner Meinung nach genau das Gegenteil von Dir behauptet: er sagte, dass select() nicht zurückgibt, ob Daten in der Pipe vorliegen
BlackJack

@gurke111: Was verstehst Du unter eigener Mainloop? Ich meine man schreibt doch da sowieso ein ``while True:`` um das `screen.get_input()` weil das einen Timeout hat. Innerhalb dieser Schleife könnte man auch bequem die Rückgabequeue abfragen.

Also ich bin der Meinung Trundle und ich behaupten das selbe. Du hast anscheinend beides nicht zuende gelesen. ;-) Nein, das `select()` zurückkehrt bedeutet nicht *immer* das Daten anliegen, aber das macht nichts, weil im Fall wo keine Daten anliegen, das Lesen nicht blockiert. Denn dann hat entweder jemand die Datei/Pipe geschlossen und es wird *sofort* eine Zeichenkette der Länge 0 geliefert, oder es gab ein Problem, dann wird beim Leseversuch eine Ausnahme ausgelöst. Du hast da also nie ein länger blockierendes `read()`. Das ist ja der Sinn von `select()`.
gurke111
User
Beiträge: 28
Registriert: Freitag 26. Oktober 2007, 22:55

BlackJack hat geschrieben:@gurke111: Was verstehst Du unter eigener Mainloop? Ich meine man schreibt doch da sowieso ein ``while True:`` um das `screen.get_input()` weil das einen Timeout hat. Innerhalb dieser Schleife könnte man auch bequem die Rückgabequeue abfragen.
while True: ist genau die "eigene Mainloop", welche bei der Entwicklungsversion von urwid durch eine "eingebaute Mainloop mit einer EventLoop und Callbacks" ersetzt wird. Codeschnipsel beider Ansätze im Vergleich: (kein lauffähiger Code, nur um das Prinzip zu veranschaulichen)

Eigene Mainloop:
Hier muss man in der eigenen Mainloop saemtlichen Input selbst behandeln (inkl. resize), den screen neu zeichnen, etc:

Code: Alles auswählen

def main(self):
    self.ui = urwid.raw_display.Screen()
    self.ui.set_input_timeouts(max_wait=0.05)
    self.ui.run_wrapper(self.run)

def run(self):
    size = self.ui.get_cols_rows()
    while True:
        self.draw_screen(size)
        
        # hier könnte ich ein Queue.Queue.get() machen und in
        # Abhängigkeit des Rückgabewertes z.B. ein neues Text-Widget
        # erstellen und auf den Screen setzen..
                  
        keys = self.ui.get_input()
        if "f8" in keys:
            break
        for key in keys:
            if key == "window resize":
                size = self.ui.get_cols_rows()
                continue
            else:
                self.top.keypress( size, key )

def draw_screen(self, size):
    canvas = self.top.render(size, focus=True)
    self.ui.draw_screen(size, canvas)
Neue urwid.MainLoop:
Hier werden Events über Callbacks abgehandelt. Einige Inputevents wie resize werden von der MainLoop schon selbst behandelt, auch muss man den screen nicht mehr selbst zeichnen. urwid.MainLoop.event_loop ist standardmäßig die "SelectEventLoop". Mit urwid.MainLoop.event_loop.watch_file(filedeskriptor, callback) kann man dann eine Funktion definieren, die aufgerufen wird, wenn select.select() (was innerhalb von urwids SelectEventLoop aufgerufen wird) den Filedeskriptor returned.

Code: Alles auswählen

def main(self):
    self.screen = urwid.raw_display.Screen()
    self.screen.set_input_timeouts(max_wait=0.5)
    self.main_loop = urwid.MainLoop(
        widget = self.top,
        palette = self.palette,
        screen = self.screen,
        unhandled_input=self.unhandled_input)
    self.main_loop.event_loop.watch_file(self.pipe_read, self.pipe_event)
    self.main_loop.run()

def pipe_event(self):
    # die SelectEventLoop innerhalb der urwid.MainLoop hat den fd
    # zu self.pipe_read returned und daraufhin diese fkt als callback
    # aufgerufen.
    # Hier kann ich jetzt per os.read() aus self.pipe_read lesen
    # und ein neues Text-Widget erstellen

def unhandled_input(self, key):
    # dies ist die Callback-Funktion für sonstigen Input, der von der
    # urwid.MainLoop nicht behandelt wurde und dementsprechend übrig
    # geblieben ist (d.h. es war irgendein spezieller input, z.B. ein belie-
    # biger Tastendruck)
    if key == 'f8':
        raise urwid.ExitMainLoop()
Bei diesem Ansatz kann ich Queue.Queue.get() nicht so einfach unterbringen... (ich habe gerade gar keine Idee). Schließlich kann man hier nur über Filedeskriptoren Events in die MainLoop einschleusen.
Also ich bin der Meinung Trundle und ich behaupten das selbe. Du hast anscheinend beides nicht zuende gelesen. ;-)
:-) Ich habe beides mehrfach gelesen und gebe Dir nun Recht. Ich weiß jetzt -- nach Deiner Erklärung --, wie Trundle es meinte und dass ihr beide das gleiche meintet. Seine Formulierung war für mich (als Unwissenden) einfach fehlleitend. Jetzt weiß ich, dass immer Nutzdaten zum Lesen bereitliegen, wenn select() den FD returned, solange ich garantiere, dass die Pipe nicht geschlossen wird und/oder andere Probleme auftauchen.

Meine letzte verbliebene Frage ist nun immernoch die Eingangsfrage [2]:

"Wenn ich von meinem Programm her garantiere, dass nie mehr als ein Thread auf das selbe Ende einer os.pipe() zugreift, jedoch zwei Threads eine os.pipe() gleichzeitig lesen und beschreiben, handele ich mir dann Schwierigkeiten ein oder nicht?"

Ich muss zur Frage noch eine Unterfrage stellen: Betrachten wir das Schreib-Ende der Pipe. Wenn ich mehrere Threads habe, die per logging auf das Schreibende schreiben wollen, gibt es keine Probleme, weil logging thread safe ist. Ist diese Aussage richtig?
[url]http://gehrcke.de[/url]
gurke111
User
Beiträge: 28
Registriert: Freitag 26. Oktober 2007, 22:55

Ich habe eine lange Nacht hinter mir und die verbliebenen Fragen bei stackoverflow gestellt:
http://stackoverflow.com/questions/1185 ... threadsafe

und eine wirklich zufriedenstellende, gute Antwort (von Alex Martelli :D ) bekommen:

Zusammengefasst:

- os.write() und os.read() auf die zwei Enden einer os.pipe() anzuwenden ist threadsafe. Alle Daten, die ich per os.write() in die Pipe reinstecke, kommen auch in der richtigen Reihenfolge wieder hinten raus per os.read(). Große os.write()'s können jedoch in Häppchen rauskommen, sodass ich ein postprocessing brauche, um die Häppchen wieder zusammenzusetzen.

- logging ist threadsafe -> Mit mehreren Threads per logging.handlers.FileHandler() in das Schreib-Ende der os.pipe() zu schreiben ist eine sichere Sache.


@ BlackJack:
Mein Ansatz der Kommunikation zwischen urwid und worker scheint somit konzeptionell gut, idiotensicher und einfach realisierbar zu sein. Ich hab bereits eine Testimplementierung: Text, den ich in ein urwid.Edit Widget eingebe und per Enter abschicke, landet in einer Queue. Der Workerthread macht nichts anderes als queue.get(block=True) und schreibt, wenn er nen Item empfängt, diesen in die Pipe. Urwids SelectEventLoop überwacht das lesende Ende der Pipe mit select.select(). Wenn hier ein Event gemeldet wird, lese ich die Pipe aus und schreibe die Daten auf das UI in Form von urwid.Text Widgets. Das tolle ist, dass man mit dieser Art der MainLoop/SelectEventLoop ein ``self.screen.set_input_timeouts(max_wait=None)`` setzen kann -- also unendliches Warten; bzw. rein eventbasierte Steuerung. Ich habe mir die Timings des Kommunikationsweges angeschaut: von schreibe-in-die-queue bis gib-wieder-auf-dem-ui-aus vergehen 1-2 ms. Ich find das toll :-)

Gute Nacht!
[url]http://gehrcke.de[/url]
Antworten