Multiprocessing Pool Beenden von Prozessen nach PID und Timeout

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Patrick1990
User
Beiträge: 136
Registriert: Freitag 3. Juni 2016, 05:45

Hallo Leute,
ich hänge mal wieder an einer Sache und komme nicht so recht weiter.

Ich habe hier beispielhaft einen Code, welcher ein Programm drei mal parallel öffnet, dich die PID dieser Programme speichert und auch die Zeit, die das Programm benötigt, bis es wieder geschlossen wird.
Das Beispiel ist stark abstrahiert.
Zum testen müsste dieser Code dann bei Euch noch leicht angepasst werden.

Folgendes habe ich vor:
Ich möchte zunächst erstmal eins der drei Programme öffnen und die Zeit bis zum Schließen messen.
Danach möchte ich diese Zeit sichern und als Referenz für alle folgenden (parallelen) Prozesse der gleichen Art (+ etwas Toleranz) nutzen.
Benötigt eines der Programme viel mehr Zeit bis zum schließen, soll dieses dann geschlossen werden und erneut gestartet werden.
Ich übergebe dabei jedem der Programme einen Satz von Inputparametern. Diese könnte ich ja duch die gespeicherte PID wieder zurückverfolgen.

Habe Ihr eine Idee dazu oder wisst wie es evtl sogar besser geht?

Vielen Dank im Voraus.
Patrick

Code: Alles auswählen


from multiprocessing import Pool,Manager
import psutil
import subprocess
import time
import os




class Test():
    
    def __init__(self, max_cpu):
        
        self.max_cpu = max_cpu
        
        manager = Manager()
        self.shared_list = manager.list()
        
        self.execute_processes()
        
        
        
        
    def worker(self, liste, process):
        start = time.time()
        
        process2 = subprocess.Popen(process) 
        current_process = psutil.Process(pid=process2.pid)
        if current_process.name() == "ansysedt.exe":
            pid=process2.pid
        process2.communicate()
        end = time.time()
        liste.append([pid,end-start])
        
        
        
    def execute_processes(self):
        
        cmd_1 = r'C:\Program Files\AnsysEM\AnsysEM19.5\Win64\ansysedt.exe'
        cmd_2 = r'C:\Program Files\AnsysEM\AnsysEM19.5\Win64\ansysedt.exe'
        cmd_3 = r'C:\Program Files\AnsysEM\AnsysEM19.5\Win64\ansysedt.exe'
        
        processes = [cmd_1,cmd_2,cmd_3]        
        
        pool = Pool(self.max_cpu)

        for process in processes:
            result = pool.apply_async(self.worker, args=(self.shared_list,process,))
            
        pool.close()
        pool.join()
        
        print("Shared_list:" + str(self.shared_list))
        
        print("Ende")
        os.system("pause")
        

        
def main():
    print("Beginn der Optimierung")
    test=Test(2)  
    print("Ende der Optimierung")

if __name__ == '__main__':                                                      
    main()
Benutzeravatar
__blackjack__
User
Beiträge: 14052
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Patrick1990: Dir sind klar das PIDs nicht eindeutig sind, sondern nur solange der Prozess auch tatsächlich läuft‽ Wenn der fertig ist kann durchaus ein anderer Prozess die gleiche PID bekommen. Ist wie mit `id()` und Objekten in Python.
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
Benutzeravatar
__blackjack__
User
Beiträge: 14052
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Patrick1990: In der `__init__()` sollte ein Objekt in einen Zustand versetzt werden in dem es der Erseteller benutzen kann. Wenn über die `__init__()` Code abläuft der am Ende ein Nutzloses Objekt hinterlässt, hat man etwas falsch gemacht. `worker()` ist keine Methode sondern nur eine Funktion. Wenn man das aus der Klasse heraus zieht wird die Klasse noch sinnfreier. Das ist auch einfach eine Funktion. Warum steht das da so kompliziert?

Der Rückgabewert von `apply_async()` wird nicht verwendet, also braucht man den auch nicht an einen Namen binden.

Zeichenkettenliterale und Werte mit ``+`` und `str()` zusammenstückeln ist eher BASIC als Python. In Python gibt es dafür Zeichenkettenformatierung mit der `format()`-Methode und ab Python 3.6 f-Zeichenkettenliterale.

