multiprocessing Problem

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Hi,
Ich moechte aus 4 verschiedenen Dateien Werte auslesen und diese in ein Dict abspeichern wobei Deteiname soll der key sein. Jede Datei soll auf verschieden CPUs laufen.

Code: Alles auswählen

import sys
import pprint
from multiprocessing import Process


def f(file_name):
    freq = 0

    for line in open(file_name):
        if not line.strip():
            break
        elif not line.startswith('='):
            tmp = int(line.rstrip().split('\t')[0])
            freq += tmp
    return freq

if __name__ == '__main__':
    freq = {}

    print sys.argv[1]
    for file_name in sys.argv[1].replace(' ', '').split(','):
        if not file_name in freq:
            freq[file_name] = 0
        else:
            print file_name + "is not unique!"
            sys.exit(1)

        
        p = Process(target=f, args=(file_name,))
        p.start()
        p.join()
	#freq[file_name] = f(file_name)
    pprint.pprint(freq)
Leider weiss ich nicht wie ich die 4 Dateien auf 4 vier verschieden CPUs ausfuehren kann.

Vielen Dank im Voraus.
BlackJack

@mit: Dafür würde ich `multiprocessing.Pool` verwenden.
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Danke fuer die Antwort. Habe den Code umgeschrieben und bekomme folgende Fehlermeldung:

Code: Alles auswählen

tests/Rep1.log
Traceback (most recent call last):
  File "find_biggest_no.py", line 30, in <module>
    freq[file_name] = p.map(f, file_name)
  File "/home/mitlox/apps/python/lib/python2.7/multiprocessing/pool.py", line 199, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/home/mitlox/apps/python/lib/python2.7/multiprocessing/pool.py", line 491, in get
    raise self._value
IOError: [Errno 2] No such file or directory: 't'
Ich habe keine Datei mit dem Namen t. Hier ist der aktuelle Code:

Code: Alles auswählen

import sys
import pprint
from multiprocessing import Pool


def f(file_name):
    freq = 0

    for line in open(file_name):
        if not line.strip():
            break
        elif not line.startswith('='):
            tmp = int(line.rstrip().split('\t')[0])
            freq += tmp
    return freq

if __name__ == '__main__':
    freq = {}

    p = Pool(4)
    for file_name in sys.argv[1].replace(' ', '').split(','):
        if not file_name in freq:
            freq[file_name] = 0
        else:
            print file_name + "is not unique!"
            sys.exit(1)

        try:
            print file_name
            freq[file_name] = p.map(f, file_name)
        except KeyboardInterrupt:
            print 'parent received control-c'
            p.terminate()
    p.close()
    p.join()
    pprint.pprint(freq)
Wie koennte man es zum laufen bringen?
BlackJack

@mit: `map()` wendet eine Funktion auf alle Elemente des zweiten Arguments an. Wenn Du da nur *einen* Dateinamen angibst, dann wird die Funktion auf jeden Buchstaben angewendet. Nicht so ganz was Du haben willst. ;-)

Da die Reihenfolge egal ist, kann man auch `pool.imap_unordered()` verwenden, wenn die Funktion auch den Dateinamen mit zurück gibt. Ungetestet:

Code: Alles auswählen

import sys
from itertools import takewhile
from multiprocessing import Pool
from pprint import pprint


def process_file(filename):
    with open(filename) as lines:
        return (
            filename,
            sum(
                int(s.split('\t', 1)[0])
                for s in takewhile(bool, (a.strip() for a in lines))
                if not s.startswith('=')
            )
        )


def main():
    pool = Pool()
    file_names = sys.argv[1].replace(' ', '').split(',')
    if len(file_names) != len(set(file_names)):
        print 'file names are not uniq'
        sys.exit(1)
    result = dict(pool.imap_unordered(process_file, file_names))
    pprint(result)


if __name__ == '__main__':
    main()
Müssen es zwingend 4 Prozesse sein? Ich würde das ja dem `Pool` überlassen zu schauen wie viele Prozessoren zur Verfügung stehen.
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Danke fur deine Losung. Ich habe die process_file Funktion wie folgt geändert:

