threading sicher ?

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Antworten
waki
User
Beiträge: 133
Registriert: Dienstag 9. März 2010, 16:41

Guten Tag,
hab gerade diese Klasse implemetiert und mich würde interessieren ob das Teil 100% threading sicher ist? Kenn mich nicht ganz genau mit threading.Condition aus, aber so müsste es funktionieren, oder?

Code: Alles auswählen

import threading

class Queue(object):
    def __init__(self):
        self.queue = {}
        self.ergebnis = {}
        self.mutex = threading.Lock()
        self.not_empty = threading.Condition(self.mutex)
        self.not_full = threading.Condition(self.mutex)
        th = threading.Thread(target=self.worker)
        th.setDaemon(True)
        th.start()


    def put(self, function, args):

        self.not_full.acquire()

        for x in xrange(10000):
            if x not in self.queue.keys():
                id = x
                break
        self.queue[id] = (function, args)
        self.not_empty.notify()
        self.not_full.wait()
        erg = self.ergebnis[id]
        del self.ergebnis[id]
        self.not_full.release()
        return erg

    def worker(self):
        while True:
            self.not_empty.acquire()
            self.not_empty.wait()
            key = self.queue.keys()[0]
            item = self.queue[key]
            del self.queue[key]
            erg = item[0](*item[1])
            self.ergebnis[key] = erg
            self.not_full.notify()
            self.not_empty.release()

Gruß Waki
Zuletzt geändert von waki am Freitag 22. Juli 2011, 12:39, insgesamt 1-mal geändert.
EyDu
User
Beiträge: 4881
Registriert: Donnerstag 20. Juli 2006, 23:06
Wohnort: Berlin

Hallo.

Du solltest einen Blick auf`Queue` werfen, dann sparst du dir die ganze eigenen Locks. Außerdem solltest du dir Locks in Verbindung mit with-Statements anschauen. Das macht die ganze Sache ein wenig übersichtlicher und weniger Fehleranfällig.

Sebastian
Das Leben ist wie ein Tennisball.
sma
User
Beiträge: 3018
Registriert: Montag 19. November 2007, 19:57
Wohnort: Kiel

Ich würde sagen, die Queue-Implementierung (die überraschenderweise auf einem dict statt einer list arbeitet) ist unter der Annahme, dass Python-Code multi-threaded ausgeführt werden könnte, nicht thread-safe. Zum einen sieht

Code: Alles auswählen

            self.not_empty.acquire()
            self.not_empty.wait()
komisch aus. Zum anderen musst du in einer Schleife warten, denn du weißt ja nicht, welches Thread, der in einem put() auf das Ergebnis wartet, von dem notify() in worker() aufgeweckt wird. Tatsächlich ist hier IMHO ein Fehler in deinem Konzept, denn so wie du die Methoden lockst, funktioniert das gar nicht mit mehr als einem Thread. Und schließlich, was ist self.ergebnisse? Das definierst du gar nicht.

Stefan
waki
User
Beiträge: 133
Registriert: Dienstag 9. März 2010, 16:41

Ok, habe jetzt glaube ich eine andere Lösung die wirklich multithreading fähig ist, oder liege ich falsch?

Code: Alles auswählen

class file_access(object):
    def __init__(self):
        self.queue = Queue.Queue()
        self.ids = range(10000)
        self.ergebnis = {}
        self.mutex = threading.Lock()
        th = threading.Thread(target=self.worker)
        th.setDaemon(True)
        th.start()

    def put(self, function, args):

        with self.mutex:
            id = self.ids.pop(0)
            event = threading.Event()
        self.queue.put((id, event, function, args))
        event.wait()
        with self.mutex:
            erg = self.ergebnis[id]
            del self.ergebnis[id]
        return erg

    

    def worker(self):
        while True:
            if self.queue.empty():
                time.sleep(0.1)
                continue
            item = self.queue.get()
            erg = item[2](*item[3])
            self.ergebnis[item[0]] = erg
            with self.mutex:
                self.ids.append(item[0])
                item[1].set()
deets

Ich finde es schwer zu verstehen, was das soll - aber offensichtlicher Fehler ist der busy-loop fuer das queue.get in deinem worker. Nimm doch einfach direkt das queue.get, der wartet schon auf dich.

Und mir draengt sich der Eindruck auf, dass du da wild mit allem rumfuhrwerkst, was das threading-modul so hergibt. Wie waere es, wenn du mal erzaehlst, was du eigentlich erreichen willst - dann schauen wir mal, was man da so machen kann?
waki
User
Beiträge: 133
Registriert: Dienstag 9. März 2010, 16:41

Ich weiß nicht was du meinst, queue.get() ist doch außerhalb des locks?
Naja so wollte ich das eigentlich....

Code: Alles auswählen

Thread -> put(funktion, args) ->             wartet                                   -> bekommt den rückgabewert 
                      |                                                                           /\
                     \/                                                                            |
                  worker              -> führt übergebene funktion mit den args aus  ->       erg

Und so sollen dann die Funktionen nacheinader im worker ausgeführt werden. Der Grund für das ganze ist, die FUnktionen die übergeben werden verändern alle eine Datei. Damit sich das ganze nicht überschneidet aber ich trotzdem threading nutzen kann hab ich mir das hier überlegt....
BlackJack

@waki: `Queue.Queue` ist selbst schon thread-safe. Die Klasse benutzt schon Sperren, da brauchst Du nicht noch selber welche drumherum bauen. Starte einfach einen Thread mit einer Queue, der die Ergebnisse aus der Queue in eine Datei schreibt und lass die Worker-Threads ihre Ergebnisse in diese Queue stecken.
waki
User
Beiträge: 133
Registriert: Dienstag 9. März 2010, 16:41