``os.system("pause")`` sollte da nicht stehen. `os.system()` sollte man nicht verwenden. Dessen Dokumentation weist ja auch auf das `subprocess`-Modul hin. Und "pause" ist Windows-spezifisch. Zudem sollte das auch gar nicht nötig sein wenn man Konsolenprogramme wie das vorgesehen ist aus einer laufenden Konsole heraus startet.

Für Zeitmessungen sollte man `time.monotonic()` verwenden, weil da garantiert ist, dass diese Zeit immer nur in eine Richtung läuft.

Was hat die `2` im Namen `process2` zu suchen?

Das was da mit `psutil` gemacht wird sieht sehr unsinnig und vor allem auch fehlerhaft aus. Natürlich ist der "ansysedt.exe" wenn man das Programm "ansysedt.exe" startet. Sollte das *nicht* der Name sein, dann gibt's auch eine Ausnahme weil dann `pid` nicht definiert wird.

Da bleibt dann das hier (ungetestet):

Code: Alles auswählen

#!/usr/bin/env python3
import subprocess
import time
from multiprocessing import Manager, Pool

COMMAND = r"C:\Program Files\AnsysEM\AnsysEM19.5\Win64\ansysedt.exe"


def worker(liste, process):
    start = time.monotonic()
    process = subprocess.Popen(process)
    process.communicate()
    end = time.monotonic()
    liste.append((process.pid, end - start))


def execute_processes(max_cpu):
    shared_list = Manager().list()
    processes = [COMMAND] * 3
    pool = Pool(max_cpu)
    for process in processes:
        pool.apply_async(worker, args=(shared_list, process))
    pool.close()
    pool.join()

    print(f"Shared_list: {shared_list}")
    print("Ende")


def main():
    print("Beginn der Optimierung")
    execute_processes(2)
    print("Ende der Optimierung")


if __name__ == "__main__":
    main()
Wobei wie gesagt die Aussagekraft vom `pid`-Wert begrenzt ist. Da kann bei einem Einprozessorsystem bis zu drei mal der gleiche Wert stehen, was sogar bei einem System mit mehr als einem Prozessor unter Umständen passieren kann.

Ich finde es auch komisch die Liste zwischen den Prozessen zu teilen wenn man dafür auch den Rückgabewert von den Workern verwenden könnte.
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
Patrick1990
User
Beiträge: 136
Registriert: Freitag 3. Juni 2016, 05:45

__blackjack__ hat geschrieben: Mittwoch 19. Februar 2020, 16:58 @Patrick1990: Dir sind klar das PIDs nicht eindeutig sind, sondern nur solange der Prozess auch tatsächlich läuft‽ Wenn der fertig ist kann durchaus ein anderer Prozess die gleiche PID bekommen. Ist wie mit `id()` und Objekten in Python.
Ist mir klar,
ist ja auch so gewollt.
Wenn ich den Prozess starte lese ich die PID aus. Wenn nun der Prozess länger als erwartet aktiv ist kann ich ihn mit Hilfe der PID killen. So der Plan.


Die Klasse soll in diesem Fall auch gar keinen Sinn haben, genauso wie die Namen der Variablen. Ich habe einfach die nötigsten Codeschnipsel zusammenkopiert für ein Minimalbeispiel und nicht weiter abgeändert. Deshalb stehen dort auch einige Dinge, die so in diesem Zusammenhang keinen Sinn machen.

os.system(pause) steht auch nur dort, weil das Terminal Fenster in dem Fall offen bleibt und ich sehen kann was passiert ist - also zu Testzwecken.

Auch letzteres ist dem Minimalbeispiel geschuldet. Es werden eben nicht nur ansysedt.exe Prozesse gestartet, sondern auch andere. Ich möchte aber nur die ansysedt.exe Prozesse überwachen und notfalls killen. Deshalb habe ich da noch einmal unter allen gestarteten Prozessen die gesucht, die für mich nur relevant sind.

Ich werde das morgen mal probieren. Dankeschön :)
Benutzeravatar
__blackjack__
User
Beiträge: 14052
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Patrick1990: Du packst in die Liste PIDs von Prozessen die garantiert nicht mehr laufen weil sie bereits vollständig durchgelaufen sind. Das kann so nicht gewollt sein, weil wie gesagt nicht sinnvoll. Mit den PIDs kann man zu dem Zeitpunkt nichts mehr anfangen.

