SQLAlchemy: Arbeiten mit mehreren QThreads?
@__deets__: Das ich mich "komisch" ausdrücke, ist wohl meine Schwäche. Wobei ich es mir eher komisch vorstelle. Sagen wir mal, ich habe 100 Threads (nur rein hypothetisch), dann habe ich auch 100 with-Anweisungen. Der Quelltext bläht sich dadurch auf? Und wenn die session als scoped_session() vorbereitet wurde, dann kann man doch diese Session unter den Threads teilen, also die scoped_session() sich darum kümmern, dass die Threads miteiander nicht in Berührung kommen. Daher nahm ich an, dass man eine with-Anweisung anbringen kann, und im Korpus der with-Anweisung die ganzen Threads abarbeiten und die scoped_session() an die jeweilige Threads verteilen?
Wieso blähen denn 100 threads den Quellcode auf? Hier starte ich nicht nur hypothetisch 1000 Threads, und das in weniger als 1000 Zeilen, womit deine These ja ad absurdum gefuehrt wird:
Und zum gefuehlt 100sten mal: Sessions duerfen NICHT zwischen Threads geteilt werden. Was glaubst du denn, was eine Session eigentlich *ist*? Du kannst auch keine einzelne Datenbankverbindung nicht zwischen 1000 Threads teilen (normalerweise), weil eine solche Verbindung sich dann in jedem Thread gegenseitig auf die Fuesse tritt. Offene Cursor, Position auf der die stehen etc.
Also, noch einmal: EINE SESSION PRO THREAD.
Code: Alles auswählen
import threading
def tuwas():
with scoped_session() as session:
session.execute("etwas")
threads = [threading.Thread(target=tuwas) for _ in xrange(1000)]
[t.start() for t in threads]
[t.join() for t in threads]
Also, noch einmal: EINE SESSION PRO THREAD.
@__deets__: Zwei Dinge. Erstens: Das sich dein Quelltext nicht aufbläht ist doch klar. Du erstellst zwar 1000 Threads, aber all die Threads führen nur ein und die selbe Aufgabe aus. In meinem Fall ist es so, dass jeder Thread jedes Eingabefeld (QComboBox(), QLineEdit()... etc.) befüllen. Wesen Aussage führt ad absurdum? Zweitens: Aus deinen Ausführungen werde ich auch nicht schlau, daher zum 101. Male. Du redest die ganze Zeit von Session, demzufolge weiß ich nicht ob du von scoped_session() redest oder von den normalen Session? Denn scoped_session(), die als Rückgabe <class 'sqlalchemy.orm.scoping.scoped_session'> liefert, ist thread-sicher, als im Gegensatz einer normalen Session. So meinem Verständnis: Solange ich NUR die Rückgabe von scoped_session() nicht aufrufe, habe ich keine normale Session, demzufolge frage ich mich, wieso man an dieser Stelle nicht die Rückgabe der scoped_session() an die Threads verteilen kann? Ist nicht gerade der Vorteil der scoped_session(), dass sie sich darum kümmert, dass sich die Threads nicht gegenseitig in die Knie schießen?
Wenn man 100 verschiedene Funktionalitaeten erstellt, dann muss man 100 verschiedene Dinge programmieren, mit diversene Statements, Funktionen, Klassen, Modulen, Paketen sogar. Nichts davon ist besonders. Warum ist dann ein solches with-Statement etwas, das dir solche Sorgen bereitet, und "den Code aufbläht"?
Und die scoped_session ist thread-sicher, weil sie einen thread-local storage verwendet um eine Session anzulegen.
Du redest die ganze Zeit davon, dass du den Wert, der dir durch den Aufruf von eben dieser scoped_session zurueckgegeben wird *verteilen* willst. DAS ist FALSCH.
Du kannst problemlos das hier machen:
Das hier ist, wovon du die ganze Zeit redest (zumindest klingt es so, wenn du "die Rückgabe von scoped_session() an die Trheads weiterreichte"
Und die scoped_session ist thread-sicher, weil sie einen thread-local storage verwendet um eine Session anzulegen.
Du redest die ganze Zeit davon, dass du den Wert, der dir durch den Aufruf von eben dieser scoped_session zurueckgegeben wird *verteilen* willst. DAS ist FALSCH.
Du kannst problemlos das hier machen:
Code: Alles auswählen
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
def tuwas():
# RICHTIG WEIL IMPLIZIT EIN SESSION-OBJEKT PRO THREAD
print(Session.query(MyClass).all())
threads = [threading.Thread(target=tuwas) for _ in xrange(1000)]
[t.start() for t in threads]
[t.join() for t in threads]
Code: Alles auswählen
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)
def tuwas(session):
# GANZ DOLLE FALSCH, WEIL GLEICHES OBJEKT IN ALLEN THREADS
print(session.query(MyClass).all())
my_session = Session()
threads = [threading.Thread(target=tuwas, args=(my_session,)) for _ in xrange(1000)]
[t.start() for t in threads]
[t.join() for t in threads]
Als Ergänzung: Wenn man das zweite Beispiel von __deets__ in Zeile 12 folgerndermassen ändert…
…also im Hauptthread 1000 mal `Session` aufruft und das Ergebnis an die Threads verteilt, hat man genau das *gleiche* Problem, denn `Session()` liefert in ein und dem selben Thread immer das *selbe* `Session`-Exemplar, nämlich das für den Hauptthread.
Code: Alles auswählen
threads = [threading.Thread(target=tuwas, args=(Session(),)) for _ in xrange(1000)]
@__deets__: Ich habe meine grobe Skizze etwas umstrukturiert, weil ich eine bessere Teilung vornehmen möchte. Der Quelltext ist extrem gekürzt und nicht lauffähig! Wenn ich also __deets__s, snafus und BlackJacks Ratschläge streng verfolge, dann sieht meine Umstrukturierung und mein Gedankenproblem wie folgt aus.
Zunächst ein paar Anmerkungen zu meiner Umstrukturierung. Ich habe eine Teilung zwischen Session und Engine vorgenommen. Es gibt die ManagedEgine()-Klasse, die nur einmalig aufgerufen wird. In dieser Klasse wird in einem Atemzug die create_engine() und auch der sessionmaker() erstellt. Es sind beides Objekte, die man nur einmal erstellt. Dann haben wir die ManagedSessionScope()-Klasse, die bei Bedarf immer wieder aufgerufen werden kann.
Dann haben wir die MyCustomDialog()-Klasse, die für die View verantwortlich ist. In dieser Klasse sehen wir einmal die log_in_database()-Methode. Klickt der Benutzer auf die Schaltfläche "LogIn" wird er/sie angemeldet. Es wird eine Instanzt der ManagedEgine()-Klasse erstellt und sessionmaker() wird klassenweit in das self.managed_engine-Attribut gespeichert. Nach dem erfolgreichen LogIn klickt der Benutzer auf die Schaltfläche "Daten abfragen" und ruft dadurch die start_all_selection()-Methode auf. Und genau dort habe ich das Problem. Aber dazu komme ich noch.
Darüber hinaus habe ich eine Worker()-Klasse, die durch die aufgerufenen Threads verwaltet werden. In dieser Worker-Klasse sehen wir, dass dort mit der MasterDataManipulation()-Klasse gewerkelt wird.
Un zu guter Letzt haben wir eine MasterDataManipulation() -Klasse, in welcher die ganzen Abfragen und weitere Operatoren (Update, Add, Delete) befinden. Diese Klasse wird bei jedem Thread erneut instanziiert.
Gedankenproblem:
Ich habe bisher die Session deshalb stets an die Threads übergeben, weil die Sessions in den MasterDataManipulation()-Klassen gebraucht werden, in welcher die Operationen stattfinden. Ursprünglich war es so gedacht: Bei jedem Thread-Start wird jeweils eine Instanz von der MasterDataManipulation()-Klasse erstellt, die Sessions werden dorthin übergeben und anschließend werden die Abfragen bearbeitet. Wenn ich das wie __deets__ und snafu mache, dann müsste ich die ganzen Abfragen in meinen Hauptthread (in dem Falle wäre es die GUI-Klasse) verlegen. Jedoch wollte ich es vermeiden, weil ich nicht will, dass die GUI einfriert, wenn die ganzen Abfragen im Hautthread ausgeführt werden. Und genau da liegt mein Gedankenproblem.
Zunächst ein paar Anmerkungen zu meiner Umstrukturierung. Ich habe eine Teilung zwischen Session und Engine vorgenommen. Es gibt die ManagedEgine()-Klasse, die nur einmalig aufgerufen wird. In dieser Klasse wird in einem Atemzug die create_engine() und auch der sessionmaker() erstellt. Es sind beides Objekte, die man nur einmal erstellt. Dann haben wir die ManagedSessionScope()-Klasse, die bei Bedarf immer wieder aufgerufen werden kann.
Dann haben wir die MyCustomDialog()-Klasse, die für die View verantwortlich ist. In dieser Klasse sehen wir einmal die log_in_database()-Methode. Klickt der Benutzer auf die Schaltfläche "LogIn" wird er/sie angemeldet. Es wird eine Instanzt der ManagedEgine()-Klasse erstellt und sessionmaker() wird klassenweit in das self.managed_engine-Attribut gespeichert. Nach dem erfolgreichen LogIn klickt der Benutzer auf die Schaltfläche "Daten abfragen" und ruft dadurch die start_all_selection()-Methode auf. Und genau dort habe ich das Problem. Aber dazu komme ich noch.
Darüber hinaus habe ich eine Worker()-Klasse, die durch die aufgerufenen Threads verwaltet werden. In dieser Worker-Klasse sehen wir, dass dort mit der MasterDataManipulation()-Klasse gewerkelt wird.
Un zu guter Letzt haben wir eine MasterDataManipulation() -Klasse, in welcher die ganzen Abfragen und weitere Operatoren (Update, Add, Delete) befinden. Diese Klasse wird bei jedem Thread erneut instanziiert.
Gedankenproblem:
Ich habe bisher die Session deshalb stets an die Threads übergeben, weil die Sessions in den MasterDataManipulation()-Klassen gebraucht werden, in welcher die Operationen stattfinden. Ursprünglich war es so gedacht: Bei jedem Thread-Start wird jeweils eine Instanz von der MasterDataManipulation()-Klasse erstellt, die Sessions werden dorthin übergeben und anschließend werden die Abfragen bearbeitet. Wenn ich das wie __deets__ und snafu mache, dann müsste ich die ganzen Abfragen in meinen Hauptthread (in dem Falle wäre es die GUI-Klasse) verlegen. Jedoch wollte ich es vermeiden, weil ich nicht will, dass die GUI einfriert, wenn die ganzen Abfragen im Hautthread ausgeführt werden. Und genau da liegt mein Gedankenproblem.
Code: Alles auswählen
[...] # All imports are here
[...]
class ManagedEgine(object):
def __init__(self,
dbms=None,
dbdriver=None,
dbuser=None,
dbuser_pwd=None,
db_server_host=None,
dbport=None,
db_name=None,
echo_verbose=True):
self.dbms = dbms
self.dbdriver = dbdriver
self.dbuser = dbuser
self.dbuser_pwd = dbuser_pwd
self.db_server_host = db_server_host
self.dbport = dbport
self.db_name = db_name
self.echo_verbose = echo_verbose
url = '{}+{}://{}:{}@{}:{}/{}'.format(
self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)
_engine = create_engine(url, echo=self.echo_verbose)
# I have to persist all tables and create them
Base.metadata.create_all(_engine)
# Create the session factory
self.session_factory = sessionmaker(bind=_engine)
class ManagedSessionScope(object):
def __init__(self, engine=None):
self._enginge = engine
self.session = None
self._Session = scoped_session(self._enginge)
def __enter__(self):
self.session = self._Session # this is now a scoped session
# sqlalchemy.orm.scoping.scoped_session
return self.session
def __exit__(self, exception, exc_value, traceback):
try:
if exception:
self.session.rollback()
else:
self.session.commit()
finally:
self.session.close()
class MasterDataManipulation(object):
def __init__(self):
def select_all(self, category):
dict_store_session_query = {
'person_gender': lambda: self._session.query(PERSON_GENDER),
[...]
}
try:
for record in dict_store_session_query[category]():
if category == 'person_gender':
yield record.id, record.gender
self._session.commit()
except Exception:
self._session.rollback()
class Worker(QObject):
[...] # Signals are here
def __init__(self,
combo_box=None,
category=None,
time_interval=None,
parent=None):
QObject.__init__(self, parent)
[...]
def init_object(self):
self.timer = QTimer()
master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
query_data=master_data_manipulation.select_all
self._element = query_data(self.category)
self.timer.setSingleShot(False)
self.timer.setInterval(int(self.time_interval))
self.timer.timeout.connect(self.populate_item)
self.timer.start()
[...]
class MyCustomDialog(QDialog):
[...] Here are some signals
def __init__(self, parent=None):
QDialog.__init__(self, parent)
[...]
self.managed_engine = None
[...]
def log_in_database(self):
# User wants to program to log in
dbms ="mysql"
dbdriver="pymysql"
dbuser="root"
dbuser_pwd="xxx"
db_server_host="localhost"
dbport=3306
db_name="test"
echo_verbose=True
try:
managed_engine = ManagedEgine(dbms=dbms,
dbdriver=dbdriver,
dbuser=dbuser,
dbuser_pwd=dbuser_pwd,
db_server_host=db_server_host,
dbport=dbport,
db_name=db_name,
echo_verbose=echo_verbose)
self.managed_engine = managed_engine.session_factory
except SQLAlchemyError:
desired_trace = format_exc(exc_info())
print "desired_trace", desired_trace
def start_thread(self,
combo_box=None,
session=None,
time_interval=None,
category=None):
task_thread = QThread(self)
task_thread.work = Worker(new_scope=session,
time_interval=time_interval,
category=category,
combo_box=combo_box)
task_thread.work.moveToThread(task_thread)
[...]
task_thread.started.connect(task_thread.work.init_object)
[...]
[...]
def start_all_selection(self):
[...]
list_tuple = [
("person_salutation", self.combo_person_salutation),
("person_title", self.combo_person_title),
("person_gender", self.combo_person_gender),
("person_religion", self.combo_person_religion),
("person_eye_color", self.combo_person_eye_color),
("person_hair_color", self.combo_person_hair_color),
("person_relationship_status", self.combo_person_relationship_status),
("person_nationality", self.combo_person_nationality)
]
try:
#with ManagedSessionScope(engine=self.managed_engine) as session_that_scoped:
# Here we don't want to share one session with other threads
# What should I do?
for category, combobox in list_tuple:
combobox.clear()
self.start_thread(combo_box=combobox,
session=session_that_scoped,
time_interval=100,
category=category)
except SQLAlchemyError as err:
[...] # Do stuff with Exception
except OperationalError as OpErr:
[...] # Do stuff with Exception
Zuletzt geändert von Sophus am Montag 14. August 2017, 17:04, insgesamt 4-mal geändert.
Wie *kommst* du denn auf sowas? Ich habe dir doch Code gepostet, der belegt, dass ich Abfragen ueber die Session in verschiedenen Threads mache. JEDER hier hat verstanden, was du willst.Sophus hat geschrieben: Wenn ich das wie __deets_ und snafu mache, dann müsste ich die ganzen Abfragen in meinen Hauptthread (in dem Falle wäre es die GUI-Klasse) verlegen. Jedoch wollte ich es vermeiden, weil ich nicht will, dass die GUI einfriert, wenn die ganzen Abfragen im Hautthread ausgeführt werden. Und genau da liegt mein Gedankenproblem.
Du benutzt die scoped session einfach komplett falsch. Du brauchst davon genau *eine*, und die kannst du auch ausnahmsweise auf Modul-Ebene anlegen (so macht SQLAlchemy das selbst, und erklaert ausfuehrlich, warum das ausnahmsweise nicht boese(tm) ist)
Ich habe jetzt hier schon mehrfach code gepostet, der das illustriert. Schmeiss mal deinen ganzen Managed*-Wahnsinn weg, und erstell dir ein ganz einfaches Qt-Programm welches
- *eine* scoped session auf Modul-Ebene erzeugt.
- einen Dialog hat, der auf Knopfdruck eine Query absetzt und die Ergebnisse einfach ausdruckt mit print
- einen Hintergrund-Thread started, welcher periodisch dasselbe macht, und einfach die Ergebnisse ausdruckt mit print.
Und streu ein "print(id(Session())" ein, um dich zu ueberzeugen, dass der Thread eine andere Session benutzt.
Natuerlich kann man statt einer globalen Variablen auch das scoped_session-Objekt in die Threads (und jede moegliche andere Stelle, die SQL taetigt) reinreichen. Aber das ist nicht die Art, wie man das in SQLAlchemy ueblicherweise macht. Und *wenn* man es macht, dann reicht man die eine scoped_session rein, und legt nicht ueberall eine neue an. So wie du in Zeile 43.
Hier mal ein multi-threaded und self-contained Beispiel das belegt, dass der Ansatz funktioniert und auch wunderbar parallel Daten abholt:
Code: Alles auswählen
import time
import random
import threading
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
session_factory = sessionmaker()
Session = scoped_session(session_factory)
def work():
print(id(Session()))
for _ in xrange(5):
time.sleep(random.random() * 3)
res = Session.execute("SELECT * FROM test LIMIT 3")
for row in res:
print(threading.currentThread().getName(), row)
time.sleep(.1)
def main():
eng = create_engine('sqlite:////tmp/test.db')
Session.configure(bind=eng)
Session.execute('CREATE TABLE IF NOT EXISTS test ( data int)')
for i in xrange(1000):
Session.execute('INSERT INTO test (data) values (:param)', { "param": i})
Session.commit()
print(id(Session()))
threads = [threading.Thread(target=work) for _ in xrange(10)]
[t.start() for t in threads]
[t.join() for t in threads]
if __name__ == '__main__':
main()
@__deets__: Ich will nicht auf Modul-Ebene arbeiten, denn jede Klasse, die ich hier extrem verkürzt dargestellt habe, präsentiert bei mir jedes Modul. Für jede Klasse ein Modul. Demzufolge wüsste ich nicht woich die scoped_session(), zusammen mit create_engine() und sessionmaker() auf Modulebene anlegen soll? In einem Modul (GUI) findet die Anmeldung zur Datenbank statt, in ein anderes Modul (ebenfalls GUI) sind dann diese Schaltflächen, dass ManagedEngine() und ManagedSessionScope() sind je in einem Modul untergebracht. Und auch die MasterDataManipulation() ist in einem Modul. Deswegen habe ich auch die strikte Teilung zwischen Engine und Session vorgenommen, damit ich die Engine klassenweit benutze, und wenn ich in eine andere GUI gehe, reiche ich die bereits erstellte Engine einfach weiter. Ich komme mir gerade mächtig dumm for, als jemand, dem man sagt "1+1=2" und mich dann fragt "was sind 2+2=?" und ich dann sage "na 2".
Und das threading-Modul bringt mir leider nichts, weil mir das mit den Klassen vollkommen auf der Strecke bleibt. Ich versuche mal das Beispiel zu verstehen, und dann auf meinen Klassen anzuwenden.
Und das threading-Modul bringt mir leider nichts, weil mir das mit den Klassen vollkommen auf der Strecke bleibt. Ich versuche mal das Beispiel zu verstehen, und dann auf meinen Klassen anzuwenden.
Es ist ueblich fuer SQLAlchemy die Session in einem Modul zu definieren und dann in anderen Modulen eben einfach zu importieren. Und in main() bindet man die konkrete Engine (die ja zB von Benutzereinstellungen abhaengen kann etc) an die Session. Wie vorgemacht.
Und das threading-Modul unterscheided sich bezueglich des gesagten nicht von QThreads, insofern verstehe ich nicht, warum dir "das nichts bringt". Das man fuer Qt QThreads braucht ist schoen, SQLAlchemy ist das egal.
Und das threading-Modul unterscheided sich bezueglich des gesagten nicht von QThreads, insofern verstehe ich nicht, warum dir "das nichts bringt". Das man fuer Qt QThreads braucht ist schoen, SQLAlchemy ist das egal.
Na ja, meine QThread-Struktur ist doch ganz anders, da unterscheidet sich eine ganze Menge. In meinem Fall wird die Worker()-Klasse zum QThread() hinzugefügt. Bei dir spielt sich alles auf einfachen Funktionen ab. Da ist nichts, einfach Funktionen an den Threads zu übergeben. Deine Struktur ist weitaus unterschiedlicher als meine Struktur, und daher komme ich gerade gar nicht mit.__deets__ hat geschrieben:Und das threading-Modul unterscheided sich bezueglich des gesagten nicht von QThreads, insofern verstehe ich nicht, warum dir "das nichts bringt". Das man fuer Qt QThreads braucht ist schoen, SQLAlchemy ist das egal.
Da ich ueberhaupt *NICHTS* an meine Thread-targets uebergebe, ist das eigentlich voellig gleichgueltig, wie diese Funktionen zur Ausfuehrung gelangen. Die kannst du auch in work() aufrufen, bzw. den Code ersetzen.
Mir scheint es eher so, als ob du schon viel investiert hast in deinen Code, und jetzt Aenderungen lieber einarbeiten wuerdest, statt dir mal von Grund auf klar zu machen, wie man mit threading und SQLAlchemy arbeitet. Halte ich fuer einen verfehlten Ansatz. Bau lieber ein kleines Beispiel das geht, und lerne daran, statt dein Monstrum zaehmen zu wollen.
Mir scheint es eher so, als ob du schon viel investiert hast in deinen Code, und jetzt Aenderungen lieber einarbeiten wuerdest, statt dir mal von Grund auf klar zu machen, wie man mit threading und SQLAlchemy arbeitet. Halte ich fuer einen verfehlten Ansatz. Bau lieber ein kleines Beispiel das geht, und lerne daran, statt dein Monstrum zaehmen zu wollen.
Gut, dann basteln wir mal was. Wobei ich anmerken muss, dass mein Quelltext nicht funktioniert. Irgendwo scheint etwas zu klemmen. Aber das ist erst einmal nebensächlich, weil es mir darum geht, zu erkennen, ob ich deine ausführlichen Aussagen verstanden habe. Damit meine Spielerei auch meinem Projekt etwas nahe kommt, habe ich insgesamt fünf Module eingerichtet.
defined_session.py
Du hast gesagt, dass sowohl session_factory () als auch scoped_session() auf Modulebene angelegt werden können, und diese dann in anderen Modulen nur zu importieren brauche.
managed_engine.py
Für meine Engine habe ich mir die Freiheit erlaubt, und die separate Klasse erstellt.
managed_data_manipulation.py
In dieser Klasse liegen meine Operatoren, und ich habe sessionmaker() und scoped_session() importiert.
worker.py
Hier habe ich die scoped_session() und sessionmaker() nicht importiert, brauchen wir nicht.
my_custom_dialog.py
Ich habe auf die QComboBox()-Objekte verzichtet. Ich nehme die Print-Anweisungen. Die on_login()-Methode simuliert das Anmelden an den Datenbank. In der on_start_select_all()-Methode starte ich die ganzen Threads.
defined_session.py
Du hast gesagt, dass sowohl session_factory () als auch scoped_session() auf Modulebene angelegt werden können, und diese dann in anderen Modulen nur zu importieren brauche.
Code: Alles auswählen
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
session_factory = sessionmaker()
Session = scoped_session(session_factory)
Für meine Engine habe ich mir die Freiheit erlaubt, und die separate Klasse erstellt.
Code: Alles auswählen
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
''' setting up root class for declarative declaration '''
Base = declarative_base()
class ManagedEngine(object):
def __init__(self,
dbms=None,
dbdriver=None,
dbuser=None,
dbuser_pwd=None,
db_server_host=None,
dbport=None,
db_name=None,
echo_verbose=True):
self.dbms = dbms
self.dbdriver = dbdriver
self.dbuser = dbuser
self.dbuser_pwd = dbuser_pwd
self.db_server_host = db_server_host
self.dbport = dbport
self.db_name = db_name
self.echo_verbose = echo_verbose
url = '{}+{}://{}:{}@{}:{}/{}'.format(
self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)
self._engine = create_engine(url, echo=self.echo_verbose)
# I have to persist all tables and create them
Base.metadata.create_all(self._engine)
In dieser Klasse liegen meine Operatoren, und ich habe sessionmaker() und scoped_session() importiert.
Code: Alles auswählen
from defined_session import*
class MasterDataManipulation(object):
def __init__(self):
self.attr = None
def select_all(self, category):
dict_store_session_query = {'person_gender': lambda: Session.query(PERSON_GENDER),
'person_nationality': lambda: Session.query(PERSON_NATIONALITY),
'person_salutation': lambda: Session.query(PERSON_SALUTATION),
'person_title': lambda: Session.query(PERSON_TITLE),
'person_hair_color': lambda: Session.query(PERSON_HAIR_COLOR),
'person_eye_color': lambda: Session.query(PERSON_EYE_COLOR),
'person_religion': lambda: Session.query(PERSON_RELIGION),
'person_relationship_status': lambda: Session.query(PERSON_RELATIONSHIP_STATUS)}
try:
for record in dict_store_session_query[category]():
if category == 'person_gender':
yield record.id, record.gender
if category == 'person_nationality':
yield record.id, record.nationality
if category == 'person_salutation':
yield record.id, record.salutation
if category == 'person_title':
yield record.id, record.title
if category == 'person_hair_color':
yield record.id, record.hair_color
if category == 'person_eye_color':
yield record.id, record.eye_color
if category == 'person_religion':
yield record.id, record.religion
if category == 'person_relationship_status':
yield record.id, record.relationship_status
Session.commit()
except Exception:
Session.rollback()
Hier habe ich die scoped_session() und sessionmaker() nicht importiert, brauchen wir nicht.
Code: Alles auswählen
import sys
from PyQt4.QtCore import QTimer, QObject, pyqtSignal
from managed_data_manipulation import MasterDataManipulation
class Worker(QObject):
notify_item = pyqtSignal(object)
finish_progress = pyqtSignal()
def __init__(self, category=None, parent=None):
QObject.__init__(self, parent)
self.category=category
def init_object(self):
master_data_manipulation = MasterDataManipulation()
query_data=master_data_manipulation.select_all
self._element = query_data(self.category)
self.timer = QTimer()
self.timer.setSingleShot(False)
self.timer.setInterval(100)
self.timer.timeout.connect(self.increment)
self.timer.start()
def increment(self):
try:
self.notify_item.emit(next(self._element))
except StopIteration:
self.finish_progress.emit()
self.timer.stop()
def stop(self):
self.timer.stop()
Ich habe auf die QComboBox()-Objekte verzichtet. Ich nehme die Print-Anweisungen. Die on_login()-Methode simuliert das Anmelden an den Datenbank. In der on_start_select_all()-Methode starte ich die ganzen Threads.
Code: Alles auswählen
import sys
from PyQt4.QtCore import pyqtSignal, QThread
from PyQt4.QtGui import QDialog, QLabel, QPushButton, \
QApplication, QVBoxLayout
from sqlalchemy.exc import SQLAlchemyError
from defined_session import *
from worker import Worker
from managed_engine import ManagedEngine
class MyCustomDialog(QDialog):
finish = pyqtSignal()
def __init__(self, parent=None):
QDialog.__init__(self, parent)
self.managed_engine = None
layout = QVBoxLayout(self)
self.pushButton_start = QPushButton("Start", self)
self.pushButton_login = QPushButton("LogIn", self)
self.pushButton_stopp = QPushButton("Stopp", self)
self.pushButton_close = QPushButton("Close", self)
layout.addWidget(self.pushButton_login)
layout.addWidget(self.pushButton_start)
layout.addWidget(self.pushButton_stopp)
layout.addWidget(self.pushButton_close)
self.pushButton_login.clicked.connect(self.on_login)
self.pushButton_start.clicked.connect(self.on_start_select_all)
self.pushButton_stopp.clicked.connect(self.on_finish)
self.pushButton_close.clicked.connect(self.close)
def print_populate(self, i):
print "i", i
def on_login(self):
dbms ="mysql"
dbdriver="pymysql"
dbuser="root"
dbuser_pwd="xxx"
db_server_host="localhost"
dbport=3306
db_name="test"
echo_verbose=True
try:
managed_engine = ManagedEngine(dbms=dbms, dbdriver=dbdriver,
dbuser=dbuser, dbuser_pwd=dbuser_pwd,
db_server_host=db_server_host,
dbport=dbport, db_name=db_name,
echo_verbose=echo_verbose)
Session.configure(bind=managed_engine._engine)
except SQLAlchemyError as Err:
print "Err", Err
def on_start_select_all(self):
category_list = [
"person_salutation", "person_title",
"person_gender","person_religion",
"person_eye_color", "person_hair_color",
"person_relationship_status",
"person_nationality"
]
for category in category_list:
self.on_start_thread_tasks(category=category)
def on_start_thread_tasks(self, category=None):
task_thread = QThread(self)
task_thread.work = Worker(category=category)
task_thread.work.moveToThread(task_thread)
task_thread.work.notify_item.connect(self.print_populate)
task_thread.work.finish_progress.connect(task_thread.quit)
self.finish.connect(task_thread.work.stop)
task_thread.started.connect(task_thread.work.init_object)
task_thread.finished.connect(task_thread.deleteLater)
task_thread.start()
def on_finish(self):
self.finish.emit()
def main():
app = QApplication(sys.argv)
window = MyCustomDialog()
window.resize(600, 400)
window.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()
Damit man nicht alles kopieren muss, habe ich dieses Programm auf BitBucket hochgeladen: https://bitbucket.org/Xenophyl/sqlalche ... thread/src
Zwischenergebnis: Wenn ich in der MyCustomDialog()-Klasse die on_start_select_all()-Methode, nach erfolgreicher Anmeldung an die Datenbank, mehrmals ausführe gibt es schon Probleme. Beim ersten Mal wird alles wunderbar ausgeführt. Beim Zweiten Mal werden mal sporadisch ein paar Datensätze unterschlagen und beim dritten Mal werden gar keine Daten erst abgefragt. Wenn ich eine Weile warte, und ein viertes Male versuche, dann kann es passieren, dass alle Daten geholt werden.
So, das ist mein letzter Beitrag hier, weil ich meinen Punkt ausreichend belegt habe, und du augenscheinlich nicht in der Lage bist, dich mal wirklich mit einem kleinen Beispiel zu beschaefigen, sondern stattdessen immer riesen Menge an Code schreibst bei denen die Problem gottweisswo liegen koennen.
Ich bekomme auf meinem System kein Qt4 mehr installiert (das ist *uralt* und nicht mehr maintained. Solltest du nicht mehr verwenden, aber auch da halte ich jetzt lieber nicht die Luft an...), darum ist das PyQt5. Und was soll ich sagen? Es funktioniert wie angepriesen, man kann wunderbar nebenlaeufig Queries fahren etc.
Ich bekomme auf meinem System kein Qt4 mehr installiert (das ist *uralt* und nicht mehr maintained. Solltest du nicht mehr verwenden, aber auch da halte ich jetzt lieber nicht die Luft an...), darum ist das PyQt5. Und was soll ich sagen? Es funktioniert wie angepriesen, man kann wunderbar nebenlaeufig Queries fahren etc.
Code: Alles auswählen
import sys
import time
import random
import threading
from PyQt5.QtWidgets import QApplication, QWidget, QPushButton
from PyQt5.QtCore import QThread, QObject
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
session_factory = sessionmaker()
Session = scoped_session(session_factory)
def execute_db_query():
try:
res = Session.execute("SELECT * FROM test LIMIT 100")
for row in res:
print(threading.currentThread().getName(), row)
time.sleep(.1)
except:
Session.rollback()
print(sys.exc_info())
else:
Session.commit()
class Worker(QObject):
def work(self):
while True:
print("working...")
time.sleep(random.random() * 3)
execute_db_query()
def setup_db():
eng = create_engine('sqlite:////tmp/test.db')
Session.configure(bind=eng)
Session.execute('CREATE TABLE IF NOT EXISTS test ( data int)')
for i in range(1000):
Session.execute('INSERT INTO test (data) values (:param)', { "param": i})
Session.commit()
def create_worker_thread(parent=None):
task_thread = QThread(parent)
task_thread.worker = Worker()
task_thread.worker.moveToThread(task_thread)
task_thread.started.connect(task_thread.worker.work)
task_thread.finished.connect(task_thread.deleteLater)
return task_thread
def main():
setup_db()
app = QApplication(sys.argv)
w = QWidget()
w.resize(250, 150)
w.move(300, 300)
w.setWindowTitle('Simple')
button = QPushButton(w)
button.clicked.connect(execute_db_query)
w.show()
worker_thread = create_worker_thread()
worker_thread.start()
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Nein, ich habe für das genaue Gegenteil argumentiert: Die Session soll erst in der Worker-Funktion (Thread) angefasst werden. Irgendwie schreibe ich A und du verstehst Z. Das ist langsam frustrierend.Sophus hat geschrieben:Wenn ich das wie __deets__ und snafu mache, dann müsste ich die ganzen Abfragen in meinen Hauptthread (in dem Falle wäre es die GUI-Klasse) verlegen.
@snafu: Es ist nicht böse gemeint, aber was meinst, wie frustrierend das für mich ist? Erst arbeite ich mich deine Vorschläge durch, klappt nicht, dann die von __deets__ und irgendwie will das auch nicht klappen und dabei habe ich einen klar leserlichen Quelltext auf BitBucket bereits hingelegt
Dass du erstmal alles ohne zusätzliche Threads schreibst, ist gar keine Option für dich? Dir fehlt bezüglich Threads offenbar noch einiges an Grundverständnis. Du hängst ja jetzt schon seit Tagen an dem Problem und doktorst mehr herum als du wirklich verstehst.
@snafu: Ich erstelle gerade extra mit deinem Namen eine Responsy auf meinem BitBucket, und lade mal deine Version hoch. Wie man mit Qthreads arbeitet, weiß ich, hoffentlich schon. Denn abseits von SQLAlchemy stellt für mich QThread kein Problem dar. Ich melde mit zurück, wenn die Responsy fertig ist.