Pipe von subprocess.Popen mit thread auslesen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Willi
User
Beiträge: 8
Registriert: Freitag 2. März 2007, 21:43

Hallo Python-Forum,

ich stehe vor dem Rätsel, dass mein Code ein unterschiedliches Verhalten zeigt je nach dem, wie bzw. wo ich den Code laufen lasse.
Mein Ziel ist es, Eingaben an ein externes Programm (gnuplot) über eine Pipe zu schreiben und die Antworten von gnuplot während der Laufzeit aus einer Pipe zu lesen.

Mein Code:

Code: Alles auswählen

import subprocess
import thread


def pipelesen(pipe, liste) :
	global Marker
	Marker=0
	while Marker==0:
		liste.append(pipe.readline())
		
def threatbeenden (objekt):
	global Marker
	Marker=1
	objekt.stdin.write('\n')
	return objekt.communicate()

ausgabe=[]
g=subprocess.Popen('/usr/bin/gnuplot',
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)

thread.start_new_thread(pipelesen,
                        (g.stdout, ausgabe))
                        
g.stdin.write('se sam 1788\n')
g.stdin.write('sh sam\n')
print "ausgabe = ", ausgabe
print "Ergebnis von threatbeenden(g):"
print threatbeenden(g)
print "Marker = ", Marker
Starte ich den Code in Idle im Debug-Modus, so erhalte ich genau das erwünschte Verhalten:
[DEBUG ON]
>>>
ausgabe = ['\n', '\tsampling rate is 1788, 1788\n', '\n']
Ergebnis von threatbeenden(g):
('', None)
Marker = 1
[DEBUG ON]
>>>
Starte ich den Code in Idle im normalen Modus, so erhalte ich:
>>>
ausgabe = []
Ergebnis von threatbeenden(g):
('\n\tsampling rate is 1788, 1788\n\n', None)
Unhandled exception in thread started by Marker =
Traceback (most recent call last):
1 File "/home/walter/Python_Skripte/script.py", line 9, in pipelesen

liste.append(pipe.readline())
>>> IOError: (0, 'Error')
Schreibe ich den Code Zeilenweise in die interaktive Shell, so ist die Pipe ausgelesen und befindet sich in der Liste "ausgabe", das Beenden des Threads liefert aber den I/O-Fehler. "Marker" hat den Wert 1.
Starte ich das Skript von der Kommandozeile aus mit "python script.py", so erhalte ich keine Fehlermeldung, "ausgabe" ist aber eine leere Liste. "Marker" hat meistens den Wert 0 und manchmal (ohne für mich erkennbares Muster) den Wert 1.
Ich schließe daraus, dass gnuplot seine Ausgaben ordentlich in die Pipe schreibt aber der Thread sich immer unterschiedlich verhält.

Hat vielleicht jemand eine Idee, warum der Code so unterschiedlich interpretiert wird und wie ich den Code verändern muss damit er sich nicht nur im Debug-Modus richtig verhält?[/code]
Benutzeravatar
HWK
User
Beiträge: 1295
Registriert: Mittwoch 7. Juni 2006, 20:44

Es gibt wahrscheinlich Probleme, weil beide Threads gleichzeitig auf dasselbe Objekt (Marker) zugreifen, der eine lesend, der andere schreibend. Verwende doch ein Event-Objekt. Schau Dir hierzu mal das entsprechende Kapitel unter threadings in der Python-Doku an.
Ein ungetestetes Beispiel:

Code: Alles auswählen

import subprocess 
import thread 
from threading import Event


def pipelesen(pipe, liste) : 
    global Marker 
    Marker.clear() 
    while not Marker.isSet(): 
        liste.append(pipe.readline()) 
        
def threatbeenden (objekt): 
    global Marker 
    Marker.set() 
    objekt.stdin.write('\n') 
    return objekt.communicate() 

ausgabe=[] 
g=subprocess.Popen('/usr/bin/gnuplot', 
        stdin=subprocess.PIPE, 
        stdout=subprocess.PIPE, 
        stderr=subprocess.STDOUT) 

Marker = Event()
thread.start_new_thread(pipelesen, 
                        (g.stdout, ausgabe)) 
                        