Wenn man einen Prozess mit `subprocess.Popen` startet und dann mit dessen `pid` ein `psutils.Process`-Objekt erstellt, kann man übrigens besser gleich `psutils.Popen` verwenden, was von `psutils.Process` abgeleitet ist und ein `subprocess.Popen`-Objekt kapselt.

Ein Minimalbeispiel und eine unnötige Klasse widersprechen sich irgendwie. Wenn da eine unnötige Klasse drin ist, dann ist das Beispiel ja ganz offensichtlich nicht minimal.

``os.system("pause")`` hat auch zu Testzwecken da nix zu suchen. Erstens immer noch wegen `os.system()` was man nicht verwendet, sondern beispielsweise `subprocess.run()`, zweitens weil man auch das nicht verwendet wenn doch ein einfaches `input()` ohne externen Shell-Prozess und einem plattformunabhängigen ``pause``-Shell-Befehl den gleichen Effekt hätte, und drittens weil man Konsolenprogramme immer noch aus einer laufenden Konsole heraus starten sollte. Das Fenster geht nicht zu also braucht man da nicht etwas einbauen was zum Beispiel bei einer Ausnahme nicht mehr greift.

Du brauchst dann also eher keine PIDs sondern die Namen von Programmen die auf eine ungefähre Laufzeit abgebildet werden. Vielleicht mit `None` initialisiert wenn es noch keine Laufzeit gibt und dann immer mit der durchschnittlichen bisherigen Laufzeit aktualisiert an der man sich dann für's beenden orientieren kann. Und man braucht ein `Lock` beim aktualisieren/auslesen der durchschnittlichen Zeit, denn da wird ja potentiell nebenläufig drauf zugegriffen. Oder man lässt das mit dem `Manager` und den verteilten Datenstrukturen und verwaltet die Daten nur in dem Prozess der den `Pool` besitzt. Die maximale Laufzeit gibt man von dort als Argument mit und die tatsächliche Laufzeit gibt die Funktion als Rückgabewert zurück. Die kann man dann zum Beispiel per Rückruffunktion in das Wörterbuch welches Kommandonamen auf Laufzeiten abbildet einpflegen. Auch hier an ein `threading.Lock` denken.

`communicate()` braucht man übrigends auch nicht wenn man keine Ein- oder Ausgabeumleitung macht. Da kann man dann auch einfach `wait()` verwenden.

Statt mit `psutils` den Prozessnamen zu ermitteln kann man doch auch das Kommando verwenden. Dann spart man sich die zusätzliche Abhängigkeit zu `psutils`.
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
Benutzeravatar
sparrow
User
Beiträge: 4538
Registriert: Freitag 17. April 2009, 10:28

Warum wird denn überhaupt mit der pid gearbeitet? Man bekommt beim Start mit subprocess doch ein Objekt, mit dem man direkt arbeiten kann und das viel mehr über sich weiß.
Patrick1990
User
Beiträge: 136
Registriert: Freitag 3. Juni 2016, 05:45

Ahhh tatsächlich, sorry. Da hatte ich einen Denkfehler mit den PIDs. Vielen Dank für die Anregungen.

Dann nennen wir es halt nicht Minimalbeispiel sondern Beispiel :D Aber ja, du hast recht.

Zu os.system(): Wieso gibt es das dann, wenn man es nicht verwendet? Wieso ist es in so vielen Python-Programmschnipseln im Internet zu finden?

Du brauchst dann also eher keine PIDs sondern die Namen von Programmen die auf eine ungefähre Laufzeit abgebildet werden. Vielleicht mit `None` initialisiert wenn es noch keine Laufzeit gibt und dann immer mit der durchschnittlichen bisherigen Laufzeit aktualisiert an der man sich dann für's beenden orientieren kann. Und man braucht ein `Lock` beim aktualisieren/auslesen der durchschnittlichen Zeit, denn da wird ja potentiell nebenläufig drauf zugegriffen. Oder man lässt das mit dem `Manager` und den verteilten Datenstrukturen und verwaltet die Daten nur in dem Prozess der den `Pool` besitzt. Die maximale Laufzeit gibt man von dort als Argument mit und die tatsächliche Laufzeit gibt die Funktion als Rückgabewert zurück. Die kann man dann zum Beispiel per Rückruffunktion in das Wörterbuch welches Kommandonamen auf Laufzeiten abbildet einpflegen. Auch hier an ein `threading.Lock` denken.

