Zwischen subprocesse in Threads switchen

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
jb_alvarado
User
Beiträge: 55
Registriert: Mittwoch 11. Juli 2018, 11:11

Hallo Allerseits,

ich versuche folgendes zu lösen:
  • 1. subprocess läuft im main Thread durchgehend.
  • 2. subprocess läuft in Thread und schickt Pakete von stdout in Queue, welche im main Thread gelesen werden soll und dort an stdin vom 1. subprocess gepipet wird.
  • 3. subprocess läuft in weiteren Thread.
  • Nach einem Ereignis (in meinem Codebeispiel wenn couter auf 70000 angelangt ist) soll der 2. subprocess gestoppt werden und der 3. subprocess gestartet werden und Pakete in gleiche Queue schicken um damit den 1. subprocess zu füttern.

Das funktioniert soweit auch prinzipiell, wenn ich allerdings vom 3. subprocess wieder auf den 2. subprocess wechseln möchte, bleibt meine while Schleife stehen.

Hier ist dazu der Code den ich provisorisch dafür zusammen gezimmert habe:

Code: Alles auswählen

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from threading import Thread
from subprocess import Popen, PIPE
from queue import Queue


SOURCES = [
    "/Users/jonathan/DEV/watch/intro.mp4",
    "/Users/jonathan/DEV/watch/abenteuer.mp4",
    "/Users/jonathan/DEV/watch/kontinent.mp4",
    "/Users/jonathan/DEV/watch/terra1.mp4",
    "/Users/jonathan/DEV/watch/terra2.mp4",
    "/Users/jonathan/DEV/watch/terra3.mp4",
    "/Users/jonathan/DEV/watch/test.mp4"
]


class Player(Thread):
    def __init__(self, buffer):
        Thread.__init__(self)

        self._buffer = buffer
        self.decoder = None
        self.index = 0
        self.is_running = True

    def run(self):
        self.play()

    def play(self):
        while self.index < len(SOURCES) - 1 and self.is_running:
            self.decoder = Popen([
                'ffmpeg', '-hide_banner', '-v', 'error', '-nostats',
                '-i', SOURCES[self.index], '-r', '25', '-s', '1024x576',
                '-c:v', 'mpeg2video', '-intra', '-b:v', '50M',
                '-minrate', '50M', '-maxrate', '50M', '-bufsize', '50M',
                '-c:a', 's302m', '-strict', '-2', '-ar', '48k',
                '-ac', '2', '-f', 'mpegts', '-'], stdout=PIPE)

            print('Play:', SOURCES[self.index])

            for data in iter(self.decoder.stdout.readline, ''):
                if not data:
                    break

                self._buffer.put(data)

            self.index += 1
        else:
            self.index = 0

    def next(self):
        self.is_running = True
        self.play()

    def stop_decoder(self):
        if self.decoder is not None:
            self.decoder.terminate()
            self.is_running = False
            self.decoder = None


class Inject(Thread):
    def __init__(self, buffer):
        Thread.__init__(self)

        self._buffer = buffer
        self.decoder = None
        self.index = 0
        self.is_running = True

    def run(self):
        self.play()

    def play(self):
        if self.is_running:
            self.decoder = Popen([
                'ffmpeg', '-hide_banner', '-nostats', '-v', 'error',
                '-i', SOURCES[6], '-r', '25', '-s', '1024x576',
                '-c:v', 'mpeg2video', '-intra', '-b:v', '50M',
                '-minrate', '50M', '-maxrate', '50M', '-bufsize', '50M',
                '-c:a', 's302m', '-strict', '-2', '-ar', '48k', '-ac', '2',
                '-f', 'mpegts', '-'], stdout=PIPE)

            for data in iter(self.decoder.stdout.readline, ''):
                if not data:
                    self._buffer.put(None)
                    break

                self._buffer.put(data)

            self.stop_decoder()

    def next(self):
        self.is_running = True
        self.play()

    def stop_decoder(self):
        if self.decoder is not None:
            self.decoder.terminate()
            self.is_running = False
            self.decoder = None