g.stdin.write('se sam 1788\n') 
g.stdin.write('sh sam\n') 
print "ausgabe = ", ausgabe 
print "Ergebnis von threatbeenden(g):" 
print threatbeenden(g) 
print "Marker = ", Marker
MfG
HWK
Willi
User
Beiträge: 8
Registriert: Freitag 2. März 2007, 21:43

Danke für den Tipp!
Leider ist es noch nicht die Lösung, aber mit den Threads, die sich gegenseitig beeinflussen, hat es offenbar zu tun. Die Event-Klasse ist wesentlich eleganter als mein alter Marker. Damit funktioniert es zwar noch nicht immer aber manchmal. Ich habe Deine Änderungen übernommen und noch etwas ausgebaut:

Code: Alles auswählen

import subprocess
import thread
from threading import Event

Marker=Event()
Marker2=Event()

def pipelesen(pipe, liste) :
	global Marker, Marker2
	Marker.clear()
	Marker2.clear()
	while not Marker.isSet():
		liste.append(pipe.readline())
	Marker2.set()
		
def threatbeenden (objekt):
	global Marker
	Marker.set()
        Marker.wait(5.0)
	objekt.stdin.write('\n')
	Marker2.wait(5.0)
	return objekt.communicate()

ausgabe=[]
g=subprocess.Popen('/usr/bin/gnuplot',
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)

thread.start_new_thread(pipelesen,
                        (g.stdout, ausgabe))
                        
g.stdin.write('se sam 1788\n')
g.stdin.write('sh sam\n')
print "ausgabe = ", ausgabe
print "Ergebnis von threatbeenden(g):"
print threatbeenden(g)
print "Marker = ", Marker.isSet()
Damit bekomme ich beim regulären Aufruf in Idle bei nacheinanderfolgenden Aufrufen entweder das korrekte(=erwünschte) Ergebnis oder verschiedene Fehler. Auch ist der Marker mal gesetzt und mal auch nicht, ebenso wahllos ist "ausgabe" mal mit Output gefüllt und mal eine leere Liste :
>>> ================================ RESTART ================================
>>>
ausgabe = []
Ergebnis von threatbeenden(g):
('\tsampling rate is 1788, 1788\n\n', None)
Marker = True
>>> ================================ RESTART ================================
>>>
ausgabe = []
Ergebnis von threatbeenden(g):
('', None)
Marker = False
>>> Unhandled exception in thread started by
Traceback (most recent call last):
File "/home/walter/Python_Skripte/script.py", line 13, in pipelesen
liste.append(pipe.readline())
IOError: [Errno 11] Resource temporarily unavailable
================================ RESTART ================================
>>>
ausgabe = []
Ergebnis von threatbeenden(g):
('\tsampling rate is 1788, 1788\n\n', None)
Marker = True
>>> ================================ RESTART ================================
>>>
ausgabe = ['\n', '\tsampling rate is 1788, 1788\n', '\n']
Ergebnis von threatbeenden(g):
('', None)
Marker = True
>>> Unhandled exception in thread started by
Traceback (most recent call last):
File "/home/walter/Python_Skripte/script.py", line 13, in pipelesen
liste.append(pipe.readline())
IOError: [Errno 11] Resource temporarily unavailable
Auf der Befehlszeile mit "python script.py" erhalte ich nach wie vor "ausgabe=[]" und "Marker = True" oder "Marker = False".

Auf jeden Fall weiß ich jetzt, dass ich noch nicht genug über parallel laufende threads weiß. Aber darüber werde ich nicht mehr heute schlauer :?
Benutzeravatar
HWK
User
Beiträge: 1295
Registriert: Mittwoch 7. Juni 2006, 20:44

Ich denke, Du musst auch ausgabe mit einem Lock versehen, denn beide Threads greifen darauf zu. Falls dies gleichzeitig geschieht, ergeben sich daraus die Fehlermeldungen. Selbst ohne Fehler sind die Ergebnisse recht willkürlich, weil die Ausgabeliste beim print immer unterschiedlich gefüllt sein kann, je nachdem wie weit GNU-Plot mit seiner Arbeit ist. Ganz einfach wäre es erst einmal, Zeile 36 hinter Zeile 38 zu verschieben. Du wüsstest dann zumindest, ob der Fehler hier liegt. Auf Dauer ist es natürlich keine Lösung, weil das Ergebnis erst nach Beendigung von GNU-Plot ausgegeben wird.
MfG
HWK
Willi
User
Beiträge: 8
Registriert: Freitag 2. März 2007, 21:43

