SQLAlchemy: Arbeiten mit mehreren QThreads?

Installation und Anwendung von Datenbankschnittstellen wie SQLite, PostgreSQL, MariaDB/MySQL, der DB-API 2.0 und sonstigen Datenbanksystemen.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@__deets__: Das ich mich "komisch" ausdrücke, ist wohl meine Schwäche. Wobei ich es mir eher komisch vorstelle. Sagen wir mal, ich habe 100 Threads (nur rein hypothetisch), dann habe ich auch 100 with-Anweisungen. Der Quelltext bläht sich dadurch auf? Und wenn die session als scoped_session() vorbereitet wurde, dann kann man doch diese Session unter den Threads teilen, also die scoped_session() sich darum kümmern, dass die Threads miteiander nicht in Berührung kommen. Daher nahm ich an, dass man eine with-Anweisung anbringen kann, und im Korpus der with-Anweisung die ganzen Threads abarbeiten und die scoped_session() an die jeweilige Threads verteilen?
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wieso blähen denn 100 threads den Quellcode auf? Hier starte ich nicht nur hypothetisch 1000 Threads, und das in weniger als 1000 Zeilen, womit deine These ja ad absurdum gefuehrt wird:

Code: Alles auswählen

import threading

def tuwas():
      with scoped_session() as session:
             session.execute("etwas")

threads = [threading.Thread(target=tuwas) for _ in xrange(1000)]
[t.start() for t in threads]
[t.join() for t in threads]
Und zum gefuehlt 100sten mal: Sessions duerfen NICHT zwischen Threads geteilt werden. Was glaubst du denn, was eine Session eigentlich *ist*? Du kannst auch keine einzelne Datenbankverbindung nicht zwischen 1000 Threads teilen (normalerweise), weil eine solche Verbindung sich dann in jedem Thread gegenseitig auf die Fuesse tritt. Offene Cursor, Position auf der die stehen etc.

Also, noch einmal: EINE SESSION PRO THREAD.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@__deets__: Zwei Dinge. Erstens: Das sich dein Quelltext nicht aufbläht ist doch klar. Du erstellst zwar 1000 Threads, aber all die Threads führen nur ein und die selbe Aufgabe aus. In meinem Fall ist es so, dass jeder Thread jedes Eingabefeld (QComboBox(), QLineEdit()... etc.) befüllen. Wesen Aussage führt ad absurdum? Zweitens: Aus deinen Ausführungen werde ich auch nicht schlau, daher zum 101. Male. Du redest die ganze Zeit von Session, demzufolge weiß ich nicht ob du von scoped_session() redest oder von den normalen Session? Denn scoped_session(), die als Rückgabe <class 'sqlalchemy.orm.scoping.scoped_session'> liefert, ist thread-sicher, als im Gegensatz einer normalen Session. So meinem Verständnis: Solange ich NUR die Rückgabe von scoped_session() nicht aufrufe, habe ich keine normale Session, demzufolge frage ich mich, wieso man an dieser Stelle nicht die Rückgabe der scoped_session() an die Threads verteilen kann? Ist nicht gerade der Vorteil der scoped_session(), dass sie sich darum kümmert, dass sich die Threads nicht gegenseitig in die Knie schießen?
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wenn man 100 verschiedene Funktionalitaeten erstellt, dann muss man 100 verschiedene Dinge programmieren, mit diversene Statements, Funktionen, Klassen, Modulen, Paketen sogar. Nichts davon ist besonders. Warum ist dann ein solches with-Statement etwas, das dir solche Sorgen bereitet, und "den Code aufbläht"?

Und die scoped_session ist thread-sicher, weil sie einen thread-local storage verwendet um eine Session anzulegen.

Du redest die ganze Zeit davon, dass du den Wert, der dir durch den Aufruf von eben dieser scoped_session zurueckgegeben wird *verteilen* willst. DAS ist FALSCH.

Du kannst problemlos das hier machen:

Code: Alles auswählen

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)

def tuwas():
      # RICHTIG WEIL IMPLIZIT EIN SESSION-OBJEKT PRO THREAD
      print(Session.query(MyClass).all())