Danke für den Fahrplan. Nun ist das grpße Problem noch die Umsetzung. Kannst du mir dabei helfen?
Benutzeravatar
__blackjack__
User
Beiträge: 14052
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Patrick1990: Wieso gibt's Rassismus/Sexismus/…? Findet man auch überall im Netz. Sollte man auch nicht verwenden. 😎 Zu Code aus dem Internet fand ich gerade das hier ganz lustig: https://twitter.com/Foone/status/1229641258370355200

`os.system()` ist letztlich die `system()`-Funktion die man von C kennt und die gibt es weil sie dummerweise mal in einen Standard geschrieben wurde. Und schon in der Dokumentation *davon* stehen Gründe sie in C nicht zu verwenden. Im `os`-Modul sind auch die ganzen anderen C-Funktionen zu finden, die mit Prozess starten/ersetzen zu tun haben (`exec*()`, `spawn*()`, `fork()`) und auch die sollte man nicht benutzen wenn das `subprocess`-Modul die Funktionalität bietet die man braucht. Das Zeug hat entweder Probleme oder ist ziemlich Low-Level und man kann leicht Fehler machen. `subprocess` nutzt das, aber das ist a) von Leuten geschrieben die Erfahrung haben, und b) schon sehr lange im Einsatz und damit getestet.

Ähnliches galt für `thread` vs. `threading` wo das `thread`-Modul ja mittlerweile nicht mehr öffentlich ist. Man sollte auch nichts aus `time` für das Rechnen mit Datumswerten verwenden → `datetime`-Modul. Oder `os.path` wo es jetzt das `pathlib`-Modul gibt. Zeichenkettenformatierung mit dem ``%`` würde ich auch nicht mehr machen wenn es keinen guten Grund gegen `format()` oder f-Zeichenkettenliterale gibt.
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
Patrick1990
User
Beiträge: 136
Registriert: Freitag 3. Juni 2016, 05:45

Vielen Dank für die Aufklärung.
Stecke da leider nicht so tief drin, dass ich einen Überblick hätte was man aktuell nutzt oder nicht nutzt.

Könntest du mir zum obigen "Fahrplan" ein wenig mehr schreiben? Ich wüsste nun nicht, wie ich anfangen sollte.
Patrick1990
User
Beiträge: 136
Registriert: Freitag 3. Juni 2016, 05:45

Ich habe jetzt mal weiter gemacht, ist sicher nicht so wie du es meintest. Die Notwendigkeit eines Locks in meiner Ausführung habe ich jetzt nicht so herausgefunden.

Code: Alles auswählen

#!/usr/bin/env python3
import subprocess
import time
from multiprocessing import Manager, Pool

COMMAND = r"C:\Program Files\AnsysEM\AnsysEM19.5\Win64\ansysedt.exe"


def worker(timeout, lock, process):
    killed = False
    start = time.monotonic()
    process = subprocess.Popen(process)    
    try:
        outs, errs = process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.kill()
        outs, errs = process.communicate()
        killed = True
    end = time.monotonic()
    return [end-start,killed]    
    

def execute_processes(max_cpu):
    processes = [COMMAND] * 6
    lock = Manager().Lock()
    
    pool = Pool(2)
    results =[]
    processes_start = [processes[0],processes[1]]
    pool = Pool(2)#max_cpu
    for process in processes_start:
        result_temp = pool.apply_async(worker, (None, lock, process))
        results.append(result_temp)
    pool.close()
    pool.join()
        
    timeout = (results[0].get()[0]+results[1].get()[0])/2
    timeout = 1.5 * timeout
    
    results = []
    processes_rest = processes[2:]

    lock = Manager().Lock()
    pool = Pool(max_cpu)
    for process in processes_rest:
        result_temp = pool.apply_async(worker, (timeout, lock, process))
        results.append(result_temp)
    pool.close()
    pool.join()
    
    for result in results:
        print(result.get()[0],result.get()[1])
    

def main():

    print("Beginn der Optimierung")
    execute_processes(6)
    input()
    print("Ende der Optimierung")


if __name__ == "__main__":
    main()

