multiprocessing - communication between processes
Verfasst: Montag 20. April 2015, 23:03
Hallo,
ich habe ein Problem, welches ich selber nicht gelöst bekomme und bitte um Eure Hilfe. Vielen Dank.
Ich benutze Windows 7 und Python 2.7.9 [MSC v.1500 32 bit (Intel)] on win32.
Das nachstehende Programm funktioniert ohne multiprocessing.
Mit multiprocessing funktioniert die Variante qDone.put((task[0], task[1])) in der Funktion worker() Zeile 23.
Die Variante qDone.put((task[0], urllib.urlopen(task[1]).read())), Zeile 22 funktioniert mit mein Programm nicht mehr.
urllib.urlopen(task[1]).read() selber funktioniert, aber mein Programm beendet sich dann nicht mehr korrekt, da sich dieses,
soweit ich das feststellen konnte, in der Funktion main() Zeile 59-60
for p in worker_processes:
.... p.join()
beim zweiten Durchlauf der Schleife ins Nirwana verabschiedet.
Ersetze ich die for-Schleife komplett durch ein simples time.sleep(10) funktioniert alles wieder, welches jedoch keine
wirklich brauchbare Lösung ist.
Ich habe die while-schleife in der Funktion worker() auch schon durch eine ... for url in iter(qTask.get(), 'STOP') ...
Schleife ersetzt. In der Funktion main() habe ich alle Queues, qTask.put('STOP'), ..... entsprechend abgeändert.
Desweiteren habe ich anstelle einer URL auch schon den entsprechenden HTML-Code direkt an die Funktion worker() übergeben.
..... sites = [('Name1', 'https://docs.python.org/2.7/py-modindex.html'), ..........
ersetzt durch
...... sites = [('Name1', '''<!DOCTYPE html PUBLIC ...........'''),
funktioniert alles. Nur eben nicht mit qDone.put((task[0], urllib.urlopen(task[1]).read())).
Eine Überlegung hätte ich noch.
Vielleicht liegt es an den Daten die mit urlib.urlopen().read() an die Queue übergeben werden.
HTML-Code beinhaltet Sonderzeichen und "".
Auf der anderen Seite funktioniert
.. myListe.append(( task[0], urllib.urlopen(task[1]).read()))
problemlos.
Ich bin mit meinen Möglichkeiten am Ende und habe auch keine weiteren Lösungsansätze mehr, was mache ich falsch ?
ich habe ein Problem, welches ich selber nicht gelöst bekomme und bitte um Eure Hilfe. Vielen Dank.
Ich benutze Windows 7 und Python 2.7.9 [MSC v.1500 32 bit (Intel)] on win32.
Das nachstehende Programm funktioniert ohne multiprocessing.
Mit multiprocessing funktioniert die Variante qDone.put((task[0], task[1])) in der Funktion worker() Zeile 23.
Die Variante qDone.put((task[0], urllib.urlopen(task[1]).read())), Zeile 22 funktioniert mit mein Programm nicht mehr.
urllib.urlopen(task[1]).read() selber funktioniert, aber mein Programm beendet sich dann nicht mehr korrekt, da sich dieses,
soweit ich das feststellen konnte, in der Funktion main() Zeile 59-60
for p in worker_processes:
.... p.join()
beim zweiten Durchlauf der Schleife ins Nirwana verabschiedet.
Ersetze ich die for-Schleife komplett durch ein simples time.sleep(10) funktioniert alles wieder, welches jedoch keine
wirklich brauchbare Lösung ist.
Ich habe die while-schleife in der Funktion worker() auch schon durch eine ... for url in iter(qTask.get(), 'STOP') ...
Schleife ersetzt. In der Funktion main() habe ich alle Queues, qTask.put('STOP'), ..... entsprechend abgeändert.
Desweiteren habe ich anstelle einer URL auch schon den entsprechenden HTML-Code direkt an die Funktion worker() übergeben.
..... sites = [('Name1', 'https://docs.python.org/2.7/py-modindex.html'), ..........
ersetzt durch
...... sites = [('Name1', '''<!DOCTYPE html PUBLIC ...........'''),
funktioniert alles. Nur eben nicht mit qDone.put((task[0], urllib.urlopen(task[1]).read())).
Eine Überlegung hätte ich noch.
Vielleicht liegt es an den Daten die mit urlib.urlopen().read() an die Queue übergeben werden.
HTML-Code beinhaltet Sonderzeichen und "".
Auf der anderen Seite funktioniert
.. myListe.append(( task[0], urllib.urlopen(task[1]).read()))
problemlos.
Ich bin mit meinen Möglichkeiten am Ende und habe auch keine weiteren Lösungsansätze mehr, was mache ich falsch ?
Code: Alles auswählen
#!/usr/bin/env python
# -*- coding: utf8 -*-
import time
import urllib
import socket
import multiprocessing as mp
#socket.setdefaulttimeout(15)
def worker(qTask, qDone, qError):
while True:
task = qTask.get()
if task is None:
print '=> The process <{}> ends now.'.format(mp.current_process().name)
break
print '=> {} : GET : {}'.format(mp.current_process().name, task[1])
try :
# qDone.put((task[0], urllib.urlopen(task[1]).read()))
qDone.put((task[0], task[1]))
except :
print '=> {} : Konnte {} nicht geladen werden.'.format(mp.current_process().name, task[1])
qError.put((task[0], task[1]))
def main():
pmStart = time.clock()
sites = [('Name1', 'https://docs.python.org/2.7/py-modindex.html'),
('Name2', 'https://docs.python.org/2.7/genindex-A.html'),
('Name3', 'https://docs.python.org/2.7/genindex-B.html')]
# Establish communication queues
qTask = mp.Queue()
qDone = mp.Queue()
qError = mp.Queue()
# Submit tasks
for url in sites:
qTask.put(url)
# switch : 0 = ohne mp, 1 = use mp
m = 1
if m == 1:
# The processes
workers = 2
worker_processes = []
for w in xrange(workers):
p = mp.Process(target=worker, args=(qTask, qDone, qError))
p.start()
worker_processes.append(p)
qTask.put(None)
# for p in worker_processes:
# p.join()
time.sleep(10)
else:
qTask.put(None)
worker(qTask, qDone, qError)
qDone.put(None)
qError.put(None)
while True:
done = qDone.get()
if done is None:
break
print '=>', done[0], done[1][:15]
print "\n=> Finished ....."
print "=> Active children :", mp.active_children()
print '=> Runtime : {0:.4f} Sec.\n'.format(time.clock() - pmStart)
if __name__ == '__main__':
mp.freeze_support()
main()