Die Locks waren tatsächlich die Lösung, so sieht mein Code nun aus: (Ist es eigentlich "erlaubt" im Sinne eines guten und sicheren Programmierstils, einen Lock im Hauptprogramm zu setzen und im Thread zu lösen?)

Code: Alles auswählen

import subprocess
import thread
from threading import Lock

lock1=Lock()
lock2=Lock()

def pipelesen(pipe, liste) :
	global lock1, lock2
	lock1.acquire()
	lock2.release() #Hauptprogramm wartet, bis Thread lock1 erhalten hat
	while liste[-1]<>'enDE\n':  #Pipe wird bis zum Ende ausgelesen
		liste.append(pipe.readline())
	lock1.release()
		
def threatbeenden (objekt):
	global lock1
	objekt.stdin.write('print "enDE"\n') #Ende der Pipe markieren
	lock1.acquire()
    print "Ergebnis von threatbeenden(g):"
	print objekt.communicate()
    lock1.release()

ausgabe=['Starteintrag'] #noetig fuer Bedingung von while in pipelesen()

g=subprocess.Popen('/usr/bin/gnuplot',
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)

lock2.acquire()
thread.start_new_thread(pipelesen,
                        (g.stdout, ausgabe))
lock2.acquire()  #Hauptprogramm wartet, bis Thread lock1 erhalten hat
g.stdin.write('se sam 1788\n')
g.stdin.write('sh sam\n')
print "ausgabe = ", ausgabe  #liefert meist nur  ['Starteintrag']
threatbeenden(g)
print "ausgabe = ", ausgabe  #liefert immer komplette Ausgabe
lock2.release()
Ich weiß zwar nicht, ob der Code wirklich threadsicher ist aber ich habe ein bischen was über Threads gelernt: Ich habe eine Weile gebraucht um zu verstehen, dass das Hauptprogramm schneller sein kann als die threads. Dabei waren print-Anweisungen vor und nach locks hilfreich. Beim folgenden Code ist mir ein Licht aufgegangen:

Code: Alles auswählen

import subprocess
import thread
from threading import Lock

lock1=Lock()
lock2=Lock()

def pipelesen(pipe, liste) :
	global lock1
	lock1.acquire()
	print "pipelesen hat lock1 erhalten."
	while liste[-1]<>'enDE\n':
		liste.append(pipe.readline())
    print "pipelesen loest lock1."
	lock1.release()
		
def threatbeenden (objekt):
	global lock1, lock2
	lock2.acquire()
	print "threatbeenden hat lock2 erhalten."
	objekt.stdin.write('print "enDE"\n')
	lock1.acquire()
	print "threatbeenden hat lock1 erhalten."
    print "Ergebnis von threatbeenden(g):"
	print objekt.communicate()
	print ausgabe
    print "threatbeenden loest lock1."
    lock1.release()
    lock2.release()

ausgabe=['Starteintrag']
g=subprocess.Popen('/usr/bin/gnuplot',
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)

print "Hauptprogramm hat lock2 gesetzt."
thread.start_new_thread(pipelesen,
                        (g.stdout, ausgabe))
                        
g.stdin.write('se sam 1788\n')
g.stdin.write('sh sam\n')
print "ausgabe = ", ausgabe
thread.start_new_thread(threatbeenden, (g,))
print "Hauptprogramm fordert lock2 an."
lock2.acquire()
print "Hauptprogramm hat lock2 erhalten."
print "ausgabe = ", ausgabe
lock2.release()
print "Hauptprogramm hat lock2 geloest."
Erst als ich die Zeile 27 (print ausgabe) eingefügt habe war mir klar, dass ich das Problem "im Prinzip" schon eine Weile gelöst hatte. Der Code funktioniert allerdings nur in idle richtig, auf der Konsole nicht.

Danke noch einmal für die hilfreichen Tipps!
BlackJack

