multiprocessing - communication between processes

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
Daikoku
User
Beiträge: 66
Registriert: Montag 20. April 2015, 21:14

Hallo,

ich habe ein Problem, welches ich selber nicht gelöst bekomme und bitte um Eure Hilfe. Vielen Dank.

Ich benutze Windows 7 und Python 2.7.9 [MSC v.1500 32 bit (Intel)] on win32.

Das nachstehende Programm funktioniert ohne multiprocessing.

Mit multiprocessing funktioniert die Variante qDone.put((task[0], task[1])) in der Funktion worker() Zeile 23.

Die Variante qDone.put((task[0], urllib.urlopen(task[1]).read())), Zeile 22 funktioniert mit mein Programm nicht mehr.
urllib.urlopen(task[1]).read() selber funktioniert, aber mein Programm beendet sich dann nicht mehr korrekt, da sich dieses,
soweit ich das feststellen konnte, in der Funktion main() Zeile 59-60

for p in worker_processes:
.... p.join()

beim zweiten Durchlauf der Schleife ins Nirwana verabschiedet.

Ersetze ich die for-Schleife komplett durch ein simples time.sleep(10) funktioniert alles wieder, welches jedoch keine
wirklich brauchbare Lösung ist.

Ich habe die while-schleife in der Funktion worker() auch schon durch eine ... for url in iter(qTask.get(), 'STOP') ...
Schleife ersetzt. In der Funktion main() habe ich alle Queues, qTask.put('STOP'), ..... entsprechend abgeändert.

Desweiteren habe ich anstelle einer URL auch schon den entsprechenden HTML-Code direkt an die Funktion worker() übergeben.
..... sites = [('Name1', 'https://docs.python.org/2.7/py-modindex.html'), ..........
ersetzt durch
...... sites = [('Name1', '''<!DOCTYPE html PUBLIC ...........'''),
funktioniert alles. Nur eben nicht mit qDone.put((task[0], urllib.urlopen(task[1]).read())).

Eine Überlegung hätte ich noch.
Vielleicht liegt es an den Daten die mit urlib.urlopen().read() an die Queue übergeben werden.
HTML-Code beinhaltet Sonderzeichen und "".
Auf der anderen Seite funktioniert
.. myListe.append(( task[0], urllib.urlopen(task[1]).read()))
problemlos.

Ich bin mit meinen Möglichkeiten am Ende und habe auch keine weiteren Lösungsansätze mehr, was mache ich falsch ?

Code: Alles auswählen

#!/usr/bin/env python
# -*- coding: utf8 -*-
import time
import urllib
import socket
import multiprocessing as mp

#socket.setdefaulttimeout(15)

def worker(qTask, qDone, qError):

    while True:
        task = qTask.get()

        if task is None:
            print '=> The process <{}> ends now.'.format(mp.current_process().name)
            break

        print '=> {} : GET : {}'.format(mp.current_process().name, task[1])

        try :
#            qDone.put((task[0], urllib.urlopen(task[1]).read()))
            qDone.put((task[0], task[1]))
        except :
            print '=> {} : Konnte {} nicht geladen werden.'.format(mp.current_process().name, task[1])
            qError.put((task[0], task[1]))


def main():
    pmStart = time.clock()

    sites = [('Name1', 'https://docs.python.org/2.7/py-modindex.html'),
             ('Name2', 'https://docs.python.org/2.7/genindex-A.html'),
             ('Name3', 'https://docs.python.org/2.7/genindex-B.html')]

    # Establish communication queues
    qTask = mp.Queue()
    qDone = mp.Queue()
    qError = mp.Queue()

    # Submit tasks
    for url in sites:
        qTask.put(url)

    # switch : 0 = ohne mp, 1 = use mp
    m = 1

    if m == 1:
        # The processes
        workers = 2
        worker_processes = []

        for w in xrange(workers):
            p = mp.Process(target=worker, args=(qTask, qDone, qError))
            p.start()
            worker_processes.append(p)
            qTask.put(None)

#        for p in worker_processes:
#            p.join()
        time.sleep(10)
    else:
        qTask.put(None)
        worker(qTask, qDone, qError)

    qDone.put(None)
    qError.put(None)

    while True:
        done = qDone.get()

        if done is None:
            break

        print '=>', done[0], done[1][:15]

    print "\n=> Finished ....."
    print "=> Active children :", mp.active_children()
    print '=> Runtime         : {0:.4f} Sec.\n'.format(time.clock() - pmStart)

if __name__ == '__main__':
    mp.freeze_support()
    main()
BlackJack

@Daikoku: Wenn ich mal raten müsste liegt das daran das die Verbindung zwischen den Prozessen durch die Daten blockiert sind, sprich die Subprozesse warten darauf das ihnen die Daten vom Hauptprozess abgenommen werden um sich dann beenden zu können während der Hauptprozess darauf wartet das sich die Subprozesse beenden bevor er die Daten abnimmt.
Antworten