Ich starte erst zwei Prozesse, messe die Zeit und Bilde einen Durchschnitt sowie gebe ich 50% zusätzlich drauf.
Alle weiteren Prozesse werden mit einem Timeout versehen. Dann stelle ich fest welcher Prozess gekillt wurde und welcher nicht.
Den Prozessen müsste ich nun ja noch Namen zuweisen.
Benutzeravatar
__blackjack__
User
Beiträge: 14052
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@Patrick1990: Ein `Lock` würde man brauchen wenn man das tatsächlich asynchron lösen würde. Dein Code synchronisiert ja die Rückgabewerte explizit in dem der Reihe nach `get()` aufgerufen wird. Was Du da jetzt zweimal hintereinander machst ist `Pool.map()` selbst zu erfinden, nur eben nicht als Methode/Funktion sondern in dem der ganze Code dafür zweimal dort hingeschrieben wurde.

Der erste `Pool` der erzeugt wird, wird nirgends verwendet. Ich sehe auch nicht warum man erst einen Pool mit 2 Workern erstellen sollte und danach dann einen mit 6 für den Rest, denn ein Pool mit 6 Workern kann natürlich auch nur zwei Aufgaben ausführen.

`outs` und `errs` in `worker` werden nirgends verwendet. Da keine Augaben umgeleitet wurden, sind die sowieso leer.

Auf einem gekillten Prozess kann man kein `communicate()` mehr aufrufen, das macht keinen Sinn. Ein `wait()` wäre an der Stelle vielleicht sinnvoll. Anstelle von `kill()` würde ich dort `terminate()` verwenden, es sei denn es ist tatsächlich *nötig* den Prozess hart zu beenden.

Der Rückgabewerte sollte ein Tupel und keine Liste sein. Listen enthalten in der Regel Elemente bei denen jedes die gleiche Bedeutung hat, also die Bedeutung nicht von der Position in der Liste abhängig ist. Ein `collections.namedtuple`-Typ würde den Code verständlicher machen, weil man dann keine magischen Indexwerte braucht.

Ungetestet:

Code: Alles auswählen

#!/usr/bin/env python3
import subprocess
import time
from collections import namedtuple
from functools import partial
from multiprocessing import Pool

COMMAND = r"C:\Program Files\AnsysEM\AnsysEM19.5\Win64\ansysedt.exe"

WorkerResult = namedtuple("WorkerResult", "runtime was_killed")


def worker(timeout, command):
    killed = False
    start = time.monotonic()
    process = subprocess.Popen(command)
    try:
        process.communicate(timeout=timeout)
    except subprocess.TimeoutExpired:
        process.terminate()
        process.wait()
        killed = True
    end = time.monotonic()
    return WorkerResult(end - start, killed)


def execute_processes(max_cpu):
    commands = [COMMAND] * 6

    with Pool(max_cpu) as pool:
        results = pool.map(partial(worker, None), commands[:2])
        timeout = 1.5 * (
            sum(result.runtime for result in results) / len(results)
        )
        results = pool.map(partial(worker, timeout), commands[2:])
        pool.close()
        pool.join()

    for result in results:
        print(result.runtime, result.was_killed)


def main():
    print("Beginn der Optimierung")
    execute_processes(6)
    print("Ende der Optimierung")


if __name__ == "__main__":
    main()
“Vir, intelligence has nothing to do with politics!” — Londo Mollari
Benutzeravatar
DeaD_EyE
User
Beiträge: 1240
Registriert: Sonntag 19. September 2010, 13:45
Wohnort: Hagen
Kontaktdaten:

Der Rückgabewerte sollte ein Tupel und keine Liste sein. Listen enthalten in der Regel Elemente bei denen jedes die gleiche Bedeutung hat, also die Bedeutung nicht von der Position in der Liste abhängig ist. Ein `collections.namedtuple`-Typ würde den Code verständlicher machen, weil man dann keine magischen Indexwerte braucht.
Der Verwendung der namedtuple fände ich auch besser. Außerdem ist der Zugriff darauf viel einfacher und aussagekräftiger und es dokumentiert den Code.
Listen oder Tupel mit gemischten Datentypen sind doof und führen zu Fehlern, weil man sich irgendwann nicht mehr dran erinnern kann in welchem Index was ist.
sourceserver.info - sourceserver.info/wiki/ - ausgestorbener Support für HL2-Server
Antworten