``global`` brauchst Du hier nicht. Und wenn es auf der Konsole nicht funktioniert, dann ist es offensichtlich nicht "threadsafe".

Ich hab's jetzt nicht nachvollzogen weil ich zu faul und bei kurzem Blick etwas verwirrt war, aber Du solltest die Liste IMHO nicht für zwei verschiedene Sachen benutzen. Das Kennzeichen für's Ende sollte nicht über die Liste kommen.

Um auf einen Thread zu warten gibt's übrigens die `join()`-Methode.
Willi
User
Beiträge: 8
Registriert: Freitag 2. März 2007, 21:43

``global`` brauchst Du hier nicht.
tatsächlich... :o
Und wenn es auf der Konsole nicht funktioniert, dann ist es offensichtlich nicht "threadsafe".
Der erste Code läuft sowohl in idle als auch in der Konsole "richtig", jedenfalls konnte ich noch nichts gegenteiliges feststellen.
Der zweite Code stellt eigentlich nur mein Lernvehikel dar, um zu verstehen welcher Prozess wann zum Zuge kommt. Hätte ich vielleicht etwas deutlicher und straffer darstellen sollen.
Ich hab's jetzt nicht nachvollzogen weil ich zu faul und bei kurzem Blick etwas verwirrt war, aber Du solltest die Liste IMHO nicht für zwei verschiedene Sachen benutzen. Das Kennzeichen für's Ende sollte nicht über die Liste kommen.
Ich geb' zu, das ist ein wunder Punkt meiner Lösung. Gefällt mir auch nicht so ganz...
Um auf einen Thread zu warten gibt's übrigens die `join()`-Methode.
Wenn ich die Library-Reference richtig verstehe, kann ich mit join() nur das Ende eines Threads abwarten. Mit dem zweiten Lock wolltze ich nur sicherstellen, dass der Thread das erste Lock wirklich erhalten hat bevor das Hauptprogramm die Abbruchbedingung erhalten hat. Das Ende muss ich jedoch gezielt von außen einleiten, weil der readline-Befehl in der while-Schleife am derzeitigen Ende der Pipe "hängt", also auf weitere Zeilen wartet. Schließe ich einfach die Pipe, so bekomme ich eine Fehlermeldung.
Verwende ich als Abbruchbedingung der while-Schleife ein Event wie in meinem zweiten Posting, so kann ich nicht ganz sicher sein, dass die Pipe wirklich bis zum Ende ausgelesen wurde. Ich löse nur einen zusätzlichen Schleifendurchlauf aus, wenn der readline-Befehl am Ende der Pipe hängt. Anderenfalls stoppe ich das Auslesen miten in der Pipe. Oder hab ich da was übersehen?
Benutzeravatar
HWK
User
Beiträge: 1295
Registriert: Mittwoch 7. Juni 2006, 20:44

Vielleicht noch ein paar Anmerkungen:
Globale Variablen sind bei den wahren Pythoniern sowieso nicht so gern gesehen. Du könntest die locks ja als Argumente übergeben. Schau Dir auch mal den Style-Guide an: http://www.python.org/dev/peps/pep-0008/.
Noch eins zur Funktionalität: Bei längeren Aufträgen möchtest Du mit der Ausgabe ja wahrscheinlich nicht bis zum Ende warten. Du könntest vielleicht noch in die while-Schleife bei Zeile 12 Code einfügen, der dem Hauptthread über eine Event-Instanz (z.B. ausgabe_flag.set()) signalisiert, dass eine Ausgabe erfolgen sollte. Der Hauptthread könnte seine Eingaben in einer Schleife mache und bei jedem Durchlauf abfragen, ob eine Ausgabe vorhanden ist. Du müsstest dann aber noch die Locks anpassen.
Ich habe selbst mal einen Versuch unternommen. Ich kann es unter Windows aber leider nicht testen, da bei pgnuplot.exe die Pipes nur in eine Richtung funktionieren. Vielleicht kannst Du es ja unter Linux überprüfen.

Code: Alles auswählen

import subprocess
import threading
#import time


