OS X 10.7.5 (Lion) und multiprocessing bzw. os.fork

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
suryahap

Ich verwende Python 3.3.0 unter OS X 10.7.5 (Lion) und habe Probleme mit multiprocessing bzw. os.fork
Funktioniert multiprocessing bzw. os.fork() auf OS X bzw. muss man etwas am OS zuvor einstellen, damit die Anweisungen aus den Libraries funktionieren.

Dieses einfache Bsp. sollte nur die Werte einlesen und in der List erg den eingelesenen Wert und die Process_id speichern.
Es werden aber die Tasks aus der Queue nicht abgearbeitet.

Code: Alles auswählen

import multiprocessing
#
class MultiProcess(multiprocessing.Process):
    def __init__(self, task_queue, process_id):
        multiprocessing.Process.__init__(self)
        self.__task_queue   = task_queue
        self.__process_id   = process_id
        print('Created Process: ' + str(self.__process_id))
#
    def run(self):
        print('Running Process: ' + str(self.__process_id))
        while True:
            next_task = self.__task_queue.get()
            if next_task is None:
                print('Cancel Process:  ' + str(self.__process_id))
                break
            print('Working Process: ' + str(self.__process_id))
            erg[next_task].append(task[next_task])
            erg[next_task].append(self.__process_id)
            self.__task_queue.task_done()
        return
#
if __name__ == '__main__':
    task = [12, 5, 76, 34, 5, 2, 8, 67, 89, 345]
    erg  = []
    for i in range(len(task)):
        erg.append([])
    #
#    manager = multiprocessing.Manager()
#    tasks = manager.Queue()
    tasks         = multiprocessing.Queue() #.Queue(1)
    num_processes = multiprocessing.cpu_count() # 1
    processes     = [MultiProcess(tasks, i) for i in range(num_processes)]
    #
    for p in processes:
        p.daemon = True
        p.start()
        print('Started Process: ' + str(p))
    #
    for task_id in range(len(task)):
        tasks.put(task_id)
        print('Queued Task:     ' + str(task_id))
    #
    for p in processes:
        p.join()
    #
    for i in erg:
        print(i)
#
Ergebnis auf der Konsole:
Created Process: 0
Created Process: 1
Created Process: 2
Created Process: 3
Started Process: <MultiProcess(MultiProcess-1, started daemon)>
Started Process: <MultiProcess(MultiProcess-2, started daemon)>
Started Process: <MultiProcess(MultiProcess-3, started daemon)>
Started Process: <MultiProcess(MultiProcess-4, started daemon)>
Queued Task: 0
Queued Task: 1
Queued Task: 2
Queued Task: 3
Queued Task: 4
Queued Task: 5
Queued Task: 6
Queued Task: 7
Queued Task: 8
Queued Task: 9 --> hier bleibt der Job hängen

Wenn: tasks = multiprocessing.Queue() abgeändert wird auf: tasks = multiprocessing.Queue(1),
bleibt der Job schon bei dem Statement: Queued Task: 0 hängen
Änderung mit manager.Queue() bewirkt auch nichts
Wenn diese zwei Zeilen ausdokumentiert werden:
for p in processes:
p.join()
Auch Statements um leere Tasks in die Queue zu stellen wie:
for p in processes:
tasks.put(None)
bringt keinen Erfolg
läuft der Prozess komplett durch, tut aber gar nichts, da das Hauptprogramm nicht auf die Nebenprozesse wartet.

Um mutliprocessing zu umgehen habe ich ein Minibsp. mit os.fork() versucht, welches aber auch nicht läuft

Code: Alles auswählen

import os
#
def child():
    print('A new child')
    print('Second task')
    os._exit(0)  
#
def parent():
    while True:
        newpid = os.fork()
        if newpid == 0:
            child()
        else:
            pids = (os.getpid(), newpid)
            print("parent: %d, child: %d" % pids)
        if input() == 'q':
            break
#
if __name__ == '__main__':
    parent()