threads = [threading.Thread(target=tuwas) for _ in xrange(1000)]
[t.start() for t in threads]
[t.join() for t in threads]
Das hier ist, wovon du die ganze Zeit redest (zumindest klingt es so, wenn du "die Rückgabe von scoped_session() an die Trheads weiterreichte"

Code: Alles auswählen

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

session_factory = sessionmaker(bind=some_engine)
Session = scoped_session(session_factory)

def tuwas(session):
      # GANZ DOLLE FALSCH, WEIL GLEICHES OBJEKT IN ALLEN THREADS
      print(session.query(MyClass).all())

my_session = Session()
threads = [threading.Thread(target=tuwas, args=(my_session,)) for _ in xrange(1000)]
[t.start() for t in threads]
[t.join() for t in threads]
BlackJack

Als Ergänzung: Wenn man das zweite Beispiel von __deets__ in Zeile 12 folgerndermassen ändert…

Code: Alles auswählen

threads = [threading.Thread(target=tuwas, args=(Session(),)) for _ in xrange(1000)]
…also im Hauptthread 1000 mal `Session` aufruft und das Ergebnis an die Threads verteilt, hat man genau das *gleiche* Problem, denn `Session()` liefert in ein und dem selben Thread immer das *selbe* `Session`-Exemplar, nämlich das für den Hauptthread.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@__deets__: Ich habe meine grobe Skizze etwas umstrukturiert, weil ich eine bessere Teilung vornehmen möchte. Der Quelltext ist extrem gekürzt und nicht lauffähig! Wenn ich also __deets__s, snafus und BlackJacks Ratschläge streng verfolge, dann sieht meine Umstrukturierung und mein Gedankenproblem wie folgt aus.

Zunächst ein paar Anmerkungen zu meiner Umstrukturierung. Ich habe eine Teilung zwischen Session und Engine vorgenommen. Es gibt die ManagedEgine()-Klasse, die nur einmalig aufgerufen wird. In dieser Klasse wird in einem Atemzug die create_engine() und auch der sessionmaker() erstellt. Es sind beides Objekte, die man nur einmal erstellt. Dann haben wir die ManagedSessionScope()-Klasse, die bei Bedarf immer wieder aufgerufen werden kann.

Dann haben wir die MyCustomDialog()-Klasse, die für die View verantwortlich ist. In dieser Klasse sehen wir einmal die log_in_database()-Methode. Klickt der Benutzer auf die Schaltfläche "LogIn" wird er/sie angemeldet. Es wird eine Instanzt der ManagedEgine()-Klasse erstellt und sessionmaker() wird klassenweit in das self.managed_engine-Attribut gespeichert. Nach dem erfolgreichen LogIn klickt der Benutzer auf die Schaltfläche "Daten abfragen" und ruft dadurch die start_all_selection()-Methode auf. Und genau dort habe ich das Problem. Aber dazu komme ich noch.

Darüber hinaus habe ich eine Worker()-Klasse, die durch die aufgerufenen Threads verwaltet werden. In dieser Worker-Klasse sehen wir, dass dort mit der MasterDataManipulation()-Klasse gewerkelt wird.

Un zu guter Letzt haben wir eine MasterDataManipulation() -Klasse, in welcher die ganzen Abfragen und weitere Operatoren (Update, Add, Delete) befinden. Diese Klasse wird bei jedem Thread erneut instanziiert.

Gedankenproblem:
Ich habe bisher die Session deshalb stets an die Threads übergeben, weil die Sessions in den MasterDataManipulation()-Klassen gebraucht werden, in welcher die Operationen stattfinden. Ursprünglich war es so gedacht: Bei jedem Thread-Start wird jeweils eine Instanz von der MasterDataManipulation()-Klasse erstellt, die Sessions werden dorthin übergeben und anschließend werden die Abfragen bearbeitet. Wenn ich das wie __deets__ und snafu mache, dann müsste ich die ganzen Abfragen in meinen Hauptthread (in dem Falle wäre es die GUI-Klasse) verlegen. Jedoch wollte ich es vermeiden, weil ich nicht will, dass die GUI einfriert, wenn die ganzen Abfragen im Hautthread ausgeführt werden. Und genau da liegt mein Gedankenproblem.

Code: Alles auswählen

[...] # All imports are here

[...]

class ManagedEgine(object):
    def __init__(self,
                 dbms=None,
                 dbdriver=None,
                 dbuser=None,
                 dbuser_pwd=None,
                 db_server_host=None,
                 dbport=None,
                 db_name=None,
                 echo_verbose=True):
        
        self.dbms = dbms
        self.dbdriver = dbdriver
        self.dbuser = dbuser
        self.dbuser_pwd = dbuser_pwd
        self.db_server_host = db_server_host
        self.dbport = dbport
        self.db_name = db_name
        self.echo_verbose = echo_verbose
        url = '{}+{}://{}:{}@{}:{}/{}'.format(
            self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)

        _engine = create_engine(url, echo=self.echo_verbose)
        
        # I have to persist all tables and create them
        Base.metadata.create_all(_engine)

        # Create the session factory
        self.session_factory = sessionmaker(bind=_engine)
		
class ManagedSessionScope(object):
    def __init__(self, engine=None):
 
        self._enginge = engine

        self.session = None

        self._Session = scoped_session(self._enginge)
 
    def __enter__(self):
        self.session = self._Session # this is now a scoped session
                                     # sqlalchemy.orm.scoping.scoped_session
        return self.session
 
    def __exit__(self, exception, exc_value, traceback):
        try:
            if exception:
                self.session.rollback()
            else:
                self.session.commit()
        finally:
            self.session.close()

class MasterDataManipulation(object):

    def __init__(self):

    def select_all(self, category):

        dict_store_session_query = {
									'person_gender': lambda: self._session.query(PERSON_GENDER),
                                     [...]
									 }

        try:

            for record in dict_store_session_query[category]():
                if category == 'person_gender':
                    yield record.id, record.gender
            self._session.commit()

        except Exception:
            self._session.rollback()
			
class Worker(QObject):
 
    [...] # Signals are here   
    def __init__(self,
                 combo_box=None,
                 category=None,
                 time_interval=None,
                 parent=None):
        QObject.__init__(self, parent)

        [...]

    def init_object(self):
        self.timer = QTimer()

        master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
        query_data=master_data_manipulation.select_all
        self._element = query_data(self.category)

        self.timer.setSingleShot(False)
        self.timer.setInterval(int(self.time_interval))
        self.timer.timeout.connect(self.populate_item)
        self.timer.start()
	
	[...]
			
class MyCustomDialog(QDialog):
 
    [...] Here are some signals
 
    def __init__(self, parent=None):
        QDialog.__init__(self, parent)
		[...]
		self.managed_engine = None
		[...]
    def log_in_database(self):
		# User wants to program to log in
        dbms ="mysql"
        dbdriver="pymysql"
        dbuser="root"
        dbuser_pwd="xxx"
        db_server_host="localhost"
        dbport=3306
        db_name="test"
        echo_verbose=True

        try:

            managed_engine = ManagedEgine(dbms=dbms,
                                               dbdriver=dbdriver,
                                               dbuser=dbuser,
                                               dbuser_pwd=dbuser_pwd,
                                               db_server_host=db_server_host,
                                               dbport=dbport,
                                               db_name=db_name,
                                               echo_verbose=echo_verbose)
            
            self.managed_engine = managed_engine.session_factory
            
        except SQLAlchemyError:
            desired_trace = format_exc(exc_info())
            print "desired_trace", desired_trace

	def start_thread(self,
                     combo_box=None,
                     session=None,
                     time_interval=None,
                     category=None):

        task_thread = QThread(self)

        task_thread.work = Worker(new_scope=session,
                                  time_interval=time_interval,
                                  category=category,
                                  combo_box=combo_box)

        task_thread.work.moveToThread(task_thread)
		[...]
		task_thread.started.connect(task_thread.work.init_object)
		[...]
		
	[...]
    def start_all_selection(self):
		[...]
        list_tuple = [
                    ("person_salutation", self.combo_person_salutation), 
                    ("person_title", self.combo_person_title), 
                    ("person_gender", self.combo_person_gender), 
                    ("person_religion", self.combo_person_religion),
                    ("person_eye_color", self.combo_person_eye_color),
                    ("person_hair_color", self.combo_person_hair_color),
                    ("person_relationship_status", self.combo_person_relationship_status),
                    ("person_nationality", self.combo_person_nationality)
                       ]

        try:
            #with ManagedSessionScope(engine=self.managed_engine) as session_that_scoped:
			# Here we don't want to share one session with other threads
			# What should I do?
                
			for category, combobox in list_tuple:

				combobox.clear()

				self.start_thread(combo_box=combobox,
								  session=session_that_scoped,
								  time_interval=100,
								  category=category)

        except SQLAlchemyError as err:
            [...] # Do stuff with Exception
        except OperationalError as OpErr:
            [...] # Do stuff with Exception
Zuletzt geändert von Sophus am Montag 14. August 2017, 17:04, insgesamt 4-mal geändert.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Sophus hat geschrieben: Wenn ich das wie __deets_ und snafu mache, dann müsste ich die ganzen Abfragen in meinen Hauptthread (in dem Falle wäre es die GUI-Klasse) verlegen. Jedoch wollte ich es vermeiden, weil ich nicht will, dass die GUI einfriert, wenn die ganzen Abfragen im Hautthread ausgeführt werden. Und genau da liegt mein Gedankenproblem.
Wie *kommst* du denn auf sowas? Ich habe dir doch Code gepostet, der belegt, dass ich Abfragen ueber die Session in verschiedenen Threads mache. JEDER hier hat verstanden, was du willst.

Du benutzt die scoped session einfach komplett falsch. Du brauchst davon genau *eine*, und die kannst du auch ausnahmsweise auf Modul-Ebene anlegen (so macht SQLAlchemy das selbst, und erklaert ausfuehrlich, warum das ausnahmsweise nicht boese(tm) ist)

Ich habe jetzt hier schon mehrfach code gepostet, der das illustriert. Schmeiss mal deinen ganzen Managed*-Wahnsinn weg, und erstell dir ein ganz einfaches Qt-Programm welches

- *eine* scoped session auf Modul-Ebene erzeugt.
- einen Dialog hat, der auf Knopfdruck eine Query absetzt und die Ergebnisse einfach ausdruckt mit print
- einen Hintergrund-Thread started, welcher periodisch dasselbe macht, und einfach die Ergebnisse ausdruckt mit print.

Und streu ein "print(id(Session())" ein, um dich zu ueberzeugen, dass der Thread eine andere Session benutzt.

Natuerlich kann man statt einer globalen Variablen auch das scoped_session-Objekt in die Threads (und jede moegliche andere Stelle, die SQL taetigt) reinreichen. Aber das ist nicht die Art, wie man das in SQLAlchemy ueblicherweise macht. Und *wenn* man es macht, dann reicht man die eine scoped_session rein, und legt nicht ueberall eine neue an. So wie du in Zeile 43.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Hier mal ein multi-threaded und self-contained Beispiel das belegt, dass der Ansatz funktioniert und auch wunderbar parallel Daten abholt:

Code: Alles auswählen

import time
import random
import threading

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker


session_factory = sessionmaker()
Session = scoped_session(session_factory)


def work():
    print(id(Session()))
    for _ in xrange(5):
        time.sleep(random.random() * 3)
        res = Session.execute("SELECT * FROM test LIMIT 3")
        for row in res:
            print(threading.currentThread().getName(), row)
            time.sleep(.1)


def main():
    eng = create_engine('sqlite:////tmp/test.db')
    Session.configure(bind=eng)

    Session.execute('CREATE TABLE IF NOT EXISTS test ( data int)')
    for i in xrange(1000):
        Session.execute('INSERT INTO test (data) values (:param)',  { "param": i})

    Session.commit()

    print(id(Session()))
    threads = [threading.Thread(target=work) for _ in xrange(10)]
    [t.start() for t in threads]
    [t.join() for t in threads]

if __name__ == '__main__':
    main()
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@__deets__: Ich will nicht auf Modul-Ebene arbeiten, denn jede Klasse, die ich hier extrem verkürzt dargestellt habe, präsentiert bei mir jedes Modul. Für jede Klasse ein Modul. Demzufolge wüsste ich nicht woich die scoped_session(), zusammen mit create_engine() und sessionmaker() auf Modulebene anlegen soll? In einem Modul (GUI) findet die Anmeldung zur Datenbank statt, in ein anderes Modul (ebenfalls GUI) sind dann diese Schaltflächen, dass ManagedEngine() und ManagedSessionScope() sind je in einem Modul untergebracht. Und auch die MasterDataManipulation() ist in einem Modul. Deswegen habe ich auch die strikte Teilung zwischen Engine und Session vorgenommen, damit ich die Engine klassenweit benutze, und wenn ich in eine andere GUI gehe, reiche ich die bereits erstellte Engine einfach weiter. Ich komme mir gerade mächtig dumm for, als jemand, dem man sagt "1+1=2" und mich dann fragt "was sind 2+2=?" und ich dann sage "na 2".

Und das threading-Modul bringt mir leider nichts, weil mir das mit den Klassen vollkommen auf der Strecke bleibt. Ich versuche mal das Beispiel zu verstehen, und dann auf meinen Klassen anzuwenden.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Es ist ueblich fuer SQLAlchemy die Session in einem Modul zu definieren und dann in anderen Modulen eben einfach zu importieren. Und in main() bindet man die konkrete Engine (die ja zB von Benutzereinstellungen abhaengen kann etc) an die Session. Wie vorgemacht.

Und das threading-Modul unterscheided sich bezueglich des gesagten nicht von QThreads, insofern verstehe ich nicht, warum dir "das nichts bringt". Das man fuer Qt QThreads braucht ist schoen, SQLAlchemy ist das egal.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

__deets__ hat geschrieben:Und das threading-Modul unterscheided sich bezueglich des gesagten nicht von QThreads, insofern verstehe ich nicht, warum dir "das nichts bringt". Das man fuer Qt QThreads braucht ist schoen, SQLAlchemy ist das egal.
Na ja, meine QThread-Struktur ist doch ganz anders, da unterscheidet sich eine ganze Menge. In meinem Fall wird die Worker()-Klasse zum QThread() hinzugefügt. Bei dir spielt sich alles auf einfachen Funktionen ab. Da ist nichts, einfach Funktionen an den Threads zu übergeben. Deine Struktur ist weitaus unterschiedlicher als meine Struktur, und daher komme ich gerade gar nicht mit.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Da ich ueberhaupt *NICHTS* an meine Thread-targets uebergebe, ist das eigentlich voellig gleichgueltig, wie diese Funktionen zur Ausfuehrung gelangen. Die kannst du auch in work() aufrufen, bzw. den Code ersetzen.

Mir scheint es eher so, als ob du schon viel investiert hast in deinen Code, und jetzt Aenderungen lieber einarbeiten wuerdest, statt dir mal von Grund auf klar zu machen, wie man mit threading und SQLAlchemy arbeitet. Halte ich fuer einen verfehlten Ansatz. Bau lieber ein kleines Beispiel das geht, und lerne daran, statt dein Monstrum zaehmen zu wollen.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Gut, dann basteln wir mal was. Wobei ich anmerken muss, dass mein Quelltext nicht funktioniert. Irgendwo scheint etwas zu klemmen. Aber das ist erst einmal nebensächlich, weil es mir darum geht, zu erkennen, ob ich deine ausführlichen Aussagen verstanden habe. Damit meine Spielerei auch meinem Projekt etwas nahe kommt, habe ich insgesamt fünf Module eingerichtet.

defined_session.py
Du hast gesagt, dass sowohl session_factory () als auch scoped_session() auf Modulebene angelegt werden können, und diese dann in anderen Modulen nur zu importieren brauche.

Code: Alles auswählen

from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

session_factory = sessionmaker()
Session = scoped_session(session_factory)
managed_engine.py
Für meine Engine habe ich mir die Freiheit erlaubt, und die separate Klasse erstellt.

Code: Alles auswählen

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base

''' setting up root class for declarative declaration '''
Base = declarative_base()

class ManagedEngine(object):
    def __init__(self,
                 dbms=None,
                 dbdriver=None,
                 dbuser=None,
                 dbuser_pwd=None,
                 db_server_host=None,
                 dbport=None,
                 db_name=None,
                 echo_verbose=True):
        
        self.dbms = dbms
        self.dbdriver = dbdriver
        self.dbuser = dbuser
        self.dbuser_pwd = dbuser_pwd
        self.db_server_host = db_server_host
        self.dbport = dbport
        self.db_name = db_name
        self.echo_verbose = echo_verbose
        url = '{}+{}://{}:{}@{}:{}/{}'.format(
            self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)

        self._engine = create_engine(url, echo=self.echo_verbose)
        
        # I have to persist all tables and create them
        Base.metadata.create_all(self._engine)

managed_data_manipulation.py
In dieser Klasse liegen meine Operatoren, und ich habe sessionmaker() und scoped_session() importiert.

Code: Alles auswählen

from defined_session import*

class MasterDataManipulation(object):

    def __init__(self):

        self.attr = None

    def select_all(self, category):

        dict_store_session_query = {'person_gender':               lambda: Session.query(PERSON_GENDER),
                                     'person_nationality':          lambda: Session.query(PERSON_NATIONALITY),
                                     'person_salutation':           lambda: Session.query(PERSON_SALUTATION),
                                     'person_title':                lambda: Session.query(PERSON_TITLE),
                                     'person_hair_color':           lambda: Session.query(PERSON_HAIR_COLOR),
                                     'person_eye_color':            lambda: Session.query(PERSON_EYE_COLOR),
                                     'person_religion':             lambda: Session.query(PERSON_RELIGION),
                                     'person_relationship_status':  lambda: Session.query(PERSON_RELATIONSHIP_STATUS)}

        try:

            for record in dict_store_session_query[category]():
                if category == 'person_gender':
                    yield record.id, record.gender
                if category == 'person_nationality':
                    yield record.id, record.nationality
                if category == 'person_salutation':
                    yield record.id, record.salutation
                if category == 'person_title':
                    yield record.id, record.title
                if category == 'person_hair_color':
                    yield record.id, record.hair_color
                if category == 'person_eye_color':
                    yield record.id, record.eye_color
                if category == 'person_religion':
                    yield record.id, record.religion
                if category == 'person_relationship_status':
                    yield record.id, record.relationship_status
            Session.commit()

        except Exception:
            Session.rollback()
worker.py
Hier habe ich die scoped_session() und sessionmaker() nicht importiert, brauchen wir nicht.

Code: Alles auswählen

import sys
 
from PyQt4.QtCore import QTimer, QObject, pyqtSignal 
from managed_data_manipulation import MasterDataManipulation
class Worker(QObject):

    notify_item = pyqtSignal(object)
    finish_progress = pyqtSignal()
   
    def __init__(self, category=None, parent=None):
        QObject.__init__(self, parent)

        self.category=category
        
    def init_object(self):
        master_data_manipulation = MasterDataManipulation()
        query_data=master_data_manipulation.select_all
        self._element = query_data(self.category)

        self.timer = QTimer()

        self.timer.setSingleShot(False)
        self.timer.setInterval(100)
        self.timer.timeout.connect(self.increment)
        self.timer.start()
           
    def increment(self):
 
        try:
            self.notify_item.emit(next(self._element))
 
        except StopIteration:

            self.finish_progress.emit()
 
            self.timer.stop()
       
    def stop(self):

        self.timer.stop()
my_custom_dialog.py
Ich habe auf die QComboBox()-Objekte verzichtet. Ich nehme die Print-Anweisungen. Die on_login()-Methode simuliert das Anmelden an den Datenbank. In der on_start_select_all()-Methode starte ich die ganzen Threads.

Code: Alles auswählen

import sys

from PyQt4.QtCore import pyqtSignal, QThread
 
from PyQt4.QtGui import QDialog, QLabel, QPushButton, \
     QApplication, QVBoxLayout

from sqlalchemy.exc import SQLAlchemyError

from defined_session import *

from worker import Worker
from managed_engine import ManagedEngine


class MyCustomDialog(QDialog):
 
    finish = pyqtSignal()
 
    def __init__(self, parent=None):
        QDialog.__init__(self, parent)

        self.managed_engine = None
       
        layout = QVBoxLayout(self)
       
        self.pushButton_start = QPushButton("Start", self)
        self.pushButton_login = QPushButton("LogIn", self)
        self.pushButton_stopp = QPushButton("Stopp", self)
        self.pushButton_close = QPushButton("Close", self)

        layout.addWidget(self.pushButton_login)
        layout.addWidget(self.pushButton_start)
        layout.addWidget(self.pushButton_stopp)
        layout.addWidget(self.pushButton_close)

        self.pushButton_login.clicked.connect(self.on_login)
        self.pushButton_start.clicked.connect(self.on_start_select_all)
        self.pushButton_stopp.clicked.connect(self.on_finish)
        self.pushButton_close.clicked.connect(self.close)
 
    def print_populate(self, i):
        print "i", i

    def on_login(self):
        dbms ="mysql"
        dbdriver="pymysql"
        dbuser="root"
        dbuser_pwd="xxx"
        db_server_host="localhost"
        dbport=3306
        db_name="test"
        echo_verbose=True

        try:

            managed_engine = ManagedEngine(dbms=dbms, dbdriver=dbdriver,
                                               dbuser=dbuser, dbuser_pwd=dbuser_pwd,
                                               db_server_host=db_server_host,
                                               dbport=dbport, db_name=db_name,
                                               echo_verbose=echo_verbose)
            
            Session.configure(bind=managed_engine._engine)
            
        except SQLAlchemyError as Err:
            print "Err", Err

    def on_start_select_all(self):
        category_list = [
                    "person_salutation", "person_title",  
                    "person_gender","person_religion",
                    "person_eye_color", "person_hair_color",
                    "person_relationship_status",
                    "person_nationality"
                       ]
        for category in category_list:
            self.on_start_thread_tasks(category=category)
       
    def on_start_thread_tasks(self, category=None):        
         task_thread = QThread(self)
         
         task_thread.work = Worker(category=category)
         task_thread.work.moveToThread(task_thread)
         
         task_thread.work.notify_item.connect(self.print_populate)
         task_thread.work.finish_progress.connect(task_thread.quit)
 
         self.finish.connect(task_thread.work.stop)
         
         task_thread.started.connect(task_thread.work.init_object)
         task_thread.finished.connect(task_thread.deleteLater)
         task_thread.start()
 
    def on_finish(self):
         self.finish.emit()
     
def main():
    app = QApplication(sys.argv)
    window = MyCustomDialog()
    window.resize(600, 400)
    window.show()
    sys.exit(app.exec_())
 
if __name__ == "__main__":
    main()

Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Damit man nicht alles kopieren muss, habe ich dieses Programm auf BitBucket hochgeladen: https://bitbucket.org/Xenophyl/sqlalche ... thread/src
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Zwischenergebnis: Wenn ich in der MyCustomDialog()-Klasse die on_start_select_all()-Methode, nach erfolgreicher Anmeldung an die Datenbank, mehrmals ausführe gibt es schon Probleme. Beim ersten Mal wird alles wunderbar ausgeführt. Beim Zweiten Mal werden mal sporadisch ein paar Datensätze unterschlagen und beim dritten Mal werden gar keine Daten erst abgefragt. Wenn ich eine Weile warte, und ein viertes Male versuche, dann kann es passieren, dass alle Daten geholt werden.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

So, das ist mein letzter Beitrag hier, weil ich meinen Punkt ausreichend belegt habe, und du augenscheinlich nicht in der Lage bist, dich mal wirklich mit einem kleinen Beispiel zu beschaefigen, sondern stattdessen immer riesen Menge an Code schreibst bei denen die Problem gottweisswo liegen koennen.

Ich bekomme auf meinem System kein Qt4 mehr installiert (das ist *uralt* und nicht mehr maintained. Solltest du nicht mehr verwenden, aber auch da halte ich jetzt lieber nicht die Luft an...), darum ist das PyQt5. Und was soll ich sagen? Es funktioniert wie angepriesen, man kann wunderbar nebenlaeufig Queries fahren etc.

Code: Alles auswählen

import sys
import time
import random
import threading

from PyQt5.QtWidgets import QApplication, QWidget, QPushButton
from PyQt5.QtCore import QThread, QObject

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker


session_factory = sessionmaker()
Session = scoped_session(session_factory)


def execute_db_query():
    try:
        res = Session.execute("SELECT * FROM test LIMIT 100")
        for row in res:
            print(threading.currentThread().getName(), row)
            time.sleep(.1)
    except:
        Session.rollback()
        print(sys.exc_info())
    else:
        Session.commit()


class Worker(QObject):

    def work(self):
        while True:
            print("working...")
            time.sleep(random.random() * 3)
            execute_db_query()


def setup_db():
    eng = create_engine('sqlite:////tmp/test.db')
    Session.configure(bind=eng)

    Session.execute('CREATE TABLE IF NOT EXISTS test ( data int)')
    for i in range(1000):
        Session.execute('INSERT INTO test (data) values (:param)',  { "param": i})

    Session.commit()

def create_worker_thread(parent=None):
    task_thread = QThread(parent)
    task_thread.worker = Worker()
    task_thread.worker.moveToThread(task_thread)
    task_thread.started.connect(task_thread.worker.work)
    task_thread.finished.connect(task_thread.deleteLater)
    return task_thread


def main():
    setup_db()
    app = QApplication(sys.argv)

    w = QWidget()
    w.resize(250, 150)
    w.move(300, 300)
    w.setWindowTitle('Simple')

    button = QPushButton(w)
    button.clicked.connect(execute_db_query)
    w.show()

    worker_thread = create_worker_thread()
    worker_thread.start()
    sys.exit(app.exec_())


if __name__ == '__main__':
    main()
Benutzeravatar
snafu
User
Beiträge: 6731
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Sophus hat geschrieben:Wenn ich das wie __deets__ und snafu mache, dann müsste ich die ganzen Abfragen in meinen Hauptthread (in dem Falle wäre es die GUI-Klasse) verlegen.
Nein, ich habe für das genaue Gegenteil argumentiert: Die Session soll erst in der Worker-Funktion (Thread) angefasst werden. Irgendwie schreibe ich A und du verstehst Z. Das ist langsam frustrierend. :(
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@snafu: Es ist nicht böse gemeint, aber was meinst, wie frustrierend das für mich ist? Erst arbeite ich mich deine Vorschläge durch, klappt nicht, dann die von __deets__ und irgendwie will das auch nicht klappen und dabei habe ich einen klar leserlichen Quelltext auf BitBucket bereits hingelegt :(
Benutzeravatar
snafu
User
Beiträge: 6731
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

Dass du erstmal alles ohne zusätzliche Threads schreibst, ist gar keine Option für dich? Dir fehlt bezüglich Threads offenbar noch einiges an Grundverständnis. Du hängst ja jetzt schon seit Tagen an dem Problem und doktorst mehr herum als du wirklich verstehst.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@snafu: Ich erstelle gerade extra mit deinem Namen eine Responsy auf meinem BitBucket, und lade mal deine Version hoch. Wie man mit Qthreads arbeitet, weiß ich, hoffentlich schon. Denn abseits von SQLAlchemy stellt für mich QThread kein Problem dar. Ich melde mit zurück, wenn die Responsy fertig ist.
Antworten