multiprocessing und queue problem

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Hi,
Gibt es in multiprocessing eine Moeglichkeit CPU Cores zu reversieren und anschließend vielleicht von einer Queue mit Daten zu versorgen. ZB. wenn ein Core fertig mit bearbeiten ist und das Ergebnis zurück geliefert hat bekommt es neue Daten. Ich habe bemerkt, dass jedes mal ein neues Thread/Pool zu starten zu viel Zeit Anspruch nehmt wenn man viele Daten hat. Ich habe es wie folgt versucht zu loesen:

Code: Alles auswählen

from multiprocessing import Pool
from heapq import heapreplace
from operator import itemgetter
from pprint import pprint

def test_func(cluster):
    ref = dict(cluster)
    return ref

get_second = itemgetter(1)

def group_similar_sums(items, group_count):   #Loesung von BlackJack
    groups = [(0, list()) for _ in xrange(group_count)]
    for item in sorted(items, key=get_second, reverse=True):
        sum_, group = groups[0]
        group.append(item)
        heapreplace(groups, (sum_ + item[1], group))
    return map(get_second, groups)


if __name__ == '__main__':
    info = [('a', 3),('b', 2),('c', 3),('d', 2),('e', 2)]
    clusters = group_similar_sums(info, 3) 
    print clusters # [[('c', 3)], [('a', 3), ('e', 2)], [('b', 2), ('d', 2)]]

    pool = Pool(processes=3)
    s = dict(pool.imap_unordered(test_func, clusters))
    pool.close()

    print s
Aber bekomme folgende Fehlermeldung:

Code: Alles auswählen

 python b.py
[[('c', 3)], [('a', 3), ('e', 2)], [('b', 2), ('d', 2)]]
Traceback (most recent call last):
  File "b.py", line 29, in <module>
    s = dict(pool.imap_unordered(test_func, clusters))
ValueError: dictionary update sequence element #0 has length 1; 2 is required
Wie koennte man das Problem beheben oder gibt eine bessere Methode?

Vielen Dank im voraus
BlackJack

@mit: Was soll denn hier das Ergebnis sein? Verstehst Du die Fehlermeldung? `dict()` erwartet wenn es mit einem iterierbaren Objekt aufgerufen wird, zweielementige Objekte, also Schlüssel/Wert-Paare. Das sind `dict`-Exemplare aber nicht.
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Vorher hat jeder Thread z.b. ('a', 3) zurueck geliefert, aber es mussten zu viele Threads geöffnet und geschlossen werden. Deshalb hat das ganze zu lange gedauert. Mit den cluster ist es viel schneller, aber ich bekomme die Ergebnisse nicht in den dict kopiert und ich weiss nicht wie man es beheben koennte.
deets

@mit

Dann schau dir doch mal *genau* an, was du da eigentlich zurueckbekommst. Ist das eine Liste von Tupeln?
BlackJack

@mit: Dann musst Du entweder tatsächlich ein Wörterbuch mit den einzelnen Ergebnissen, die ja wieder Wörterbücher sind, füttern. Dafür gibt es eine Methode. Oder Du musst dafür sorgen, dass die Ergebnisse als Strom von Schlüssel/Wert-Paaren in den `dict()`-Aufruf kommt. Wenn die einzelnen Prozesse eine Liste von solchen Paaren liefern, ginge das zum Beispiel mit `itertools.chain.from_iterable()`
mit
User
Beiträge: 285
Registriert: Dienstag 16. September 2008, 10:00

Danke, mit `itertools.chain.from_iterable()` hat es funktioniert.

Ich habe hier http://www.doughellmann.com/PyMOTW/mult ... ation.html eine Queue Loesung gefunden und an das Problem angepasst (siehe unten):

Code: Alles auswählen

from multiprocessing import Process, JoinableQueue, Queue
from pprint import pprint

class TestClass(Process):
    def __init__(self, task_queue, result_queue):
        Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task
            self.task_queue.task_done()
            self.result_queue.put(answer)
            
        return 

if __name__ == '__main__':
    info = [('a', 3),('b', 2),('c', 3),('d', 2),('e', 2)]

    tasks = JoinableQueue()
    results = Queue()
    
    # Start consumers
    num_cores = 2 #multiprocessing.cpu_count() 
    print 'Creating %d consumers' % num_cores
    threads = [ TestClass(tasks, results)
                  for i in range(num_cores) ]
    for i, w in enumerate(threads):
        w.start()
        print('Thread ' + str(i+1) + ' has started!')
    
    # Enqueue jobs
    for i in info:
        print i
        tasks.put(i)
    
    # Add a poison pill for each consumer
    for i in range(num_cores):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()
    
    num_jobs = len(info)
    # Start printing results
    
    results_total = dict((results.get() for i in range(num_jobs)))
    print results_total
Jedoch benutzt der obere code mehr RAM ist aber schneller.

Ist es moeglich Threads mit einer anderen Loesung zu beenden anstatt mit 'if next_task is None:'?
Warum verwendet man in den orginalen code JoinableQueue() und Queue(), anstatt zb. nur Queue()?
Gibt es andere Queues die weniger RAM verwenden und schnell sind?
BlackJack

@mit: Threads kann man nicht wirklich anders beenden wenn man eine Queue verwendet. Zumindest nicht einfacher.

Auf der `JoinableQueue` werden `task_done()` und `join()` aufgerufen. Du möchtest ja warten bis alle Aufgaben abgearbeitet sind bevor Du das Ergebnis auswertest.

Die Queues verwenden keinen Speicher. Also natürlich nicht *gar* keinen, aber es sollte nichts sein was zum Beispiel über eine Liste mit dem gleichen Inhalt deutlich hinaus geht. Wenn Du also ein Speicherproblem hast, dann sind das eher die Daten *in* der Queue als das bisschen Speicher was so eine Queue zusätzlich benötigt.
Antworten