Ausgabe auf Konsole:
parent: 851, child: 853

parent: 851, child: 854A new child
q

Somit wird in der Funktion child() nur das erste print(...) ausgeführt, die zwei weiteren nicht mehr und auch das os._exit(0) nicht
In der Aktivitätsanzeige muss ich die gestarteten Prozess manuell canceln.
Wenn ich die print-Anweisungen hinaus geben und somit das os._exit(0) als erste und einzige Anweisung stehen lasse, dann terminiert sich der Child-Prozess selbst, tut aber auch nichts, ist somit auch unnötig.

Auch alle anderen Codes bzgl. multiprocessing bzw. os.fork(), welche ich im Internet gefunden habe, laufen auf meinem Mac nicht

Alle _threads, threading, bzw. threading/queue-Programme, welche ich geschrieben habe, funktionieren einwandfrei
Danke vorab an alle Antworter
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Hallo suryahap,

fork ist eine Grundfunktion des Betriebssystems und funktioniert somit immer.
os._exit solltest Du nicht benutzen, weil damit das Programm sofort beendet wird und eventuell Ausgaben nicht mehr vollständig geschrieben werden. Queue.get wartet bis in der Queue wieder ein Eintrag ist, so dass das erste Programm natürlich beim join hängen bleibt.

Auf meinem Mac tun beide Programme das, was sie tun sollen, inklusive aller Fehler.
suryahap

Hallo Sirius3

Was wäre die Alternative zu os._exit(0), irgendwie sollte ich den Prozess nach seiner Tätigkeit terminieren

Wenn ich das os._exit(0) rausnehme, wird trotzdem nur das erste print(...) durchgeführt,
danach ist der Prozess zwar in der Aktivitätsanzeige noch da, aber er tut nichts mehr, sprich arbeitet auch nicht das zweite und dritte print(...) ab

Queue.get() braucht das Programm um sich aus der Queue wieder ein quasi Ticket abzuholen.
Da bei mir schon das erste Ticket nicht verarbeitet wird --> kommt nie zum print('Running Process: ' + str(self.__process_id)),
also spielt das .get() noch gar nicht mit

Meine Programme mit threading sind analog der Queue, Yorker-Tasks aufgebaut, nur halt nicht auf die Lib multiprocessing und funktionieren fehlerfrei inklusive Skalierung von Anzahl Worker-Threads und queue.Queue(n) #n=0|1|30|etc.

Was tun sie genau, bitte um den Output und was sind die Fehler, ausser den bereits von dir genannten?

Danke vorab für deine schnelle Antwort
Sirius3
User
Beiträge: 17754
Registriert: Sonntag 21. Oktober 2012, 17:20

Hallo suryahap,
ich meinte nur die von mir genannten Fehler.
Statt os._exit benutze doch sys.exit.
suryahap

Hallo Sirius3

sys.exit eingebaut, aber wird als dritte Anweisung nicht mehr erreicht, da schon das zweite print nicht durchgeführt wird
Was sind die anderen Fehler, das .get() ist in einer queue notwendig? Das .join ist der Standard um das Hauptprogramm mit den Childs zu synchronisieren
lunar

@suryahap Dein Skript ist in vielerlei Hinsicht fehlerhaft.

Die doppelten führenden Unterstriche in den Namen der Attribute sind überflüssig. Private Variablen werden mit einem einzelnen führenden Unterstrich gekennzeichnet.

Der Versuch, dass Ergebnis in die globale Liste "erg" zu schreiben, ist schon wegen der Verwendung einer globalen Variable fragwürdig, kann aber vor allem überhaupt nicht funktionieren, da Prozesse unabhängige Adressräume haben. "erg" im Kindprozess ist nicht dasselbe wie "erg" im Vaterprozess, die Ergebnisse kommen mithin nie beim Vaterprozess an. Du musst eine weitere Queue nutzen, um die Ergebnisse an den Vaterprozess zu übergeben.

