Seite 1 von 1
multiprocessing Problem
Verfasst: Freitag 18. März 2011, 14:32
von mit
Hi,
Ich moechte aus 4 verschiedenen Dateien Werte auslesen und diese in ein Dict abspeichern wobei Deteiname soll der key sein. Jede Datei soll auf verschieden CPUs laufen.
Code: Alles auswählen
import sys
import pprint
from multiprocessing import Process
def f(file_name):
freq = 0
for line in open(file_name):
if not line.strip():
break
elif not line.startswith('='):
tmp = int(line.rstrip().split('\t')[0])
freq += tmp
return freq
if __name__ == '__main__':
freq = {}
print sys.argv[1]
for file_name in sys.argv[1].replace(' ', '').split(','):
if not file_name in freq:
freq[file_name] = 0
else:
print file_name + "is not unique!"
sys.exit(1)
p = Process(target=f, args=(file_name,))
p.start()
p.join()
#freq[file_name] = f(file_name)
pprint.pprint(freq)
Leider weiss ich nicht wie ich die 4 Dateien auf 4 vier verschieden CPUs ausfuehren kann.
Vielen Dank im Voraus.
Re: multiprocessing Problem
Verfasst: Freitag 18. März 2011, 14:39
von BlackJack
@mit: Dafür würde ich `multiprocessing.Pool` verwenden.
Re: multiprocessing Problem
Verfasst: Freitag 18. März 2011, 15:38
von mit
Danke fuer die Antwort. Habe den Code umgeschrieben und bekomme folgende Fehlermeldung:
Code: Alles auswählen
tests/Rep1.log
Traceback (most recent call last):
File "find_biggest_no.py", line 30, in <module>
freq[file_name] = p.map(f, file_name)
File "/home/mitlox/apps/python/lib/python2.7/multiprocessing/pool.py", line 199, in map
return self.map_async(func, iterable, chunksize).get()
File "/home/mitlox/apps/python/lib/python2.7/multiprocessing/pool.py", line 491, in get
raise self._value
IOError: [Errno 2] No such file or directory: 't'
Ich habe keine Datei mit dem Namen t. Hier ist der aktuelle Code:
Code: Alles auswählen
import sys
import pprint
from multiprocessing import Pool
def f(file_name):
freq = 0
for line in open(file_name):
if not line.strip():
break
elif not line.startswith('='):
tmp = int(line.rstrip().split('\t')[0])
freq += tmp
return freq
if __name__ == '__main__':
freq = {}
p = Pool(4)
for file_name in sys.argv[1].replace(' ', '').split(','):
if not file_name in freq:
freq[file_name] = 0
else:
print file_name + "is not unique!"
sys.exit(1)
try:
print file_name
freq[file_name] = p.map(f, file_name)
except KeyboardInterrupt:
print 'parent received control-c'
p.terminate()
p.close()
p.join()
pprint.pprint(freq)
Wie koennte man es zum laufen bringen?
Re: multiprocessing Problem
Verfasst: Freitag 18. März 2011, 15:58
von BlackJack
@mit: `map()` wendet eine Funktion auf alle Elemente des zweiten Arguments an. Wenn Du da nur *einen* Dateinamen angibst, dann wird die Funktion auf jeden Buchstaben angewendet. Nicht so ganz was Du haben willst.
Da die Reihenfolge egal ist, kann man auch `pool.imap_unordered()` verwenden, wenn die Funktion auch den Dateinamen mit zurück gibt. Ungetestet:
Code: Alles auswählen
import sys
from itertools import takewhile
from multiprocessing import Pool
from pprint import pprint
def process_file(filename):
with open(filename) as lines:
return (
filename,
sum(
int(s.split('\t', 1)[0])
for s in takewhile(bool, (a.strip() for a in lines))
if not s.startswith('=')
)
)
def main():
pool = Pool()
file_names = sys.argv[1].replace(' ', '').split(',')
if len(file_names) != len(set(file_names)):
print 'file names are not uniq'
sys.exit(1)
result = dict(pool.imap_unordered(process_file, file_names))
pprint(result)
if __name__ == '__main__':
main()
Müssen es zwingend 4 Prozesse sein? Ich würde das ja dem `Pool` überlassen zu schauen wie viele Prozessoren zur Verfügung stehen.
Re: multiprocessing Problem
Verfasst: Sonntag 20. März 2011, 11:40
von mit
Danke fur deine Losung. Ich habe die process_file Funktion wie folgt geändert:
Code: Alles auswählen
def process_file(filename):
print("Processing " + filename + " on " + str(os.getppid()))
with open(filename) as lines:
return (
filename,
sum(
int(s.split('\t', 1)[0])
for s in takewhile(bool, (a.strip() for a in lines))
if not s.startswith('=')
)
)
und bekomme diese Ausgabe:
Code: Alles auswählen
Processing tests/Rep1.log on 4122
Processing tests/Rep2.log on 4122
Processing tests/Rep3.log on 4122
Processing tests/Rep4.log on 4122
Ich habe es mit Pool() und Pool(4) versucht, aber es hat es nichts geändert. Warum ist getppid() fuer alle Dateien der selbe oder läuft der Code nicht parallel?
Re: multiprocessing Problem
Verfasst: Sonntag 20. März 2011, 11:50
von BlackJack
@mit: Da ist ein 'p' zu viel in `os.getppid()`. Das gibt die *parent* process id zurück, und der Elternprozess ist ja für alle der gleiche.

