Tornado: read/write mit locks sichern

Django, Flask, Bottle, WSGI, CGI…
Antworten
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

Ich habe einen Tornado-Webservice, der eine Vielzahl von Requests pro Sekunde entgegennimmt. Der MainHandler nimmt dabei die Anfragen entgegen und übergibt sie an die Klasse `Store`, welche die Daten aus dem Request lokal auf Platte schreibt. Der Filename trägt dabei die ID, die dem Request übergeben wird. So weit, so gut. Was ich noch nicht ganz verstanden habe bzw. sicherstellen möchte ist, dass die Methode Store.write() nicht blockiert, bzw. die worker asynchron schreiben können. Das scheint mit dem ThreadPoolExecutor soweit zu laufen, jedoch verstehe ich nicht, was genau passiert, wenn z.B. 10 Anfragen gleichzeitig auf ein und das selbe File zugreifen wollen.

Die Applikation ist dabei nie auf die Nase gefallen, ich habe testweise 10 Files mit gleicher File-ID mit á 300 MB gleichzeitig an den Webservice geschickt, alle Files werden dabei ohne Probleme dem File angehängt. Daraus ergibt sich für mich die Frage, was passiert hier? Nach meinem Verständnis müssten doch alle Worker gleichzeitig versuchen, in das File zu schreiben, was aber eigentlich einen Fehler schmeißen sollte, da das File bereits geöffnet wird? Oder stimmt Tornado hier bereits den Zugriff ab, oder aber schreiben alle Worker einfach wirr in das File ?

Zugegeben, Tornado macht regen Gebrauch von Asyncio unter der Haube, wovon ich wenig Praxiserfahrung habe.

Der Code (um o.g. Problem mit dem parallelen Filezugriff zu umgehen habe ich ein lock implementiert. Kannte das bisher nur von threading.Lock() um Zugriffe atomar zu machen):

Code: Alles auswählen

from concurrent import futures
from tornado import web, ioloop, gen, concurrent, locks


class Store:

    def __init__(self):
        self.executor = futures.ThreadPoolExecutor(max_workers=10)

    @concurrent.run_on_executor(executor='executor')
    def write(self, id, content):
        with open(id, 'ab') as fd:
            fd.write(content)


class MainHandler(web.RequestHandler):

    def initialize(self):
        self.store = Store()
        self.lock = locks.Lock()

    @gen.coroutine
    def put(self, id):
        with (yield self.lock.acquire()):
            yield self.store.write(id, self.request.body)


def start():
    return web.Application([
        (r"/(.*)", MainHandler),
    ])


if __name__ == '__main__':
    app = start()
    app.listen(8888, max_buffer_size=(300*1024*1024))
    ioloop.IOLoop.instance().start()
When we say computer, we mean the electronic computer.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Tornado hat mit concurrent ja nichts zu tun. Wenn das echte Threads sind, dann greifen die auch theoretisch parallel auf die Datei zu. ABER es gibt ja noch das GIL - womit wiederum eine serialisierung der Zugriffe stattfindet. Allerdings nur beschraenkt: Python erlaubt eine Reihe von byte-codes in einem Thread bevor dem das GIL wieder entzogen wird. Je nachdem wie deine Anwendung aussieht, reicht das, um alle Daten zu schreiben. Es ist aber NICHT garantiert! Deine Anwendung funktioniert also nur zufaellig, und das mag zB unter bestimmten Lastszenarien auch ganz ploetzlich aufhoeren.

Besser als threading waere hier die Schreibvorgaenge einfach mit dem IOLoop zu registrieren. Sie duerfen zu dem Zweck natuerlich nicht all zu gross sein, sondern muessen ihrerseits wieder als Co-Routinen daherkommen. Oder du nimmst ein GBL - grosses, boeses, lock, rund um deinen Schreibvorgang. Der zwingt die Threads dann in Reih und Glied.
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Da es ein grosses boeses write gibt, blockiert das ganze System sowieso.

Wie immer würde ich mir hier nichts neues erfinden, sondern einen Hintergrundthread pro ID starten, der über eine Queue gefüttert wird, und alle Daten schreibt. Und natürlich keine Threads, sondern was leichtgewichtiges via asyncio.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

@Sirius3: wieso blockiert das ganze System? Da er concurrent verwendet, ist die blockierende write-Operation doch in einem anderen Thread, womit die IO-Pause auch das GIL freigibt, und anderer Code laufen kann? Wenn die Daten gross genug waeren, wuerde das auch zu verwuerfelten Daten fuehren, denn so weit ich weiss, sind verschiedene writes auf eine Datei nicht serialisiert.
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

