sqlalchemy in mehreren threads

Installation und Anwendung von Datenbankschnittstellen wie SQLite, PostgreSQL, MariaDB/MySQL, der DB-API 2.0 und sonstigen Datenbanksystemen.
Antworten
Trubinial Guru
User
Beiträge: 117
Registriert: Dienstag 7. April 2009, 13:40

Hallo,
nach einigen Schwierigkeiten mit pysqlite in verschiedenen Threads benutze ich nun sqlalchemy. Ich habe insgesamt 4 Threads, wobei die Lese- und Schreibzugriffe der ersten beiden Threads immer sofort ausgeführt werden sollten. Die beiden weiteren Threads brauchen wesentlich mehr Zugriffe auf die Datenbank, als die ersten beiden.
Im Moment ist es so, dass die Zugriffe der beiden letzten Threads die Zugriffe der ersten beiden Threads total blockieren, da es eine Weile dauert, bis alle Zugriffe des 3.+4. Threads durchgelaufen sind.

Ich habe mit Datenbank sehr wenig Erfahrungen und suche deshalb Hilfe. Ich weiß die sqlalchemy Dokumentation soll gut sein und ist sehr ausführlich, doch ich finde mich einfach nicht zurecht und komme da alleine nicht weiter :( :K . Wie schaffe ich dieses Geschwindigkeitsproblem zu beheben? Ist es möglich mit sqlalchemy parallele Zugriffe auf die sqlite Datenbank zu machen bzw. kann ich Prioritäten oder ähnliches an die ersten beiden Threads geben?

Ich hoffe ihr könnt mir weiterhelfen
Benutzeravatar
sparrow
User
Beiträge: 4195
Registriert: Freitag 17. April 2009, 10:28

Was machen denn diese verschiedenen Threads?
Trubinial Guru
User
Beiträge: 117
Registriert: Dienstag 7. April 2009, 13:40

@sparrow Das erste Thread nimmt nur eine Änderung einer Zeile vor, während das dritte Thread die Datenbank regelmäßig mit einer anderen Quelle abgleicht. Im vierten Thread wird übrigens nur lesend auf die Datenbank zugegriffen.
Benutzeravatar
sparrow
User
Beiträge: 4195
Registriert: Freitag 17. April 2009, 10:28

Gut, der Thraed der mit einer anderen Datenbank abgleicht, muss wohl schreiben. Das bedeutet bei SQLite immer einen Exklusiven Lock, andere Schreiboperationen müssen entsprechend warten.

Du könntest diesen Teil aber so schreiben, dass er nur möglichst kurz diesen exklusiven Zugriff braucht. Du könntest die entsprechenden Daten zur Änderungen ermitteln und diese dann mit einem Rutsch wegschreiben. Selbst bei einem großen Datenbestand ist das dann nur noch eine Angelegenheit, die schnell erledigt ist.

Also beide Datenbank lesen und die Daten abgleichen. Die entsprechenden "Änderungen" für die Datenbank werden dabei ermittelt und vorbereitet. Während dieser Zeit braucht der Thread keinen exklusiven Zugriff.
Anschließend dann kurz die vorbereiteten Updates und Inserts, die sind dann das einzig Schreibende an dem ganzen Vorgang.
Trubinial Guru
User
Beiträge: 117
Registriert: Dienstag 7. April 2009, 13:40

Ich verstehe. Ich habe nun ein wenig am code getüftelt. Du meintest, dass ich die Daten mit einem Rutsch wegschreiben sollte. Da ich keine Möglichkeit gefunden habe mehrere Statements gleichzeitig mit sqlalchemy "abzuschicken" (sql-stmt-list), musste ich eine Schleife schreiben. Diese Schleife wiederum ist suboptimal, da der Prozess noch immer lange dauert und darüber hinaus wieder die "Database is locked" Fehler kommt, wenn einer der anderen Threads in dieser Zeit versucht etwas aus der Datenbank zu lesen.
Gibt es da bessere Möglichkeiten? Was mache ich falsch?

Welchen Bezug haben Sessions auf sqlalchemy in threads? Bis jetzt erstelle ich keine Sessions, sondern nur engines (für jeden Thread ein eine eigene).

Vielen Dank für deine Hilfe!
Trubinial Guru
User
Beiträge: 117
Registriert: Dienstag 7. April 2009, 13:40

Ich habe mal ein kleines sinnfreies Beispielprogramm beschrieben, was das Problem ein wenig veranschaulicht.

Code: Alles auswählen

# -*- coding: utf-8 -*-

from sqlalchemy import *
from sqlalchemy.orm import mapper
from sqlalchemy.orm import sessionmaker
import os, time, thread

class Zeiten(object):
    pass
    
engine = create_engine('sqlite:///adressen.db')
metadata = MetaData()
metadata.bind = engine

Session = sessionmaker(bind=engine)
session = Session()

tab = Table('Zeittabelle', metadata,
Column('id', Integer, primary_key = True),
Column('Zeit',Integer))
metadata.create_all()

mapper(Zeiten, tab)


def neueZeiten():
    for i in range(100):
        engine = create_engine('sqlite:///adressen.db')
        Session = sessionmaker(bind=engine)
        session = Session()

        neue_Zeit = Zeiten()
        neue_Zeit.Zeit = time.time()

        session.add(neue_Zeit)
        session.commit()
        print "Thread"

thread.start_new_thread(neueZeiten,()) 

for i in range(100):
    neue_Zeit = Zeiten()
    neue_Zeit.Zeit = time.time()

    session.add(neue_Zeit)
    session.commit()
    print 'Hauptprogramm'

Ich weiß, dass ich nicht gleichzeitig in die Datenbank schreiben kann, aber in meinem Programm kann es trotzdem passieren, dass mehrere Schreibzugriffe gleichzeitig versucht werden. Gibt es eine Möglichkeit, dass die Schreibzugriffe nacheinander Abgearbeitet werden und somit der Fehler "Database is locked" Fehler nicht mehr auftritt? Bzw. wie kann ich dafür sorgen, dass zuerst die Schreibzugriffe des Hauptprogrammes abgearbeitet werden und dann die weniger wichtigeren des Threads folgen?
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

@Trubinial Guru: Warum machst du dir das Leben unnötig schwer, indem du thread benutzt, statt threading? Wenn man den Index zur Library Reference von oben nach unten liest, dann findet man threading sogar vor thread.

Du könntest alle Schreib-Operationen serialisieren (= von einem einzigen Thread in Serie ausführen lassen) und die Schreib-Aufträge über eine Priority-Queue verschicken (ab Python Version 2.6), zB. so:

Code: Alles auswählen

import Queue
from threading import Thread

SUPER_IMPORTANT = 0
NOT_SO_IMPORTANT = 1

def writer(queue):
    while True:
        priority, data = queue.get()
        ... write data somewhere ...

def super_important_stuff(queue):
    while True:
        data = ... get super important data ...
        queue.put((SUPER_IMPORTANT, data))

def not_so_important_stuff(queue):
    while True:
        data = ... get not so important data ...
        queue.put((NOT_SO_IMPORTANT, data))

def main():
    queue = Queue.PriorityQueue()
    threads = (
        [Thread(target=lambda:writer(queue))] +
        [Thread(target=lambda:super_important_stuff(queue)) for i in xrange(3)] +
        [Thread(target=lambda:not_so_important_stuff(queue)) for i in xrange(10)])
    for each in threads:
        each.start()
    ... laaaange Zeit irgendwas anderes machen, und dann irgendwann:
    for each in threads:
        each.join()
So jedenfalls sähe das allgemeine Schema aus. Du musst dir allerdings noch überlegen, wie du aus den while True:-Blöcken rauskommst, damit dein Programm auch irgendwann beendet werden kann.
In specifications, Murphy's Law supersedes Ohm's.
Trubinial Guru
User
Beiträge: 117
Registriert: Dienstag 7. April 2009, 13:40

Von der Priority-Queue bin ich ja sehr begeistert! Super Praktisch. Du meintest, dass die Schreiboperationen serialisiert werden müssen, wie sieht es mit den Lesezugriffen aus. Wenn die Datenbank gelocked ist, wenn gerade geschrieben wird, sollten ja auch die Lesezugriffe scheitern.

Sollte das so sein muss ich mir was überlegen, wie ich die Daten zurückgebe an das jeweilige Thread.
Tausend Dank schonmal für den Queue-Tipp.
Benutzeravatar
pillmuncher
User
Beiträge: 1484
Registriert: Samstag 21. März 2009, 22:59
Wohnort: Pfaffenwinkel

Trubinial Guru hat geschrieben:Du meintest, dass die Schreiboperationen serialisiert werden müssen, wie sieht es mit den Lesezugriffen aus. Wenn die Datenbank gelocked ist, wenn gerade geschrieben wird, sollten ja auch die Lesezugriffe scheitern.
Du solltest dir das hier: http://www.sqlite.org/sharedcache.html mal durchlesen, insbesondere die Punkte 2.1 und 2.2.*.

Wenn dir die Konsistenz der gelesenen Daten egal ist, dann kannst du den Isolation Level auf READ_UNCOMMITTED setzen und kannst parallel zu den Schreib-Operationen auch Lese-Operationen ausführen.

Wenn du dagegen beim Lesen unbedingt Inkonsistenzen vermeiden möchtest, dann solltest du den Isolation Level auf SERIALIZABLE (=default) belassen. Aber Obacht: Wenn du in diesem Level parallel zu einem Schreib-Thread zu lesen versuchst, dann bekommst du den bekannten Zugriffs-Fehler. Deswegen solltest du die Lese-Aufträge über dieselbe Priotity-Queue wie die Schreib-Aufträge schicken, dann wird der Thread, der aus ihr liest, zum Kommunikations-Zentrum mit der DB, ungefähr so:

Code: Alles auswählen

def db_thread(queue):  # war: writer()
    while True:
        priority, data, result_queue = queue.get()
        if result_queue is not None:
            result = ... select *** from ---
            result_queue.put(result)
        else:
            ... write data somewhere ...

def query(db_queue, importance, *params):
    result_queue = Queue.Queue()
    db_queue.put((importance, ...select stmt..., result_queue))
    return result_queue.get()  # blockt bis das Ergebnis da ist
    
def super_important_stuff(queue):
    while True:
        data = ... get super important data ...
        queue.put((SUPER_IMPORTANT, data, None))

...

my_data = query(SUPER_IMPORTANT, queue, bla, fasel)
Die queue in der letzten Zeile ist die Priority-Queue aus main() in meinem andere Beitrag oben. Die muss halt irgendwo zentral zugänlich sein.

Inwieweit das alles skaliert, weiß ich allerdings nicht. Das musst du ausprobieren. Ich hab mal bei einer Firma gearbeitet, da wurde das Verhalten des Programms bei möglichen konkurrierenden Zugriffen so getestet:

Alle Programmierer bitte:
  • 14:30 - 14:45: Rechnungen anlegen
  • 15:00 - 15:15: Aufträge bestätigen
  • 15:30 - 15:45: Mahn-Aufträge anlegen
  • ...


*brrr*
In specifications, Murphy's Law supersedes Ohm's.
Benutzeravatar
jens
Python-Forum Veteran
Beiträge: 8502
Registriert: Dienstag 10. August 2004, 09:40
Wohnort: duisburg
Kontaktdaten:

Wie wäre es eine andere DB als SQlite zu nutzten?

GitHub | Open HUB | Xing | Linked in
Bitcoins to: 1JEgSQepxGjdprNedC9tXQWLpS424AL8cd
Antworten