Code: Alles auswählen

def process_file(filename):
    print("Processing " + filename + " on " + str(os.getppid()))
    with open(filename) as lines:
        return (
            filename,
            sum(
                int(s.split('\t', 1)[0])
                for s in takewhile(bool, (a.strip() for a in lines))
                if not s.startswith('=')
                )
        )


und bekomme diese Ausgabe:

Code: Alles auswählen

Processing tests/Rep1.log on 4122
Processing tests/Rep2.log on 4122
Processing tests/Rep3.log on 4122
Processing tests/Rep4.log on 4122
Ich habe es mit Pool() und Pool(4) versucht, aber es hat es nichts geändert. Warum ist getppid() fuer alle Dateien der selbe oder läuft der Code nicht parallel?
BlackJack

@mit: Da ist ein 'p' zu viel in `os.getppid()`. Das gibt die *parent* process id zurück, und der Elternprozess ist ja für alle der gleiche. :-)
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Danke es funktioniert.

Habe eine neue funktion mit mehreren parameters und deshalb habe ich apply_async versucht.

Code: Alles auswählen

import os
from multiprocessing import Pool
from pprint import pprint


def calc_p(fname, reference_name, start_pos, end_pos):
    return (reference_name, [os.getpid(), 'x1', 'x2'])


if __name__ == '__main__':
    pool = Pool()

    fname = "ex1.txt"
    references = ['Test1', 'Test2', 'Test3', 'Test4']

    results = dict([pool.apply_async(calc_p, [fname, reference, 100, 120]) for reference in references])

    pool.close()
    pool.join()
    for r in results:
        pprint(r.get())

Und bekomme folgende Fehlemeldung:

Code: Alles auswählen

$ python read_files_different_parameters.py 
Traceback (most recent call last):
  File "read_files_different_parameters.py", line 16, in <module>
    results = dict([pool.apply_async(calc_p, [fname, reference, 100, 120]) for reference in references])
TypeError: cannot convert dictionary update sequence element #0 to a sequence
Wie ist es moeglich ein dict zu erzeugen?

Vielen Dank im Voraus.
BlackJack

@mit: Ich würde mal sagen gar nicht, beziehungsweise dass Du Dir den Aufwand sicher nicht machen möchtest. Einfacher wäre es mit `functools.partial()` die Argumente die sich nicht ändern zu setzen, und damit wieder eine Funktion zu bekommen, die nur ein Argument entgegen nimmt.
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Danke, Ich musste reference_name als letztes Parameter angeben und ich glaube es funktioniert jetzt.

Code: Alles auswählen

import os
from multiprocessing import Pool
from pprint import pprint
import functools


def calc_p(fname, start_pos, end_pos, reference_name):
    print os.getpid()
    print "fname", fname
    print "reference_name", reference_name
    print "start_pos", start_pos
    print "end_pos", end_pos
    print
    return (reference_name, [os.getpid(), 'x1', 'x2'])


if __name__ == '__main__':
    pool = Pool()

    fname = "ex1.txt"
    references = ['Test1', 'Test2', 'Test3', 'Test4']

    run_test = functools.partial(calc_p, fname, 100, 120)
    result = dict(pool.imap_unordered(run_test, references))

    pprint(result)

Code: Alles auswählen

$ python read_files_different_parameters.py 
7145
fname ex1.txt
reference_name Test1
start_pos 100
end_pos 120

7146
fname ex1.txt
reference_name Test2
start_pos 100
end_pos 120

7145
fname ex1.txt
reference_name Test4
start_pos 100
end_pos 120

7147
fname ex1.txt
reference_name Test3
start_pos 100
end_pos 120

{'Test1': [7145, 'x1', 'x2'],
 'Test2': [7146, 'x1', 'x2'],
 'Test3': [7147, 'x1', 'x2'],
 'Test4': [7145, 'x1', 'x2']}
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Hallo,
wie kann man functools.partial mit AutoS(r_name, r_len).run() verbinden?

Code: Alles auswählen