def main():
    buffer = Queue(maxsize=1024)
    try:
        encoder = Popen([
            'ffplay', '-v', 'error', '-hide_banner', '-nostats',
            '-i', 'pipe:0'], stderr=None, stdin=PIPE, stdout=None)

        play_thread = Player(buffer)
        play_thread.daemon = True
        play_thread.start()

        inject_thread = Inject(buffer)
        inject_thread.daemon = True

        counter = 0

        while True:
            data = buffer.get()
            print("Counter: {}".format(counter), end="\r")
            counter += 1
            if data is None:
                play_thread.next()
            else:
                encoder.stdin.write(data)

            if counter == 70000:
                print("switch source")
                play_thread.stop_decoder()

                if inject_thread.is_alive():
                    inject_thread.next()
                else:
                    inject_thread.start()

                counter = 0

    finally:
        encoder.wait()


if __name__ == '__main__':
    main()
Könnt ihr mir hier auf die Sprünge helfen?

Die Inject Klasse soll später mal durch einen rtmp Server ersetzt werden, der denn übers www sein Signal bekommen soll. Daher der Aufwand.
Benutzeravatar
__blackjack__
User
Beiträge: 13077
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@jb_alvarado: `Player.run()` lässt sich einfacher definieren wenn man das nach der `play()`-Methode definiert:

Code: Alles auswählen

    def run(self):
        self.play()
        
    # ->
    
    run = play
In der `play()`-Methode gibt es ein ``else`` zu der ``while``-Schleife das keinen Sinn macht. Das ``self.index = 0`` sollte einfach so nach der Schleife stehen. Beziehungsweise vielleicht auch nicht, denn das macht nicht viel Sinn, denn das wird ja nirgends verwendet, dass das wieder auf 0 gesetzt wird, kurz bevor der Thread endet.

Das das lezzte Element von `SOURCES` nicht verwendet wird ist Absicht? Falls ja, sollte man kommentieren warum das so sein soll, damit sich Leser an der Stelle nicht fragen müssen ob das ``- 1`` ein Fehler ist oder nicht.

Der Test auf ``if not data:`` ist überflüssig wenn man beim `iter()` nicht einen Fehler beim zweiten Argument gemacht hätte. Das ist der Wert bei dem das erste Argument nicht mehr aufgerufen werden soll. Und bei `bytes` ist der Vergleich auf eine leere Zeichenkette *immer* unwahr, da hätte man also ein leeres `bytes`-Objekt angeben sollen/wollen (``b''``). Allerdings, und ich glaube das Thema hatten wir schon mal, macht das Lesen von *Zeilen* bei *Binärdaten* sowieso keinen Sinn. Du möchtest hier Blöcke mit einer festgelegten Grösse verarbeiten.

Die `next()`-Methode ist falsch, jedenfalls wenn Du von einem anderen Thread aus aufrufst, aber möchtest, das die `play()`-Methode dann *nicht* in dem anderen Thread ausgeführt wird. Das macht so wie's da steht keinen Sinn und dürfte für das Stehenbleiben der ``while``-Schleife im Hauptprogramm verantwortlich sein, weil so ein `next()`-Aufruf erst zurückkehrt wenn `play()` komplett abgearbeitet ist.

So wie das Programm momentan geschrieben ist, kann es passieren das `inject_thread.start()` mehr als einmal gestartet wird – das geht nicht, Threads kann man nur einmal starten.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
jb_alvarado
User
Beiträge: 55
Registriert: Mittwoch 11. Juli 2018, 11:11

