stdout von nebenlaeufigen prozessen tracken ..

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
pepe75
User
Beiträge: 3
Registriert: Donnerstag 9. Juli 2015, 10:51

Hallo allerseits,

Ich habe folgende Problemstellung und hoffe ihr wisst Rat:
Ich habe vor durch ein Python-Skript nebenlaeufig Prozesse zu starten und deren stdout zu tracken ..
(letzteres soll später zur Fortschritts-Ausgabe des jeweiligen Prozesses dienen).

Hierfür benutze ich folgenden Python Code (und eine C++ Test-Anwendung, die für x Sekunden sekundenweise diverse Ausgaben auf stdout ausgibt):

Code: Alles auswählen

#!/usr/bin/env python

import sys
import subprocess
import threading
import Queue
from collections import deque

# //////////////////////////////////////////////
class StdoutTracker(threading.Thread):
# //////////////////////////////////////////////
  '''
  Helper class to track stdout of spawned processes
  '''
  def __init__(self, proc):
      assert callable(proc.stdout.readline)
      threading.Thread.__init__(self)
      self._proc    = proc
      self._queue   = Queue.Queue() 

  def run(self):
    for line in iter(self._proc.stdout.readline, ''):
      print("pushing line:"+line),
      self._queue.put(line)
  
  def eof(self):
    return not self.is_alive() and self._queue.empty()

# //////////////////////////////////////////////
# Help function to invoke processes 
# //////////////////////////////////////////////
def invokeProcess(cmd, trackerList):
  proc  = subprocess.Popen(cmd, stdout=subprocess.PIPE)
  stdR  = StdoutTracker(proc)
  stdR.start()
  trackerList.append(stdR)
 
# //////////////////////////////////////////////
# Main starting point ..
# //////////////////////////////////////////////
if __name__ == '__main__':
  
  trackerList = deque()

  invokeProcess(['./qplay', '3', '2600'],  trackerList)
  invokeProcess(['./qplay', '6', '2139'], trackerList)

  while trackerList:
    pT = trackerList.popleft()
    if not pT.eof():
      line = pT._queue.get()
      if line:
        print("got line from tracker:"+line),
      trackerList.append(pT)
Das aufgerufene Programm "qplay" ist eine C-Anwendung zum testing, welche fuer x-Sekunden (erster Parameter) Ausgaben auf stdout erzeugt.
Eine Ausgabe nach Start obigen Programmes ist z.B.:

pushing line:2600:1/3
got line from tracker:2600:1/3
pushing line:2600:2/3
pushing line:2600:3/3
pushing line:2139:1/6
got line from tracker:2139:1/6
pushing line:2139:2/6
got line from tracker:2600:2/3
got line from tracker:2139:2/6
pushing line:2139:3/6
got line from tracker:2600:3/3
got line from tracker:2139:3/6
pushing line:2139:4/6
pushing line:2139:5/6
pushing line:2139:6/6
got line from tracker:2139:4/6
got line from tracker:2139:5/6
got line from tracker:2139:6/6

Diese erscheinen jedoch erst nach (!) Ausfuehrungsdauer des jeweiligen Test-Prozesses (also erst nach drei bzw. sechs Sekunden).
Dies liegt sehr wahrscheinlich an der Implementierung der run Methode.
Vermutlich müsste der Test-Prozess in der run-Methode ständig gepollt und dessen stdout-Ausgaben so abgespeichert werden.
Eine korrekte / funktionierende Implementierung hierfür habe ich allerdings nicht hinbekommen.

Korrekt soll hier heissen: ich würde erwarten, dass das Python-Skript die Ausgaben der beiden gestarteten Prozesse im Hauptprogramm "parallel" ausgibt (d.h. sobald Ausgaben verfügbar sind)
und das solange bis alle Test-Prozesse beendet sind.

Ich bin C++ Programmierer und Python-Neuling und würde mich über Tips / Links / Code-Schnippsel sehr freuen.

Vielen Dank.
BlackJack

@pepe75: Das mit dem `iter()` ist unnötig kompliziert weil Dateiobjekte selbst schon über die Zeilen iterierbar sind. Du musst dafür sorgen das keine Daten gepuffert werden und das hast Du auch nur auf Deiner Seite bei `Popen()` in der Hand. Das das externe Programm nichts puffert liegt am externen Programm.

Edit: Und ich sehe gerade das Du die beiden Prozesse gar nicht gleichzeitig bearbeitest. Während Du den einen Prozess abarbeitest sammelt sich beim anderen natürlich potentiell dessen Ausgabe in seiner Queue. Man würde da vielleicht besser nur *eine* Queue verwenden.
Benutzeravatar
snafu
User
Beiträge: 6738
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Vielleicht hilft auch ein `stdout.flush()`?
pepe75
User
Beiträge: 3
Registriert: Donnerstag 9. Juli 2015, 10:51