Die Art, Aufgaben an die Kinder weiterzugeben, ist nicht minder fragwürdig. Warum übergibst Du die Position des Tasks in der "task"-Liste? Du kannst ebenso gut den Task selbst direkt als Objekt übergeben (in diesem Fall einfach die entsprechende Zahl), und so eine weitere globale Variable, nämlich "task" vermeiden.

Das Programm selbst ist im Großen und Ganzen nur eine fürchterlich komplizierte Art und Weise, ".map()" aus "multiprocessing.Pool" zu implementieren.

Auf meinem OS X-System funktioniert eine aufgeräumte und insbesondere um das Warten auf die Beendigung des Kindprozesses erweiterte Version Deines "fork"-Beispiels tadellos:

Code: Alles auswählen

#!/usr/bin/env python3

import sys
import os


def child():
    print('A new child')
    sys.exit()


def parent():
    while True:
        child_pid = os.fork()
        if child_pid == 0:
            child()
        else:
            print("parent: {}, child: {}".format(os.getpid(), child_pid))
            os.waitpid(child_pid, 0)
        if input('Try again? [y/N] ').lower().strip() != 'y':
            break


if __name__ == '__main__':
    parent()
Dein Beispiel bezüglich Multiprocessing habe ich nicht ausprobiert, ich hatte keine Lust, es zu korrigieren. Die Variante mit ".map()" dagegen funktioniert bei mir ebenfalls tadellos:

Code: Alles auswählen

#!/usr/bin/env python3

import os

from pprint import pprint
from multiprocessing import Pool


def dummy(x):
    return (x*x, os.getpid())


def main():
    with Pool() as pool:
        tasks = [12, 5, 76, 34, 5, 2, 8, 67, 89, 345]
        results = pool.map(dummy, tasks)
        pprint(results)


if __name__ == '__main__':
    main()
suryahap

Hallo lunar

Danke für deine kritische Antwort

01) Die doppelten Unterstriche sind kein Fehler, vielleicht überflüssig
Meine erste Doku war "A Byte of Python" und dort steht doppelter Underscore für "__privateVariable" beschrieben, mag falsch sein
02) sys.exit() terminiert den child-Prozess NICHT
Original-Zitat Python-Doku: Since exit() ultimately “only” raises an exception,
it will only exit the process when called from the main thread, and the exception is not intercepted.

--> http://docs.python.org/2.7/library/sys. ... t#sys.exit
--> http://docs.python.org/3.3/library/sys. ... t#sys.exit
03) globale List, unterschiedliche Adressräume, hätte ich auch darauf kommen sollen, gebe ich zu.
04) Wenn ich os.waitpid(child_pid, 0) absetze, warte ich bis zum Sanktnimmerleinstag, ich weiss nicht warum, aber es ist bei mir so

Das fork-Bsp. habe ich bereits gelöst, Synchronisierung des Masters mit den child-Prozessen geht bei mir nicht und eigenartigerweise kann ich das Statement print(...) nicht verwenden, File-/Datenbank-Zugriffe, Berechnungen, etc. funktionieren im child-Prozess aber sehr gut, child-Prozesse überholen einander, etc. --> dokumentiert inLog-File inkl. timestamps

Dein gepostet fork-Bsp. läuft bei mir nicht. Es läuft, aber ich sehe keine Ausgabe und der Prozess bleibt hängen, wenn ich das os.waitpid(...) ausdokumentiere, läuft der Prozess durch und macht KEINE Ausgabe.

Generell habe ich meine mulitiprocessing Variante mit Queue aus vielen anderen Foren und Pythonsites abgekupfert und für mich vereinfacht.

Aus der Python-Doku bringe ich folgendes einfache multiporcessing-Konstrukt nicht zum Laufen:

Code: Alles auswählen

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()
Wenn ich das p.join() ausdokumentiere, läuft der Job durch ohne f(name) auszuführen, ansonst bleibt der Job hängen und tut nichts.