Dachte ich mir auch erst.Das Problem ist nur dass auch aus der Datei gelesen wird. Somit muss der Thread auch den Rückgabewert bekommen.

@ Naja, lasen wir vom grundsatz das teil, es funktioniert genau so wie ich es will ^^ Mich interessiert nur ob das Teil wirklich multithreading fähig ist?
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

waki hat geschrieben:Dachte ich mir auch erst.Das Problem ist nur dass auch aus der Datei gelesen wird. Somit muss der Thread auch den Rückgabewert bekommen.
Wo ist da der Zusammenhang? Ah, hier ist er:
waki hat geschrieben:

Code: Alles auswählen

Thread -> put(funktion, args) ->             wartet                                   -> bekommt den rückgabewert 
                      |                                                                           /\
                     \/                                                                            |
                  worker              -> führt übergebene funktion mit den args aus  ->       erg
Und so sollen dann die Funktionen nacheinader im worker ausgeführt werden. Der Grund für das ganze ist, die FUnktionen die übergeben werden verändern alle eine Datei. Damit sich das ganze nicht überschneidet aber ich trotzdem threading nutzen kann hab ich mir das hier überlegt....
Was du möchtest ist also, dass aus dem aktuellen Thread heraus worker Threads gestartet werden sollen, die dann aber nicht parallel, sondern sequentiell abgearbeitet werden, und auf die der aktuelle Thread dann wartet. Somit läuft auch dieser nicht parallel (da er ja wartet). Du hast also einen rein sequenziellen Ablauf. Wozu meinst du, brauchst du dann Threads?
waki hat geschrieben:@ Naja, lasen wir vom grundsatz das teil, es funktioniert genau so wie ich es will ^^ Mich interessiert nur ob das Teil wirklich multithreading fähig ist?
Was Steve Oualline immer sagt: If we don't write code like this, then we don't have to worry about such questions.

Falls du wirklich parallel laufende Threads brauchst, nimm einfach Queue.Queue. Da muss man nichts mehr threadsafe machen. Wenn du einen sequenziellen Ablauf möchtest, nimm Funktionsaufrufe.
In specifications, Murphy's Law supersedes Ohm's.
deets

@waki

"Das Teil" ist mit einer Menge Cargo-Cult-Programming mit Requisiten aus dem threading-Modul gespickt. Das macht es aber noch nicht thread-safe im Sinne der Aufgabe. Und auch die ist noch voellig unklar. Wenn deine Funktionen alle auf derselben Datei arbeiten (und sich dabei in die Quere kommen koennen), dann bringt dir threading nichts.

Also nochmal: *was* genau soll eigentlich passieren? Kann mehr als ein worker gleichzeitig arbeiten, und was macht dann in der Zeit der main-thread? Warten? Wenn ja, dann kannst du dir threading schenken. Wenn nur ein worker arbeiten kann, und der main-thread was anderes macht, dann kannst du dir auch eine Queue schenken, und es reicht wahlweise ein join oder eine condition. Oder eine simple variable auf True setzen, dank des GIL.
waki
User
Beiträge: 133
Registriert: Dienstag 9. März 2010, 16:41

Ok nochmal:

Es gibt EINEN worker thread in einer endlos-schleife. Dieser soll nacheinander die verschiedenen Funktionen ausführen, die er von den Threads bekommt. Denn dann bearbeitet immer nur der worker-thread die datei und es gibt keine überschneidung. Da dieser jedoch auch aus der Datei lesen muss, muss der das gelesene auch dann Auftragsgeber-thread zurück geben. Der Thread ist auch nicht da um das ganze zu beschleunigen, sondern nur um zu ermöglichen, dass bei einem Programm mit multithreading einfach operationen an einer Datei ausführen kann. Und die worker funktion lass ich nicht im main-thread laufen, weil ich sonst das Teil nur bei multithreading nutzen könnte und nicht wenn das programm ohne threading arbeitet, also put aus dem main-thread aufgerufen werden soll....
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

@waki: Mach dir halt das Leben nicht so schwer:

Code: Alles auswählen

import Queue
import threading
import time

def work(queue):
    while True:
        func, rq = queue.get()
        rq.put(func())

worker_queue = Queue.Queue()

worker = threading.Thread(target=lambda:work(worker_queue))
worker.daemon = True
worker.start()

def enqueue(func):
    result_queue = Queue.Queue()
    worker_queue.put((func, result_queue))
    return result_queue.get()

# ...

def foo(a, b, c):
    time.sleep(1)
    return (a + b) * c

result = enqueue(lambda:foo(1, 2, 3))
print result
[EDIT]

Übrigens könntest du zusätzlich noch einen Decorator definieren:

Code: Alles auswählen

from functools import wraps

multithreaded = True # oder False, das kann man hier einstellen.

def file_modifier(func):
    if not multithreaded:
        return func
    @wraps(func)
    def caller(*args, **kwargs):
        return enqueue(lambda:func(*args, **kwargs))
    return caller

# ...

@file_modifier
def foo(a, b, c):
    time.sleep(1)
    return (a + b) * c

result = foo(1, 2, 3) # multithreaded == True  --> wird über den worker thread ausgeführt.
                      # multithreaded == False --> wird direkt ausgeführt.
print result
In specifications, Murphy's Law supersedes Ohm's.
waki
User
Beiträge: 133
Registriert: Dienstag 9. März 2010, 16:41

Ok, Danke! Funktioniert genauso wie ich es wollte :)
Antworten