Genügt es denn, in der put-Methode ein lock.acquire() zu setzen?

Code: Alles auswählen

 @gen.coroutine
    def put(self, id):
        with (yield self.lock.acquire()):
            yield self.store.write(id, self.request.body)
Brainfuck, werden dadurch alle Coroutinen blockiert, oder nur eine, und damit dann verhindert dass mehrere Coroutinen in ein und das selbe File schreiben?
When we say computer, we mean the electronic computer.
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

Wie schon geschrieben, benutze Queues, das ist die sauberste Art, konkurrierende Operationen zu serialisieren.
Benutzeravatar
kbr
User
Beiträge: 1487
Registriert: Mittwoch 15. Oktober 2008, 09:27

@sls: Deine Coroutine wartet brav bis sie einen Lock erhält und blockiert dabei nicht. Nach setzen des Locks wird die write-Methode aufgerufen und in einem separaten Thread ausgeführt. Der Lock aber wird im Scope der Coroutine gehalten und daher sofort wieder freigegeben. D.h. in Deinem Programm blockiert nichts und die write-Vorgänge können erratisch sein.

Besser du schiebst alles, was zu schreiben ist, in eine Queue und baust es in einem anderen Thread zum schreiben wieder richtig zusammen. Dann brauchst Du im einfachsten Fall nur einen zusätzlichen Thread sowie kein Lock.
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

Ich habe jetzt mal versucht einen Queue-Mechanismus zu implementieren, der Code schaut jetzt so aus:

Code: Alles auswählen

from concurrent import futures
from tornado import web, ioloop, gen, concurrent
from tornado.queues import Queue


class Store:

    def __init__(self):
        self.executor = futures.ThreadPoolExecutor(max_workers=10)
        self.queue = Queue()

    @gen.coroutine
    def watch_queue(self):
        print(self.queue.qsize())
        while True:
            item = yield self.queue.get()
            self.write(item[0], item[1])

    @concurrent.run_on_executor(executor='executor')
    def write(self, id, content):
        print("Message with id %s stored" % id)
        with open(id, 'ab') as fd:
            fd.write(content)


class MainHandler(web.RequestHandler):

    def initialize(self, store):
        self.store = store

    @gen.coroutine
    def put(self, id):
        item = (id, self.request.body)
        yield self.store.queue.put(item)


def start(store):
    return web.Application([
        (r"/(.*)", MainHandler,
         {"store": store})
    ])


if __name__ == '__main__':
    store = Store()
    app = start(store)
    app.listen(8888, max_buffer_size=(300*1024*1024))
    ioloop.IOLoop.instance().add_callback(store.watch_queue)
    ioloop.IOLoop.instance().start()
Beim Testen ergeben sich jedoch folgende Ergebnisse:

Schicke ich 6 Messages mit á 300 MB, werden diese anscheinend der Reihe nach verarbeitet und dem lokalen File angefügt. (Ich wollte mir auch die Queue-Tiefe anzeigen lassen, irgendwie ist diese aber immer 0, anscheinend weil eine Nachricht in die Queue geschoben und gleich wieder verabeitet wird, hier staut sich nichts auf)

Code: Alles auswählen

0
Message with id 1 stored
Message with id 1 stored
Message with id 1 stored
Message with id 1 stored
Message with id 1 stored
Message with id 1 stored
Mit time in der shell gemessen:
0,34s user 1,11s system 24% cpu 5,850 total

Schicke ich nun 6 Files mit id `1` und parallel 6 Files mit id `2` ergibt sich folgendes Ergebnis:

Code: Alles auswählen

0
Message with id 2 stored
Message with id 2 stored
Message with id 1 stored
Message with id 2 stored
Message with id 1 stored
Message with id 2 stored
Message with id 1 stored
Message with id 1 stored
Message with id 2 stored
Message with id 1 stored
Message with id 2 stored
Message with id 1 stored
Wieder mit time gemessen:
0,34s user 1,15s system 16% cpu 9,119 total (die ersten 6 files id `1`)
0,33s user 1,11s system 15% cpu 9,418 total (die anderen, parallel geschickten 6 files id `2`)

So, jetzt sehe ich zwar dass der Webservice die Anfragen zwar parallel bearbeitet, und die Queue (hoffentlich) die Daten pro file sauber hintereinander schreibt, ich frage mich jedoch warum sich die einfache Verarbeitungszeit von ca. 5,9 Sekunden pro 6 files im zweiten Beispiel, beim parallelen Versenden von insg. 12 files die Zeit auf rund 9,x Sekunden erhöht?