Wäre nett, wenn du noch einmal drüber schauen könntest, danke vorab
suryahap

Ich habe den Fehler gefunden

Habe alle meine Programme testweise als Files gespeichert und über idle3 gestartet und bis dato habe ich noch keinen Laufzeitfehler entdeckt, nicht bei GUI-, DB-, Threading- oder sonstigen Applikationen

multiprocessing und os.fork sind die ersten Anweisungen, welche ich kenne, welche sich unterschiedlich verhalten auf der bash und in idle3

Ich checke alles noch einmal, besten Dank für eure Ratschläge und Hilfe
BlackJack

@suryahap: Zu 02) `sys.exit()` terminiert den Kindprozess, denn normalerweise schlägt die Ausnahme ja bis ganz nach oben durch solange man daran nichts ändert. An der Stelle funkt Dir vielleich die IDE dazwischen. Bei der IDE ist auch die Frage wo die Ausgaben hingehen. Wenn Du sagst es läuft durch und macht keine Ausgabe, meinst Du dann keine Ausgabe in der IDE oder gänzlich keine Ausgabe? Die könnte ja auch auf der Standardausgabe des IDE-Prozesses landen und die sieht man in der Regel ja nicht. Es sei denn man startet die IDE in einem Terminal oder die Textausgaben von grafischen Programmen werden irgend wo hingeloggt.
lunar

@suryahap Wenn „A Byte of Python“ tatsächlich den doppelten Unterstrich für private Variablen empfiehlt, dann ist es in dieser Hinsicht tatsächlich fehlerhaft.

"os.fork()" erzeugt einen keinen Thread, sondern einen komplett neuen Prozess. Aus "fork(3)":
The fork() function shall create a new process. The new process (child process) shall be an exact copy of the calling process (parent process) except as detailed below: […]
Dein zweiter Einwand geht mithin vollkommen am Problem vorbei. Das Zitat aus der Dokumentation ist zwar zweifelsohne richtig, doch vollkommen irrelevant, da es sich hier nicht um einen Prozess mit mehreren Theads handelt, sondern um mehrere Prozesse mit jeweils einem Thread. Folglich beendet der Aufruf "sys.exit()" im Kindprozess denselben anstandslos. Recherchiere bitte den Unterschied zwischen Prozessen und Threads, falls Dir diese Ausführungen nicht einleuchten.

Die zweite Hervorhebung in obigem Zitat deutet an, warum Dein Beispiel in Idle nicht funktioniert. Offensichtlich führt Idle Python-Programme nicht in einem neuen Prozess, sondern innerhalb des laufenden Idle-Prozesses aus. Mithin kopiert "os.fork(") den kompletten Idle-Prozess, inklusive der Hauptschleife für die graphische Oberfläche, die in der Folge den Kindprozess darin hindert, Ausgabe zu tätigen und sich ordentlich zu beenden.

Ein Grund mehr, Idle den Rücken zu kehren, und eine vernünftige Entwicklungsumgebung zu verwenden.
suryahap

Hallo Lunar

01) Deine Aussage über private Instanzvariablen ist falsch, "Byte of Python" hat recht
Lass den Code laufen und du wirst es sehen!

Code: Alles auswählen

#!/usr/bin/env python3
#
class MemberTest():
    def __init__(self):
        self.__privat   = 1
        self._protected = 2
        self.public     = 3
#
    def getMembers(self):
        print('self.__privat:   ', self.__privat)
        print('self._protected: ', self._protected)
        print('self.public:     ', self.public)