Und ich sehe gerade das Du die beiden Prozesse gar nicht gleichzeitig bearbeitest. Während Du den einen Prozess abarbeitest sammelt sich beim anderen natürlich potentiell dessen Ausgabe in seiner Queue. Man würde da vielleicht besser nur *eine* Queue verwenden.
.. Ist das wirklich so ? Das Hauptprogramm hat eine Liste (trackerList) in der alle "Ausgaben-Sammler" (und gespawnte Prozesse) abgelegt sind.
Es wird solange aus dieser Liste ein Ausgaben-Sammler herausgenommen (und wieder hinzugepackt !) + eine weitere Ausgabe aus dessen Queue herausgenommen, bis dieser nichts mehr auszugeben hat.
D.h. es wird schon abwechselnd fuer jeden Prozess geschaut, ob dieser mindestens eine Zeile ausgegeben hat (also gewissermassen schon "gleichzeitig").

Eine andere Implementierung von run wie folgende hat leider auch nicht den gewünschten Effekt gebracht:

Code: Alles auswählen

 def run(self):
    '''
    for line in iter(self._proc.stdout.readline, ''):
      print("pushing line:"+line),
      self._queue.put(line)
    '''
    while True:
      rc = self._proc.poll()
      line = self._proc.stdout.readline()
      if line:
        print("pushing line:"+line),
        self._queue.put(line)
      if rc is not None:
        return                      
Diese erzeugt folgende Ausgabe:
pushing line:2600:1/3
got line from tracker:2600:1/3
pushing line:2600:2/3
pushing line:2139:1/6
got line from tracker:2139:1/6
pushing line:2139:2/6
got line from tracker:2600:2/3
got line from tracker:2139:2/6
pushing line:2139:3/6
got line from tracker:2139:3/6

Also nicht komplett alle erwarteten Ausgaben (!) + wieder gleiches zeitliches Verhalten: ich sehe die Ausgaben nach 3 und dann nach 6 Sekunden.
pepe75
User
Beiträge: 3
Registriert: Donnerstag 9. Juli 2015, 10:51

... Wie ich schon sagte bin zwar ein Python Neuling aber kein C++ Neuling.
Daher hätte ich folgendes wissen müssen (wonach es übrigens auch funktioniert):

Das Test-Programm darf stdout nicht buffern bzw. muss stets flushen.

Für interessierte hier nochmal der komplette funktionierende Python-Code:

Code: Alles auswählen

#!/usr/bin/env python

import sys
import subprocess
import threading
import Queue
from collections import deque

# //////////////////////////////////////////////
class StdoutTracker(threading.Thread):
# //////////////////////////////////////////////
  '''
  Helper class to track stdout of spawned processes
  '''
  def __init__(self, proc):
      assert callable(proc.stdout.readline)
      threading.Thread.__init__(self)
      self._proc    = proc
      self._queue   = Queue.Queue() 

  def run(self):
    while True:
      rc = self._proc.poll()
      line = self._proc.stdout.readline()
      if line:
        print("pushing line:"+line),
        self._queue.put(line)
      if rc is not None:
        return
  
# //////////////////////////////////////////////
# Help function to invoke processes 
# //////////////////////////////////////////////
def invokeProcess(cmd, trackerList):
  proc  = subprocess.Popen(cmd, stdout=subprocess.PIPE)
  stdR  = StdoutTracker(proc)
  stdR.start()
  trackerList.append(stdR)
 
# //////////////////////////////////////////////
# Main starting point ..
# //////////////////////////////////////////////
if __name__ == '__main__':
  
  trackerList = deque()

  invokeProcess(['./qplay', '3', '2600'],  trackerList)
  invokeProcess(['./qplay', '6', '2139'], trackerList)


  while trackerList:
    pT = trackerList.popleft()
    if pT.is_alive():
      trackerList.append(pT)
    if not pT._queue.empty():
      line = pT._queue.get()
      if line:
        print("got line from tracker:"+line),
       
Fuer Interessierte hier noch die Korrektur des Test-Programms:

Code: Alles auswählen

#include <string>
#include <iostream>

using namespace std;

// ////////////////////////////////////////////////////
int main(int argc, char* argv[])
// ////////////////////////////////////////////////////
{
  setbuf(stdout, NULL);	// <<<--- das muss sein damit stdout nicht gebuffert wird !! 

  string pid = "????";
  int end = 5;
  if( argc > 2 )
  {
    end = atoi(argv[1]);
    pid = argv[2];
  }
  for(int i=0; i<end; i++)
  {
    sleep(1);
    float prog = ((float)(i+1) / (float)end) * 100;
    printf("%s:%d/%d\n", pid.c_str(), i+1, end);
  }
  return 0;
}
Antworten