subprocess.Popen - Buffer-Problem und input-Frage

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Glühbirne
User
Beiträge: 7
Registriert: Samstag 23. Januar 2021, 14:34

Hallo,

Ich habe da ein Beispielprogramm, dessen Ausgabe ich live verfolgen möchte. Die realen Programme laufen deutlich länger und können Eingaben mit "input()" erwarten:

Code: Alles auswählen

import time

MAX = 4
i = 0
while i < MAX:
    print('runlong,', i+1, 'von', MAX)
    time.sleep(1)
    i += 1

print('finished')
Das klappt mit dem folgenden Programm auch ganz gut:

Code: Alles auswählen

import subprocess
import sys

proc = subprocess.Popen([sys.executable, '-u', 'runlong.py'],
                        text=True,
                        bufsize=0,
                        stderr=subprocess.PIPE,
                        stdout=subprocess.PIPE)

while proc.poll() is None:
    print(proc.stdout.readline())
    
print('retcode:', proc.returncode)
print(proc.communicate())
Dazu habe ich 2 Fragen:

Wie kann ich die Option "-u" loswerden? "bufsize" scheint keine Auswirkung zu haben und auch "os.pipe()" mit entsprechenden "fdopen()" lässt mich die Buffersize nicht merklich einstellen.
Wie kann ich in der while-Schleife (unteres Programm) abfragen, ob das obere Programm seinerseits Daten/Eingaben (per "input()") erwartet?

(Programm soll unter Win10 laufen)

Danke!
Sirius3
User
Beiträge: 17737
Registriert: Sonntag 21. Oktober 2012, 17:20

Warum willst Du das -u loswerden?
Wenn man ein Python-Programm hat, schreibt man es normalerweise so um, dass man es im anderen Python-Programm als Modul benutzen kann. Dann hat man die ganzen Probleme nicht, die Du jetzt versuchst zu umgehen.

Aus dem Beispiel wird mir leider nicht ganz klar, was Du wirklich erreichen willst. Kannst Du weiter ausholen?
Glühbirne
User
Beiträge: 7
Registriert: Samstag 23. Januar 2021, 14:34

Hallo Sirius3
Sirius3 hat geschrieben: Mittwoch 26. Mai 2021, 18:57 Warum willst Du das -u loswerden?
Ich habe den Tag über versucht, dieses obige Beispiel mit anderen Mitteln zu erzeugen, also ohne "-u". Durch Zufall habe ich die Option gefunden und nun läuft es. Dass es "-u" gibt heißt aber auch, dass es NonBuffering unter Windows gibt und ich es einfach nur noch nicht gefunden habe.
Sirius3 hat geschrieben: Mittwoch 26. Mai 2021, 18:57 Kannst Du weiter ausholen?
Das Ganze ist Teil einer kleinen IDE. Dieser Teil funktioniert eigentlich mit subprocess.run() ganz gut, aber nur für Programme, deren Laufzeit sehr kurz ist und die keine Eingaben haben. Die kurze Laufzeit kommt daher, weil subprocess.run() eben bis zum Programmende wartet, bis es die Ausgabe generiert.

Viele Grüße
Glühbirne
User
Beiträge: 7
Registriert: Samstag 23. Januar 2021, 14:34

So, die zweite Frage habe ich nun mit einem Thread gelöst:

Code: Alles auswählen

def threadedProcessReader(proc, qout):
    """Reads proc.stdout. Blocks on read.
        Writes received data into Queue qout
    """
    while True:
        if proc.poll() is not None:
            break
        proc.stdout.flush()
        data = proc.stdout.read(1)
        if len(data) == 0:
            continue
        qout.put(data)
    rest = proc.communicate()
    for element in rest:
        qout.put(element)
    qout.put('retcode:'+str(proc.returncode)+'\n')
Aufgerufen wird dieser Thread mit einem Knopf in der GUI. Diese GUI ethält ein tk.Textwidget und ein ttk.Entry sowie ein paar Knöpfe:

Code: Alles auswählen

def _onStart(self):
        """Runs an external program and reads it's output queue"""
        qout = queue.Queue()
        self.proc = subprocess.Popen(
            [sys.executable, '-u', 'runshort-input.py'],
            text=True,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE)
        
        t = Thread(target=threadedProcessReader,
                   daemon=True, args=(self.proc, qout))
        t.start()
        self._isRunning = True
        self._entry.focus_set()
        while t.is_alive() or (not qout.empty()):
            try:
                z = qout.get(timeout=0.1)
            except queue.Empty:
                self.update()
                continue
            else:
                self._insertText(z)
        
        self._isRunning = False
Und die Daten, die ein Programm per "input()" erwartet werden in der tk-GUI per "ttk.Edit()" übergeben:

Code: Alles auswählen