#
pt = MemberTest()
pt.getMembers()
print(pt.public)
print(pt._protected) # der Zugriff ist erlaubt, da "_" _protected nur eine quasi Vereinbarung darstellt
print(pt.__privat) # und das wirfts dich jetzt mit einem "AttributeError" auf, also lassen wir "__" __private als solches gelten
02) Ich kenne den Unterschied zwischen Prozessen und Threads. Danke!
03) Ich mag Idle, jede IDE hat so sein Stärken und Tücken, ich verwende Eclipse für Java, mag es aber nicht für Python und greife generell eher zu Bordmitteln als immer Fremdsoftware zu installieren, verwende z.B. nano für shell-scripts, nach dem Motto "Warum mit Kanonen auf Spatzen schiessen"
04) Danke für das .map Bsp, aber es ist nicht das was ich umsetzen möchte, denn es sollen nur maximal so viele Prozesse parallel gestartet werden wie CPU-Kerne, vielleicht nur (CPU-Kernel -1) um nicht Idle = 0 zu haben, vorhanden sind, aber WorkingTasks kann es Tausende geben, deswegen ist ein Queueing notwendig, auch wenn dies in meinem Bsp. nicht besonders modern, sondern eher aus der Threading-Ecke kommt und mit multiprocessing.Pool schöner wäre

Trotzdem danke, ich habe Anregungen bekommen
lunar

@suryahap Von wegen „privat“:

Code: Alles auswählen

class MemberTest():
    def __init__(self):
        self.__privat   = 1
        self._protected = 2
        self.public     = 3

    def getMembers(self):
        print('self.__privat:   ', self.__privat)
        print('self._protected: ', self._protected)
        print('self.public:     ', self.public)

pt = MemberTest()
pt.getMembers()
print(pt.public)
print(pt._protected)
print(pt._MemberTest__privat)
Beachte die letzte Zeile. Ich verweise zur Klärung auf das Tutorial, Abschnitt „Private Variables“:
“Private” instance variables that cannot be accessed except from inside an object don’t exist in Python. However, there is a convention that is followed by most Python code: a name prefixed with an underscore (e.g. _spam) should be treated as a non-public part of the API (whether it is a function, a method or a data member). It should be considered an implementation detail and subject to change without notice.

Since there is a valid use-case for class-private members (namely to avoid name clashes of names with names defined by subclasses), there is limited support for such a mechanism, called name mangling. Any identifier of the form __spam (at least two leading underscores, at most one trailing underscore) is textually replaced with _classname__spam, where classname is the current class name with leading underscore(s) stripped. This mangling is done without regard to the syntactic position of the identifier, as long as it occurs within the definition of a class.
Es gibt keine „privaten Variablen“ in Python, nur die Konvention, Namen mit führendem Unterstrich nicht als Teil der öffentlichen API zu betrachten. Zwei führende Unterstriche dienen einzig und allein dazu, in einer Klasse, die zur Vererbung bestimmt ist, Konflikte mit den Namen der abgeleiteten Klassen zu vermeiden. Auf die Klasse in Deinem Beispiel trifft das offensichtlich nicht zu, denn sie steht am Ende der Vererbungshierarchie.

Bezüglich "multiprocessing.Pool" verweise ich Dich ebenfalls auf die entsprechende Dokumentation:
A process pool object which controls a pool of worker processes to which jobs can be submitted. It supports asynchronous results with timeouts and callbacks and has a parallel map implementation.

processes is the number of worker processesb to use.[…]
Die "Pool"-Klasse startet nur eine bestimmte, vorher festgelegte Anzahl an Prozessen. Warum glaubst Du würde diese Klasse sonst "Pool" heißen? ".map()" verteilt die Ausführung der Funktion fair auf diese festgelegte Anzahl an Prozessen, und ist mithin exakt äquivalent zu Deiner komplizierten Implementierung. Mithilfe ".apply_async()" kannst Du einzelne Jobs auch explizit an den Prozess-Pool übergeben.

Es hat mich gefreut, Dir die Dokumentation vorlesen zu dürfen, doch meine Freude ist nicht so groß, als dass ich mir das Vorlesen zur Gewohnheit machen wollte. Ich bitte Dich also, in Zukunft selbst erst einmal in der Dokumentation nachzulesen, bevor Du mir widersprichst. Das erspart uns beiden unnötige Beiträge und Diskussionen.
Antworten