Re: multiprocessing Problem
Verfasst: Dienstag 12. April 2011, 05:16
von mit
Danke es funktioniert.
Habe eine neue funktion mit mehreren parameters und deshalb habe ich apply_async versucht.
Code: Alles auswählen
import os
from multiprocessing import Pool
from pprint import pprint
def calc_p(fname, reference_name, start_pos, end_pos):
return (reference_name, [os.getpid(), 'x1', 'x2'])
if __name__ == '__main__':
pool = Pool()
fname = "ex1.txt"
references = ['Test1', 'Test2', 'Test3', 'Test4']
results = dict([pool.apply_async(calc_p, [fname, reference, 100, 120]) for reference in references])
pool.close()
pool.join()
for r in results:
pprint(r.get())
Und bekomme folgende Fehlemeldung:
Code: Alles auswählen
$ python read_files_different_parameters.py
Traceback (most recent call last):
File "read_files_different_parameters.py", line 16, in <module>
results = dict([pool.apply_async(calc_p, [fname, reference, 100, 120]) for reference in references])
TypeError: cannot convert dictionary update sequence element #0 to a sequence
Wie ist es moeglich ein dict zu erzeugen?
Vielen Dank im Voraus.
Re: multiprocessing Problem
Verfasst: Dienstag 12. April 2011, 08:40
von BlackJack
@mit: Ich würde mal sagen gar nicht, beziehungsweise dass Du Dir den Aufwand sicher nicht machen möchtest. Einfacher wäre es mit `functools.partial()` die Argumente die sich nicht ändern zu setzen, und damit wieder eine Funktion zu bekommen, die nur ein Argument entgegen nimmt.
Re: multiprocessing Problem
Verfasst: Dienstag 12. April 2011, 12:18
von mit
Danke, Ich musste reference_name als letztes Parameter angeben und ich glaube es funktioniert jetzt.
Code: Alles auswählen
import os
from multiprocessing import Pool
from pprint import pprint
import functools
def calc_p(fname, start_pos, end_pos, reference_name):
print os.getpid()
print "fname", fname
print "reference_name", reference_name
print "start_pos", start_pos
print "end_pos", end_pos
print
return (reference_name, [os.getpid(), 'x1', 'x2'])
if __name__ == '__main__':
pool = Pool()
fname = "ex1.txt"
references = ['Test1', 'Test2', 'Test3', 'Test4']
run_test = functools.partial(calc_p, fname, 100, 120)
result = dict(pool.imap_unordered(run_test, references))
pprint(result)
Code: Alles auswählen
$ python read_files_different_parameters.py
7145
fname ex1.txt
reference_name Test1
start_pos 100
end_pos 120
7146
fname ex1.txt
reference_name Test2
start_pos 100
end_pos 120
7145
fname ex1.txt
reference_name Test4
start_pos 100
end_pos 120
7147
fname ex1.txt
reference_name Test3
start_pos 100
end_pos 120
{'Test1': [7145, 'x1', 'x2'],
'Test2': [7146, 'x1', 'x2'],
'Test3': [7147, 'x1', 'x2'],
'Test4': [7145, 'x1', 'x2']}
Re: multiprocessing Problem
Verfasst: Samstag 14. Mai 2011, 12:55
von mit
Hallo,
wie kann man functools.partial mit AutoS(r_name, r_len).run() verbinden?
Code: Alles auswählen
class AutoS():
def __init__(self, ref_name, ref_lengths):
self.__ref_name = ref_name
self.__ref_lengths = ref_lengths
def run(self):
print self.__ref_name, self.__ref_lengths
return (self.__ref_name, [1,2,3,4])
if __name__ == '__main__':
references = ['Test1', 'Test2', 'Test3', 'Test4']
references_length = [1231, 1234, 2542, 3636]
results = {}
for (r_name, r_len) in zip(references, references_length):
run_test = functools.partial(calc_p, fname, 100, 120)
result = dict(pool.imap_unordered(run_test, references))
results = dict(pool.imap_unordered(AutoS(r_name, r_len).run())
Vielen Dank im Vorraus.
Re: multiprocessing Problem
Verfasst: Samstag 14. Mai 2011, 13:04
von BlackJack
@mit: Ich sehe jetzt nicht so ganz wozu man da `partial()` überhaupt benötigt, denn die `run()`-Methode hat doch gar keine Argumente!?
`imap_unordered()` möchte als erstes Argument etwas das man mit einem Argument *aufrufen* kann und hätte dann auch gerne ein weiteres Argument wo die Argumente für diese Aufrufe herkommen. Die `run()`-Methode gibt ein Tupel zurück — und die sind nicht aufrufbar. Die `run()`-Methode selbst könnte man auch nicht übergeben, denn die nimmt ja kein freies Argument entgehen. Und worauf sollte da etwas angewendet werden? Was willst Du denn erreichen!?
Re: multiprocessing Problem
Verfasst: Montag 23. Mai 2011, 13:32
von mit
Danke fuer die Antwort. Ich habe das problem ohne Klassen geloest, weil es einfacher war mit multiprocessing.
Ich moechte drei veschieden Funktionen aufrufen und habe dieses Problem wie folgt geloest:
Code: Alles auswählen
from multiprocessing import Pool
def test1(file_name, test_data):
with open(file_name, 'w') as f:
f.write(test_data)
def test2(file_name, test_data):
with open(file_name, 'w') as f:
f.write(test_data)
def test3(file_name, test_data):
with open(file_name, 'w') as f:
f.write(test_data)
if __name__ == '__main__':
pool = Pool()
pool.apply_async(test1,['test1.txt', 'test1a_data'])
pool.apply_async(test2,['test2.txt', 'test2a_data'])
pool.apply_async(test3,['test3.txt', 'test3a_data'])
pool.close()
pool.join()
Ist dies eine gute Loesung?
Re: multiprocessing Problem
Verfasst: Montag 23. Mai 2011, 13:37
von Leonidas
Sofern sie nicht nur im Namen verschieden sind

Re: multiprocessing Problem
Verfasst: Samstag 18. Juni 2011, 06:23
von mit
Hallo,
Code: Alles auswählen
from multiprocessing import Pool
class Test():
a = 0
b = ''
def test(m):
m.b = 'tttttt'
t = Test()
t.a = 2
m = {}
m[1] = t
#test(m[1])
pool = Pool()
pool.apply_async(test, [m[1]])
pool.close()
pool.join()
print m[1].a
print m[1].b
Mit test(m[1]) bekomme ich als Ausgabe:
aber mit pool.apply_async(test, [m[1]]) ich bekomme nur als Ausgabe:
Wie koennte man den String mit apply_async zurueck geliefert bekommen?
Vielen Dank im Vorraus
Re: multiprocessing Problem
Verfasst: Samstag 18. Juni 2011, 08:54
von BlackJack
@mit: Du müsstest das Ergebnis in `test()` halt auch zurück geben, damit die Daten von dem anderen Prozess wieder in den vom Aufrufer übertragen werden.
Re: multiprocessing Problem
Verfasst: Samstag 18. Juni 2011, 13:12
von mit
Ich dachte dict "m[1]" wird als Referenz uebergeben und in test() modifiziert. Wie uebergebt man am Besten das Ergebnis?
Re: multiprocessing Problem
Verfasst: Samstag 18. Juni 2011, 15:01
von BlackJack
@mit: Über Prozessgrenzen kann man keine Referenzen übergeben — Prozesse haben verschiedene Adressräume.
Die Rückgabe erfolgt in der Funktion selbst ganz normal mit ``return``. Der Aufrufer bekommt ein spezielles Objekt, weil der Aufruf ja bei der asynchron aufgerufenen Funktion *sofort* zurückkehrt, zu dem Zeitpunkt aber das Ergebnis ja noch nicht fertig berechnet ist. Näheres siehe Doku.