class GnuPlot:
    def __init__(self, commands):
        self.ausgabe_flag = threading.Event()
        self.lock = threading.Lock()

        self.ausgabe = []
        self.ausgabe_flag.clear()

        self.g = subprocess.Popen('/usr/bin/gnuplot',
        #self.g = subprocess.Popen('C:\\Download\\gnuplot\\bin\\pgnuplot.exe',
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT)

        self.thread_ = threading.Thread(target=self.pipe_lesen)
        self.thread_.start()

        for command in commands:
            self.g.stdin.write(command + '\n')
            #time.sleep(2) # Zum Testen
            self.ausgeben()

        ret = self.thread_beenden()
        self.ausgeben()
        print 'Ergebnis von thread_beenden: \n', ret
        
    def pipe_lesen(self) :
        while True:
            answer = self.g.stdout.readline()
            if answer == 'enDE\n': # Pipe wird bis zum Ende ausgelesen
                break
            if answer:
                self.lock.acquire()
                self.ausgabe.append(answer)
                self.lock.release()
                self.ausgabe_flag.set()

    def thread_beenden (self):
        self.g.stdin.write('print "enDE"\n') # Ende der Pipe markieren
        self.thread_.join()
        return self.g.communicate()

    def ausgeben(self):
        if self.ausgabe_flag.isSet():
            self.lock.acquire()
            print ''.join(self.ausgabe)
            self.ausgabe = []
            self.lock.release()
            self.ausgabe_flag.clear()


if __name__ == '__main__':
    commands = ('se sam 1788', 'sh sam')
    GnuPlot(commands)
MfG
HWK
Zuletzt geändert von HWK am Sonntag 4. März 2007, 23:08, insgesamt 2-mal geändert.
Willi
User
Beiträge: 8
Registriert: Freitag 2. März 2007, 21:43

Hallo HWK,