def _onSend(self):
        if not self._isRunning:
            return
        editText = self._entry.get() + '\n'
        self.proc.stdin.write(editText)
        self._insertText(editText)
        self.proc.stdin.flush()
        self._entry.delete(0, tk.END)
Der "threadedProcessReader()" blockiert leider, und dieses Blockieren habe ich nicht gelöst bekommen. So muss ich mit der bisherigen Lösung eben direkt in den Prozess schreiben (self.proc.stdin.write()). Unschön, aber funktioniert.

Der Vollständigkeit halber noch der Prozess, der hier als Beispiel dient, ist aber wirklich nur ein kleines Testprogramm:

Code: Alles auswählen

print('Hallo, Welt', end="")
wert = input('gib')
print('Wert:', wert)

Dieser Thread kann geschlossen werden.
Vielen Dank
Benutzeravatar
__blackjack__
User
Beiträge: 13064
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Glühbirne: Die lang laufende ``while``-Schleife und das `update()` sind keine gute Idee. Du programmierst damit gegen das GUI-Rahmenwerk wenn Du anfängst deine eigene Hauptschleife zu schreiben. Das löst man in der Regel über `after()` und einer Funktion/Methode die regelmässig die Queue (nicht-blockierend) abfragt.

Das `stderr` nicht verarbeitet wird ist ein Fehler. Sollte der externe Prozess da irgendwann einmal mehr rein schreiben, als der/die Puffer zwischen den beiden Prozessen aufnehmen, blockiert dieses Konstrukt aus den beiden Programmen endlos, weil das externe Programm darauf wartet, dass Daten von `stderr` abgenommen werden, während Dein Programm darauf wartet, dass das externe Programm Daten auf `stdin` schreibt. Damit warten dann beide auf etwas das nicht eintreten kann.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Glühbirne
User
Beiträge: 7
Registriert: Samstag 23. Januar 2021, 14:34

Hallo __blackjack__,
vielen Dank für deinen Input, der hat mir sehr geholfen!

Es wird nun pro Kanal ein Thread eröffnet und alle 200 ms die Queues ausgelesen ( _timedReadQueues())

Code: Alles auswählen

def _onStart(self):
        """Runs an external program and reads it's output queue"""
        self.queueOut = queue.Queue()
        self.queueErr = queue.Queue()
        self.proc = subprocess.Popen(
            [sys.executable, '-u', 'runlong.py'],
            text=True,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE)
        tout = Thread(target=self._threadedStreamReader,
                   args=(self.proc.stdout, self.queueOut))
        terr = Thread(target=self._threadedStreamReader,
                   args=(self.proc.stderr, self.queueErr))
        tout.start()
        terr.start()
        self._timedReadQueues()
        self._isRunning = True
Diese Methode schreibt in die Queues:

Code: Alles auswählen

def _threadedStreamReader(self, stdStream, targetQueue):
        """reads a process stream, puts data to queue"""
        while True:
            try:
                stdStream.flush()
                data = stdStream.read(1)
            except:
                break
            if len(data) > 0:
                targetQueue.put_nowait(data)
Und diese Methode liest die Queues:

Code: Alles auswählen

def _timedReadQueues(self):
        """read the queues, check for end of process"""
        while self.queueOut.qsize() > 0:
            outValue = self.queueOut.get_nowait()
            self._insertText(outValue, False)
        while self.queueErr.qsize() > 0:
            outValue = self.queueErr.get_nowait()
            self._insertText(outValue, True)
        self.update_idletasks()
        if self.proc.poll() is not None:
            self._readTheRest(self.proc)
        else:
            self.after(200, self._timedReadQueues)
Ich finde, dank deiner Hinweise ist das Programm deutlich besser geworden, vielen Dank dafür.
__deets__
User
Beiträge: 14522
Registriert: Mittwoch 14. Oktober 2015, 14:29

Warum liest du immer genau ein Zeichen? read ohne Argumente blockiert, und liefert dann alles, was gerade vorliegt, ab.

Ich persönlich würde auch nur eine Queue machen, weil das viel Code-Dopplung spart. Dann muss man natürlich ein Tupel aus Quelle & Daten darein stopfen, aber der ganze Code word IMHO Klarer.
Glühbirne
User
Beiträge: 7
Registriert: Samstag 23. Januar 2021, 14:34

Hallo __deets__,

die Programme, die das Programm verarbeitet, können

Code: Alles auswählen

 input('gib eine Zahl ein:')
enthalten. read(1) kann solche Zeilen verarbeiten (eben den Prompt von Input ohne Newline), read() jedoch nicht - soweit ich das bisher ausprobiert habe.

Je nach Ausgabkanal wird die Ausgabe vormatiert: Error-Text wird über einen tk.Text-Tag mit rotem Hintergrund formatiert. Da sind zwei Queues leichter zu handhaben.

Viele Grüße
Antworten