class AutoS():
    def __init__(self, ref_name, ref_lengths):
        self.__ref_name = ref_name
        self.__ref_lengths = ref_lengths

    def run(self):
        print self.__ref_name, self.__ref_lengths
        return (self.__ref_name, [1,2,3,4])

if __name__ == '__main__':

    references = ['Test1', 'Test2', 'Test3', 'Test4']
    references_length = [1231, 1234, 2542, 3636]

    results = {}
    for (r_name, r_len) in zip(references, references_length):
        run_test = functools.partial(calc_p, fname, 100, 120)
        result = dict(pool.imap_unordered(run_test, references))
        results = dict(pool.imap_unordered(AutoS(r_name, r_len).run())
Vielen Dank im Vorraus.
BlackJack

@mit: Ich sehe jetzt nicht so ganz wozu man da `partial()` überhaupt benötigt, denn die `run()`-Methode hat doch gar keine Argumente!?

`imap_unordered()` möchte als erstes Argument etwas das man mit einem Argument *aufrufen* kann und hätte dann auch gerne ein weiteres Argument wo die Argumente für diese Aufrufe herkommen. Die `run()`-Methode gibt ein Tupel zurück — und die sind nicht aufrufbar. Die `run()`-Methode selbst könnte man auch nicht übergeben, denn die nimmt ja kein freies Argument entgehen. Und worauf sollte da etwas angewendet werden? Was willst Du denn erreichen!?
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Danke fuer die Antwort. Ich habe das problem ohne Klassen geloest, weil es einfacher war mit multiprocessing.

Ich moechte drei veschieden Funktionen aufrufen und habe dieses Problem wie folgt geloest:

Code: Alles auswählen

from multiprocessing import Pool

def test1(file_name, test_data):
	with open(file_name, 'w') as f:
		f.write(test_data)

def test2(file_name, test_data):
        with open(file_name, 'w') as f:
                f.write(test_data)

def test3(file_name, test_data):
        with open(file_name, 'w') as f:
                f.write(test_data)

if __name__ == '__main__':
	pool = Pool()
	pool.apply_async(test1,['test1.txt', 'test1a_data'])
	pool.apply_async(test2,['test2.txt', 'test2a_data'])
	pool.apply_async(test3,['test3.txt', 'test3a_data'])
	    
	pool.close()
	pool.join()
Ist dies eine gute Loesung?
Leonidas
Python-Forum Veteran
Beiträge: 16025
Registriert: Freitag 20. Juni 2003, 16:30
Kontaktdaten:

Sofern sie nicht nur im Namen verschieden sind ;)
My god, it's full of CARs! | Leonidasvoice vs (former) Modvoice
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Hallo,

Code: Alles auswählen

from multiprocessing import Pool

class Test():
    a = 0
    b = ''
    
def test(m):
    m.b = 'tttttt'

t = Test()
t.a = 2
m = {}
m[1] = t


#test(m[1])

pool = Pool()
pool.apply_async(test, [m[1]])
pool.close()
pool.join()

print m[1].a
print m[1].b
Mit test(m[1]) bekomme ich als Ausgabe:

Code: Alles auswählen

2
tttttt
aber mit pool.apply_async(test, [m[1]]) ich bekomme nur als Ausgabe: Wie koennte man den String mit apply_async zurueck geliefert bekommen?

Vielen Dank im Vorraus
BlackJack

@mit: Du müsstest das Ergebnis in `test()` halt auch zurück geben, damit die Daten von dem anderen Prozess wieder in den vom Aufrufer übertragen werden.
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Ich dachte dict "m[1]" wird als Referenz uebergeben und in test() modifiziert. Wie uebergebt man am Besten das Ergebnis?
BlackJack

@mit: Über Prozessgrenzen kann man keine Referenzen übergeben — Prozesse haben verschiedene Adressräume.

Die Rückgabe erfolgt in der Funktion selbst ganz normal mit ``return``. Der Aufrufer bekommt ein spezielles Objekt, weil der Aufruf ja bei der asynchron aufgerufenen Funktion *sofort* zurückkehrt, zu dem Zeitpunkt aber das Ergebnis ja noch nicht fertig berechnet ist. Näheres siehe Doku.
Antworten