sehr gerne hab ich Deinen Code getestet: Bis auf die print-Anweisung in Zeile 30 (liefert den Fehler
" print 'Ergebnis von threat_beenden: \n%s' % ret
TypeError: not all arguments converted during string formatting".
läuft er perfekt. Zeile 30 hab ich abgeändert zu

Code: Alles auswählen

print 'Ergebnis von threat_beenden: \n', ret
Mit und ohne die sleep-Anweisung funktioniert der Code "richtig", d.h. er liefert die Ausgabe
sampling rate is 1788, 1788


Ergebnis von threat_beenden:
('', None)
Und von dem Umgang mit Events und Locks muss ich mir noch ein oder zwei Scheiben abschneiden.

Dass allerdings die Windows-Version von gnuplot keine Ausgabe-Pipe bereitstellt ist ist eine echte Hiobsbotschaft: Ich will ein paar kleine GUIs schreiben, die auch unter Windows laufen. Und wie es aussieht muss ich dafür am Ende doch auf das Output von gnuplot verzichten.

Auf jeden Fall hat es Spaß gemacht, so weit vorzudringenund vielleicht unterstützt ja eine spätere Version von Gnuplot auch Ausgabepipes.

Gruß, Willi
Benutzeravatar
HWK
User
Beiträge: 1295
Registriert: Mittwoch 7. Juni 2006, 20:44

Hallo, Willi!
Zum Ausprobieren habe ich mir ein kleines Script geschrieben, was lediglich die Eingaben wieder ausgibt:

Code: Alles auswählen

# File return.py

import sys

while True:
    input_ = sys.stdin.readline()
    sys.stdout.write(input_)
    sys.stdout.flush()
    if 'enDE' in input_:
        break
Zeile 35 habe ich folgendermaßen geändert:

Code: Alles auswählen

            if 'enDE' in answer: # Pipe wird bis zum Ende ausgelesen
Der Aufruf erfolgt dann mit:

Code: Alles auswählen

        self.g = subprocess.Popen('C:\\Programme\\Python24\\python.exe '
                                  'return.py',
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT)
Jetzt habe ich versuchsweise die "Lock-Zeilen" (38, 40, 50, 53) auskommentiert. Und siehe da: Unter Windows läuft es trotzdem problemlos. Willst Du das vielleicht unter Linux auch mal probieren? Und wenn das klappt, geht es ja mit GNU-Plot vielleicht doch ohne Locks.
MfG
HWK

Edit: Auch ausgabe_flag kann ich in eine normale, nicht Thread-sichere "Variable" ändern, ohne dass es zu Problemen kommen würde.
Willi
User
Beiträge: 8
Registriert: Freitag 2. März 2007, 21:43

Hallo HWK,

Dein Skript funktioniert mit den auskommentierten Lock-Zeilen und Gnuplot einwandfrei, die Locks scheinen auch unter Linux entbehrlich. Allerdings bekomme ich Deine Lösung mit dem Skript return.py zumindest nicht auf die Schnelle zum Laufen weil laut Fehlermeldung Schreibrechte auf die Pipe fehlen. Aber damit muss ich mich noch mal in Ruhe auseinandersetzen.

Die Variable ausgabe_flag kann ich auch in eine INT-Variable mit den Werten 0 und 1 umdefinieren, mit Gnuplot läuft es ebenfalls fehlerfrei.
Wenn ich das soweit richtig verstehe, ist die Zeile 45:

Code: Alles auswählen

self.thread_.join()
in der Definition von self.thread_beenden wesentlich und leistet das gleiche wie die zwei Locks in meinem Skript. Selbst wenn ich ausgabe_flag ganz herausnehme, läuft es ohne Fehler durch.

Gruß, Willi
Benutzeravatar
HWK
User
Beiträge: 1295
Registriert: Mittwoch 7. Juni 2006, 20:44

Komisch, komisch!
Hat dafür jemand eine Erklärung?
Hast Du übrigens zum Probieren ein sleep drin gehabt, so dass Ausgaben auch schon vor Thread-Ende erfolgen konnten?
MfG
HWK
Willi
User
Beiträge: 8
Registriert: Freitag 2. März 2007, 21:43

Erstmal 'tschulldigung dass die Antwort so lange gedauert hat, aber ich war beruflich und privat unterwegs die letzten Tage...

Also, ich hab es zuerst ohne das sleep getestet und jetzt noch einmal mit dem sleep. Zumindest bei einer Handvoll Durchläufe liefert es mit dem sleep immer das gleiche Ergebnis wie ohne sleep.

Zusätzlich habe ich "commands" in Zeile 58 noch um einen Ausgabebefehl erweiitert:

Code: Alles auswählen

commands = ('se sam 1788','pr "Eine Ausgabe"', 'sh sam')
Damit lässt er sich nach jedem der drei Befehle zwei Sekunden Zeit, liefert aber definitiv das richtige Ergebnis ab.

Gruß, Willi
Benutzeravatar
HWK
User
Beiträge: 1295
Registriert: Mittwoch 7. Juni 2006, 20:44

Danke für die Rückkopplung.
Man müsste sich abschließend mal die Unterschiede zwischen erster und letzter Variante anschauen, um zu sehen woran es anfangs gescheitert ist.
MfG
HWK
Willi
User
Beiträge: 8
Registriert: Freitag 2. März 2007, 21:43

Ich versuche es mal, die Ursachen zusammenzufassen, so wie sie sich mir darstellen.

Das Problem war, dass das Hauptprogramm und der Thread unterschiedlich schnell und in unvorhersagbarer Reihenfolge ablaufen. So schloss die Funktion "threatbeenden" die pipe u.U. noch bevor der Thread lesend darauf zugreifen konnte. Im Debugger lief das Programm linear ab, so dass das Programm dort funktionierte. (siehe das Listing in meinem ersten Posting)

Dafür fanden sich zwei Lösungen:

1. Verwendung von zwei Lock-Objekten:

Code: Alles auswählen

import subprocess
import thread
from threading import Lock

lock1=Lock()
lock2=Lock()

def pipelesen(pipe, liste) :
    lock1.acquire()
    lock2.release() #Hauptprogramm wartet, bis Thread lock1 erhalten hat
    while liste[-1]<>'enDE\n':  #Pipe wird bis zum Ende ausgelesen
        liste.append(pipe.readline())
    lock1.release()
       
def threatbeenden (objekt):
    objekt.stdin.write('print "enDE"\n') #Ende der Pipe markieren
    lock1.acquire()
    print "Ergebnis von threatbeenden(g):"
    print objekt.communicate()
    lock1.release()

ausgabe=['Starteintrag'] #noetig fuer Bedingung von while in pipelesen()

g=subprocess.Popen('/usr/bin/gnuplot',
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)

lock2.acquire()
thread.start_new_thread(pipelesen,
                        (g.stdout, ausgabe))
lock2.acquire()  #Hauptprogramm wartet, bis Thread lock1 erhalten hat
g.stdin.write('se sam 1788\n')
g.stdin.write('sh sam\n')
print "ausgabe = ", ausgabe  #liefert meist nur  ['Starteintrag']
threatbeenden(g)
print "ausgabe = ", ausgabe  #liefert immer komplette Ausgabe
lock2.release() 
Der Thread erhält "lock1" und gibt es erst wieder ab, wenn die Pipe fertig ausgelesen ist, erst danach darf die Funktion "threatbeenden" die Pipe schließen. Damit wirklich der Thread den Lock erhält bevor das Hauptprogramm den Thread beenden und die Pipe schließen kann, braucht es ein weiteres Lock-Objekt: "lock2" sorgt dafür, dass das Hauptprogramm nach dem Anstoßen des Threadserst dann weiterläuft wenn der Thread "lock1" erhalten hat.

2. Die elegnatere Lösung benutzt die join()-Methode von Thread-Objekten der Klasse Threading:

Code: Alles auswählen

 import subprocess
import threading
#import time


class GnuPlot:
    def __init__(self, commands):
        self.ausgabe_flag = threading.Event()
        self.lock = threading.Lock()

        self.ausgabe = []
        self.ausgabe_flag.clear()

        self.g = subprocess.Popen('/usr/bin/gnuplot',
        #self.g = subprocess.Popen('C:\\Download\\gnuplot\\bin\\pgnuplot.exe',
                                  stdin=subprocess.PIPE,
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.STDOUT)

        self.thread_ = threading.Thread(target=self.pipe_lesen)
        self.thread_.start()

        for command in commands:
            self.g.stdin.write(command + '\n')
            #time.sleep(2) # Zum Testen
            self.ausgeben()

        ret = self.thread_beenden()
        self.ausgeben()
        print 'Ergebnis von thread_beenden: \n', ret
       
    def pipe_lesen(self) :
        while True:
            answer = self.g.stdout.readline()
            if answer == 'enDE\n': # Pipe wird bis zum Ende ausgelesen
                break
            if answer:
                self.lock.acquire()
                self.ausgabe.append(answer)
                self.lock.release()
                self.ausgabe_flag.set()

    def thread_beenden (self):
        self.g.stdin.write('print "enDE"\n') # Ende der Pipe markieren
        self.thread_.join()
        return self.g.communicate()

    def ausgeben(self):
        if self.ausgabe_flag.isSet():
            self.lock.acquire()
            print ''.join(self.ausgabe)
            self.ausgabe = []
            self.lock.release()
            self.ausgabe_flag.clear()


if __name__ == '__main__':
    commands = ('se sam 1788', 'sh sam')
    GnuPlot(commands) 
Vor dem Schließen der Pipe muss die im Hauptprogramm angesiedelte Funktion "threatbeenden" warten, bis der Thread die Pipe zuende ausgelesen hat.

Hauptprogramm und Threat greifen zum Teil auf die gleichen Datenobjekte zu. Ich konnte jedoch keine Fehler feststellen, speziell keine Fehler die darauf schließen lassen: Wenn ich in dem Listing "self.lock" und "self.ausgabe_flag" komplett auskommentiere, liefert das Programm immer noch reproduzierbar das gleiche, erwartete Ergebnis. (Was natürlich nicht heißt, dass das überall so sein muss, das kann ich schlicht nicht beurteilen.)
Benutzeravatar
HWK
User
Beiträge: 1295
Registriert: Mittwoch 7. Juni 2006, 20:44

Das Warten auf das Thread-Ende scheint die Erklärung zu sein. Der gemeinsame Zugriff auf die Variabeln ist wahrscheinlich wirklich kein Problem, solange ein Thread nur lesend und der andere nur schreibend darauf zugreift. Jetzt gibt es wohl in Zukunft bei der Thread-Programmierung ein Problem weniger.
MfG
HWK
Antworten