Sollte das schreiben der Dateien, egal ob 6 oder 12 nicht ungefähr gleich lange dauern? Im zweiten Beispiel habe ich zwar 12 Dateien, jedoch mit zwei unterschiedlichen IDs, oder muss ich hier irgendwie pro `id` eine eigene Queue implementieren?
When we say computer, we mean the electronic computer.
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

Lesen und schreiben auf ein Dateisystem ist weder unbegrenzt schnell noch unbegrenzt parallelisierbar. 12 * 300MiB sind 3.6GiB, die in 9s zu schreiben ist schon eine gute Zeit. Das würdest du in vielen Szenarien nicht hinbekommen.
Sirius3
User
Beiträge: 17711
Registriert: Sonntag 21. Oktober 2012, 17:20

@sls: das mit der Queue macht doch so überhaupt keinen Sinn. Du startest ja trotzdem für jeden write einen eigenen Thread.

Code: Alles auswählen

from tornado import web, ioloop, gen
from tornado.queues import Queue
from tornado.iostream import PipeIOStream


class Store:
    def __init__(self):
        self.queue = Queue()

    @gen.coroutine
    def write_queue(self):
        while True:
            item = yield self.queue.get()
            print("Message with id %s stored" % id)
            fd = open(id, 'ab')
            stream = PipeIOStream(fd.fileno)
            yield stream.write(content)
            stream.close()


class MainHandler(web.RequestHandler):

    def initialize(self, store):
        self.store = store

    @gen.coroutine
    def put(self, id):
        yield self.store.queue.put((id, self.request.body))


def start(store):
    return web.Application([
        (r"/(.*)", MainHandler,
         {"store": store})
    ])


if __name__ == '__main__':
    store = Store()
    app = start(store)
    app.listen(8888, max_buffer_size=(300*1024*1024))
    ioloop.IOLoop.instance().add_callback(store.write_queue)
    ioloop.IOLoop.instance().start()
Jetzt ist garantiert, dass nicht zwei Threads gleichzeitig in eine Datei zu schreiben versuchen.
Als Optimierung, könnte man jetzt für jede ID eine eigene Queue erzeugen.
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

Sirius3 hat geschrieben: Mittwoch 30. Mai 2018, 13:37 @sls: das mit der Queue macht doch so überhaupt keinen Sinn. Du startest ja trotzdem für jeden write einen eigenen Thread.
Stimmt, leuchtet ein.

Ich wollte dein Beispiel jetzt so hinbauen dass es auch funktioniert:

Code: Alles auswählen

    @gen.coroutine
    def write_queue(self):
        while True:
            item = yield self.queue.get()
            print("Message with id %s stored" % item[0])
            fd = open(item[0], 'ab')
            stream = PipeIOStream(fd.fileno())
            yield stream.write(item[1])
            stream.close()
Funktioniert auch wenn ich ein File losschicke, nur leider fliegt er beim zweiten File auf die Nase:

Code: Alles auswählen

ERROR:tornado.application:Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7fcebe53c6a8>, <Future finished exception=OSError(9, 'Bad file descriptor')>)
Traceback (most recent call last):
  File "/usr/local/lib/python3.5/dist-packages/tornado/ioloop.py", line 759, in _run_callback
    ret = callback()
  File "/usr/local/lib/python3.5/dist-packages/tornado/stack_context.py", line 276, in null_wrapper
    return fn(*args, **kwargs)
  File "/usr/local/lib/python3.5/dist-packages/tornado/ioloop.py", line 780, in _discard_future_result
    future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/local/lib/python3.5/dist-packages/tornado/gen.py", line 1113, in run
    yielded = self.gen.send(value)
  File "/home/sls/IdeaProjects/sandbox/start.py", line 18, in write_queue
    stream = PipeIOStream(fd.fileno())
  File "/usr/local/lib/python3.5/dist-packages/tornado/iostream.py", line 1643, in __init__
    self._fio = io.FileIO(self.fd, "r+")
OSError: [Errno 9] Bad file descriptor
Kann er den file descriptor nicht schließen, oder was hat er hier für Schmerzen? Ein fd.close() wirft den Fehler sofort nach dem schreiben des files, ich kann den file descriptor auch nicht über die IOLoop.instance() schließen.
When we say computer, we mean the electronic computer.
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

Nachtrag: ich habe jetzt nochmal bei SO gefragt und Antwort vom Lead-Developer von Tornado erhalten (https://stackoverflow.com/questions/506 ... descriptor). Demnach scheint der Vorschlag mit PipeIOStream leider nicht so richtig geeignet zu sein.
When we say computer, we mean the electronic computer.
Antworten