Danke __blackjack__ für die Antwort,
babe versucht deine Ratschläge um zusetzten. Stimmt, das letzte mal hatten wir das schon mal mit dem readline, danach hatte ich das auch geändert (https://github.com/ffplayout/ffplayout- ... t.py#L1100), auch hatte ich mich da von dem Thread verabschiedet. Habe das jetzt wieder auf *.read() gestellt...

Hänge jetzt noch an dem .next(). Du meinst das wird dann im Mainthread ausgeführt? Wie kann ich dann dem Thread mitteilen dass er seine Arbeit wieder aufnehmen soll? Warum wird denn next() nicht im gleichen Kontext ausgeführt?

Auch sagst du, dass unter Umständen der inject_thread mehrmals gestartet wird, reicht da eine Abfrage mit .is_alive() nicht?
Benutzeravatar
__blackjack__
User
Beiträge: 13077
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@jb_alvarado: Ja, wenn Du `next()` im Hauptthread aufrufst, wird das auch im Hauptthread ausgeführt. Ich weiss jetzt nicht wie ich das „warum“ beantworten soll. Das ist halt so. Im Grund die einzig ”magische” Methode die etwas in einem anderen Thread ausführt ist `Thread.start()`. Alle anderen Aufrufe werden in dem Thread ausgeführt in dem sie auch aufgerufen werden.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
__deets__
User
Beiträge: 14528
Registriert: Mittwoch 14. Oktober 2015, 14:29

@__blackjack__: du meinst eher Thread.run, oder? Die wird (nach Aufruf von start aus einem anderen Thread natuerlich) in dem neuen, nebenlaeufigen OS thread ausgefuehrt. Das war's aber auch. Das Python da Thread-Objekte hat ist etwas ungluecklich, und einem einer vergangenen, javaesken API-Phase der Standardbibliothek geschuldet.
Benutzeravatar
__blackjack__
User
Beiträge: 13077
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@__deets__: Nee, ich meine schon `start()`. Nur in dieser Methode passiert etwas magisches, das dafür sorgt das sich der ”Faden” des Programmablaufs teilt und einer direkt nach dem Aufruf weiterläuft und einer gleichzeitig in der aufgerufenen `run()`-Methode weiterläuft. Wenn man selbst die `run()` aufruft passiert nix magisches, das bleibt der gleiche Thread, genau wie bei allen anderen Methodenaufrufen.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
Sirius3
User
Beiträge: 17741
Registriert: Sonntag 21. Oktober 2012, 17:20

@__deets__: manchmal ist es ganz praktisch, ein Thread-Objekt zu haben, das man fragen kann, ob es noch läuft und das man joinen kann. Dass jemand auf die Idee kommt, von Thread zu erben, liegt wohl daran, dass zu oft Praktiken von Java übernommen werden. Die Methoden-Altlasten hätte man mit Umstieg auf Python3 eigentlich aufräumen können.
__deets__
User
Beiträge: 14528
Registriert: Mittwoch 14. Oktober 2015, 14:29

Das stimmt, ich hätte das aber als Rückgabewert gemacht. Ähnlich subprocess.run.
jb_alvarado
User
Beiträge: 55
Registriert: Mittwoch 11. Juli 2018, 11:11

Ok ich bedanke mich für die Klarstellung, wird mir auch in Zukunft weiterhelfen!

Ich habe jetzt die while Schleife im Player Thread umgebaut nach:

Code: Alles auswählen

while True:
    if self.is_running:
        self.decoder = Popen(cmd, stdout=PIPE)

        while self.decoder.poll() is None:
            data = self.decoder.stdout.read(65424)
            self._buffer.put(data)

        self.index += 1
    else:
        time.sleep(0.5)
Und im Mainthread setzte ich dann nur noch is_running:

Code: Alles auswählen

while True:
    data = buffer.get()
    print("Counter: {}".format(counter), end="\r")
    counter += 1
    if data is None:
        play_thread.is_running = True
    else:
        encoder.stdin.write(data)

    if counter == 200:
        print("switch source")
        play_thread.stop_decoder()
        inject_thread.start()
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

@jb_alvarado: sind `Inject` und `Player` wirklich sinnvolle Klassen? Ich sehe da keinen Vorteil, auch nicht dass sie von Thread erben. Bis auf `play` sind beide identisch, das kann man sicherlich vereinfachen.
When we say computer, we mean the electronic computer.
Benutzeravatar
__blackjack__
User
Beiträge: 13077
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@jb_alvarado: Das „busy waiting“ ist unschön, da würde man eher etwas aus dem `threading`-Modul zum Synchronisieren nehmen.
„All religions are the same: religion is basically guilt, with different holidays.” — Cathy Ladman
jb_alvarado
User
Beiträge: 55
Registriert: Mittwoch 11. Juli 2018, 11:11

@sls, hatte mir noch keine Gedanken über Optimierung gemacht, aber stimmt man kann da sicher noch was verbessern. Später würde die Inject Klasse eh anders aussehen, weil dort ein rtmp Server drin laufen würde. Auch die Player Klasse existiert so nur aus Testzwecken. Ich denke es wäre auch kein Problem die Playerfunktion in dem Mainthread zu holen und nur noch Inject in einem extra Thread.

@__blackjack__, meinst du so was wie .acquire() ? Kannte das noch nicht, muss ich mir näher anschauen.
Antworten