SQLAlchemy: Arbeiten mit mehreren QThreads?

Installation und Anwendung von Datenbankschnittstellen wie SQLite, PostgreSQL, MariaDB/MySQL, der DB-API 2.0 und sonstigen Datenbanksystemen.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Hallo Leute,

der nachfolgende, ausführbare Quelltext wirkt auf dem ersten Blick ein wenig "groß". Aber keine Bange, bestimmte Aspekte könnt ihr getrost ignorieren. Warum ich das Programm so groß gestaltet habe? Auf diese Weise möchte ich meinem Problem ziemlich nahe kommen

Zeilen, die ihr ignorieren könnt:
- Zeile 18 - 72: hierbei handelt es sich nur um Tabellen
- Zeile 180 - 224: hier handelt es sich von einer Session, die ich in ein Context-Manager umgerüstet habe.
- Zeile 336 - 413: hier werden nur Daten eingetragen, wenn der Anwender es sich wünscht. Mit gefüllten Tabellen lässt sich besser arbeiten, als mit leeren. :)
- Zeile 415 - 447: der übliche Start eines programm

Was erwarte ich?
Auf der derzeitigen GUI sind insgesamt acht QComboBox(), für jede Kategorie eine QComboBox().
Wenn ihr einen Blick in die start_all_selection()-Methode (Zeile 272) der MyCustomDialog()-Klasse werft, dann seht ihr, dass ich
in Zeile 295-296 vor habe mehrere Threads zu starten. Warum ich der start_thread()-Methode die QComboBox() und bestimmte String übergebe, wird erst in der select_all()-Methode (Zeile 80) der MasterDataManipulation()-Klasse deutlich. Meine Ausgangsidee war, dass ich die Session einmal mittels der with-Anweisung öffne, nach bestimmten Kategorien suche, und dann die Abfrage erstelle, damit diese dann später über die die jeweilige QComboBox() ausgegeben werden. Noch einmal kurz zurück. Wenn ihr einen Blick in der start_thread()-Methode (Zeile 305) der MyCustomDialog()-Klasse werft, seht ihr, dass nicht nur jedesmal ein neuer Qthread und Worker() erstellt wird, sondern auch gleich dazu die MasterDataManipulation()-Klasse, der die aktuelle Session übergeben wird.
Auf diese Weise habe ich erwartet, dass bei jeder Abfrage eine neue Session eröffnet wird, und dann wieder geschlossen wird. Das heißt, die Abfrage für die Kategorie Gender wird eine neue Session geöffnet, dann für die Kategorie Religion etc. Also für jede Kategorie nicht nur einen neuen Thread, sondern auch neue Sitzung.

Problem
So wie der Quelltext jetzt ist, funktioniert es einwandfrei - zumindest augenscheinlich. Jetzt gehen wir in die Worker()-Klasse und ändern in Zeile 152 den Interval des QTimer() von derzeit 1000 auf 1. Wir wollen ja, dass das Programm ein bisschen zügiger die Daten holt. Wenn ich dann das Programm ausführe, verhält sich mein Programm ein wenig merkwürdig, als ob es ein Eigenleben besitzt.

Manchmal kommt es vor, dass zwar keine Ausnahmen geworfen werden, jedoch werden NICHT ALLE QComboBox()-Objekte gefüllt. Mal bleibt nur eine leer oder auch mehrere.

Des Weiteren passiert auch hin und wieder mal, dass mir von seiten SQLAlchemy gesagt wird, dass die Verbindung geschlossen wurde. Dazu habe ich eine Fehlermeldung:
Traceback (most recent call last):
File "D:\Dan\Python\Xarphus\xarphus\subclass_master_data_load_data_item.py", line 151, in populate_item
self.populate_item_signal.emit(next(self._element))
File "D:\Dan\Python\Xarphus\xarphus\core\manage_data_manipulation_master_data.py", line 232, in select_all
yield record.id, record.relationship
File "D:\Dan\Python\Xarphus\xarphus\core\manage_db_connection.py", line 245, in __exit__
self.session.commit()
File "C:\Python27\lib\site-packages\sqlalchemy\orm\session.py", line 906, in commit
self.transaction.commit()
File "C:\Python27\lib\site-packages\sqlalchemy\orm\session.py", line 465, in commit
t[1].commit()
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 1632, in commit
self._do_commit()
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 1663, in _do_commit
self.connection._commit_impl()
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 726, in _commit_impl
self.connection._reset_agent is self.__transaction:
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 351, in connection
self._handle_dbapi_exception(e, None, None, None, None)
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 1405, in _handle_dbapi_exception
util.reraise(*exc_info)
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 349, in connection
return self._revalidate_connection()
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 429, in _revalidate_connection
raise exc.ResourceClosedError("This Connection is closed")
ResourceClosedError: This Connection is closed

Code: Alles auswählen

import sys

from PyQt4.QtCore import QObject, QThread, pyqtSignal, pyqtSlot, QTimer
from PyQt4.QtGui import QApplication, QPushButton, QVBoxLayout, QDialog, \
                        QComboBox, QLabel

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import Table, Column, Integer, String, MetaData

from traceback import format_exc
from sys import exc_info
 
Base = declarative_base()

class PERSON_GENDER(Base):

    __tablename__ = "person_gender"

    id = Column(Integer, primary_key=True)
    gender = Column(String(50), nullable=False, unique=True)

class PERSON_NATIONALITY(Base):

    __tablename__ = "person_nationality"

    id = Column(Integer, primary_key=True)
    nationality = Column(String(100), nullable=False, unique=True)

class PERSON_SALUTATION(Base):

    __tablename__ = "person_salutation"

    id = Column(Integer, primary_key=True)
    salutation = Column(String(50), nullable=False, unique=True)

class PERSON_TITLE(Base):

    __tablename__ = "person_title"

    id = Column(Integer, primary_key=True)
    title = Column(String(50), nullable=False, unique=True)

class PERSON_HAIR_COLOR(Base):

    __tablename__ = "person_hair_color"

    id = Column(Integer, primary_key=True)
    hair_color = Column(String(50), nullable=False, unique=True)

class PERSON_EYE_COLOR(Base):

    __tablename__ = "person_eye_color"

    id = Column(Integer, primary_key=True)
    eye_color = Column(String(50), nullable=False, unique=True)

class PERSON_RELIGION(Base):

    __tablename__ = "person_religion"

    id = Column(Integer, primary_key=True)
    religion = Column(String(50), nullable=False, unique=True)

class PERSON_RELATIONSHIP_STATUS(Base):

    __tablename__ = "person_relationship_status"

    id = Column(Integer, primary_key=True)
    relationship_status = Column(String(100), nullable=False, unique=True)

class MasterDataManipulation(object):

    def __init__(self, session_object=None):

    	self._session_scope = session_object

    def select_all(self, category):

        try:
            with self._session_scope as session:

                dict_store_session_query = {'person_gender':               lambda: session.query(PERSON_GENDER),
                                             'person_nationality':          lambda: session.query(PERSON_NATIONALITY),
                                             'person_salutation':           lambda: session.query(PERSON_SALUTATION),
                                             'person_title':                lambda: session.query(PERSON_TITLE),
                                             'person_hair_color':           lambda: session.query(PERSON_HAIR_COLOR),
                                             'person_eye_color':            lambda: session.query(PERSON_EYE_COLOR),
                                             'person_religion':             lambda: session.query(PERSON_RELIGION),
                                             'person_relationship_status':  lambda: session.query(PERSON_RELATIONSHIP_STATUS)}

                for record in dict_store_session_query[category]():
                    if category == 'person_gender':
                        yield record.id, record.gender
                    if category == 'person_nationality':
                        yield record.id, record.nationality
                    if category == 'person_salutation':
                        yield record.id, record.salutation
                    if category == 'person_title':
                        yield record.id, record.title
                    if category == 'person_hair_color':
                        yield record.id, record.hair_color
                    if category == 'person_eye_color':
                        yield record.id, record.eye_color
                    if category == 'person_religion':
                        yield record.id, record.religion
                    if category == 'person_relationship_status':
                        yield record.id, record.relationship_status

        except AttributeError:

            print "select all, desired_trace",  format_exc(exc_info())

        return

class Worker(QObject):
 
    finish_progress = pyqtSignal()
    populate_item_signal = pyqtSignal(object, object, object)
   
    def __init__(self,
                 category=None,
                 combo_box=None,
                 query_data=None,
                 parent=None):
        QObject.__init__(self, parent)

        self.category = category
        self.query_data = query_data
        self.combo_box=combo_box

        ''' Create attributes '''
        self._run_semaphore = 1

    def init_object(self):

        '''
            Storing new generator object, will reuse it.
            That means you have to create one generator.
        '''
        self._element = self.query_data(self.category)

        self.timer = QTimer()

        # assoziiert select_all_data() mit TIMEOUT Ereignis
        self.timer.setSingleShot(False)
        self.timer.setInterval(1000)
        self.timer.timeout.connect(self.populate_item)
        self.timer.start()
        
    def populate_item(self):
        try:

            if self._run_semaphore == 0:
    
                self._run_semaphore = 1

                raise StopIteration

            else:

                self.populate_item_signal.emit(next(self._element), self.category, self.combo_box)

        except StopIteration:

            print "StopIteration is raised"

            self.timer.stop()
       
    def stop(self):
        self._run_semaphore == 0
        #self.timer.stop()
        
        
class SessionScope(object):
    def __init__(self, dbms=None, dbdriver=None,
                 dbuser=None, dbuser_pwd=None,
                 db_server_host=None, dbport=None, db_name=None,
                 admin_database=None):
 
        self.dbms = dbms
        self.dbdriver = dbdriver
        self.dbuser = dbuser
        self.dbuser_pwd = dbuser_pwd
        self.db_server_host = db_server_host
        self.dbport = dbport
        self.db_name = db_name
        self.admin_database = admin_database
       
        url = '{}+{}://{}:{}@{}:{}/{}'.format(
           self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)

        self._Engine = create_engine(url, encoding='utf8', echo=True)
 
        self.session = None
 
        self._session_factory = sessionmaker(bind=self._Engine)
 
        self._Session = scoped_session(sessionmaker(bind=self._Engine, expire_on_commit=False))

        Base.metadata.create_all(self._Engine)
 
    def __enter__(self):
        self.session = self._Session()
        return self.session
 
    def __exit__(self, exception, exc_value, traceback):
 
        try:
            if exception:
 
                self.session.rollback()
            else:
 
                self.session.commit()
 
        finally:
 
            self.session.close()

class MyCustomDialog(QDialog):
 
    finish = pyqtSignal()
 
    def __init__(self, scoped_session=None, parent=None):
        QDialog.__init__(self, parent)

        self._session_scope = scoped_session

        self._list_threads = []

        self.init_ui()
        self.start_all_selection()

    def init_ui(self):

        layout = QVBoxLayout(self)
 
        self.combo_person_title = QComboBox(self)
        self.combo_person_salutation = QComboBox(self)
        self.combo_person_gender = QComboBox(self)
        self.combo_person_religion = QComboBox(self)
        self.combo_person_relationship_status = QComboBox(self)
        self.combo_person_nationality = QComboBox(self)
        self.combo_person_eye_color = QComboBox(self)
        self.combo_person_hair_color = QComboBox(self)

        self.pushButton_populate_combo = QPushButton("Re-populate", self)
        self.pushButton_stopp = QPushButton("Stopp", self)
        self.pushButton_close = QPushButton("Close", self)
        layout.addWidget(self.combo_person_title)
        layout.addWidget(self.combo_person_salutation)
        layout.addWidget(self.combo_person_gender)
        layout.addWidget(self.combo_person_religion)
        layout.addWidget(self.combo_person_nationality)
        layout.addWidget(self.combo_person_relationship_status)
        layout.addWidget(self.combo_person_eye_color)
        layout.addWidget(self.combo_person_hair_color)
        layout.addWidget(self.pushButton_populate_combo)
        layout.addWidget(self.pushButton_stopp)
        layout.addWidget(self.pushButton_close)

        self.pushButton_stopp.clicked.connect(self.on_finish)
        self.pushButton_populate_combo.clicked.connect(self.start_all_selection)
        self.pushButton_close.clicked.connect(self.close)

    def start_all_selection(self):

        list_comboxes = self.findChildren(QComboBox)

        for combox in list_comboxes:
            combox.clear()
        
        list_start_threads = [('person_gender',self.combo_person_gender),
                              ('person_nationality', self.combo_person_nationality),
                              ('person_salutation', self.combo_person_salutation),
                              ('person_title', self.combo_person_title),
                              ('person_hair_color',self.combo_person_hair_color),
                              ('person_eye_color', self.combo_person_eye_color),
                              ('person_religion', self.combo_person_religion),
                              ('person_relationship_status', self.combo_person_relationship_status)]
        
        for category, combo_box in list_start_threads:
            self.start_thread(category=category, combo_box=combo_box)
        
    def fill_combo_boxt(self, item, category, combo_box):
        _, text = item
        combo_box.addItem(text)
        
    def on_label(self, i):
         self.label.setText("Result: {}".format(i))
       
    def start_thread(self, category=None, combo_box=None):
        master_data_manipulation = MasterDataManipulation(session_object=self._session_scope)
        query_data=master_data_manipulation.select_all

        task_thread = QThread(self)
        task_thread.work = Worker(query_data=query_data,
                                  combo_box=combo_box,
                                  category=category)

        ''' We need to store threads '''
        self._list_threads.append(task_thread)  
        task_thread.work.moveToThread(task_thread)

        task_thread.work.populate_item_signal.connect(self.fill_combo_boxt)

        self.finish.connect(task_thread.work.stop)

        task_thread.started.connect(task_thread.work.init_object)

        task_thread.finished.connect(task_thread.deleteLater)

        ''' This will emit 'started' and start thread's event loop '''
        task_thread.start()

    @pyqtSlot()
    def abort_workers(self):
        self.finish.emit()
        for thread in self._list_threads:
            ''' this will quit **as soon as thread event loop unblocks** '''
            thread.quit()

            ''' so you need to wait for it to *actually* quit'''
            thread.wait()
 
    def on_finish(self):
         self.finish.emit()

    def closeEvent(self, event):
        ''' Re-implementaate to handle with created threads '''
        self.abort_workers()

def populate_database(sess=None):

    try:
    
        with sess as session:
            genders = [PERSON_GENDER(gender="male"),
                       PERSON_GENDER(gender="female"),
                       PERSON_GENDER(gender="hybrid"),
                       PERSON_GENDER(gender="trans")]
            session.add_all(genders)

            nationalitys = [PERSON_NATIONALITY(nationality="german"),
                           PERSON_NATIONALITY(nationality="english"),
                           PERSON_NATIONALITY(nationality="french"),
                           PERSON_NATIONALITY(nationality="spanish"),
                           PERSON_NATIONALITY(nationality="greek"),
                           PERSON_NATIONALITY(nationality="mexican"),
                           PERSON_NATIONALITY(nationality="sweden"),]
            session.add_all(nationalitys)

            salutations = [PERSON_SALUTATION(salutation="Mister"),
                           PERSON_SALUTATION(salutation="Miss"),
                           PERSON_SALUTATION(salutation="Lady"),
                           PERSON_SALUTATION(salutation="Ma'am"),
                           PERSON_SALUTATION(salutation="Sir"),
                           PERSON_SALUTATION(salutation="Queen"),
                           PERSON_SALUTATION(salutation="Grandma"),]
            session.add_all(salutations)

            titles = [PERSON_TITLE(title="Prof."),
                      PERSON_TITLE(title="Dr."),
                      PERSON_TITLE(title="Sir"),
                      PERSON_TITLE(title="B.A."),
                      PERSON_TITLE(title="M.A."),
                      PERSON_TITLE(title="Bishop"),
                      PERSON_TITLE(title="God"),]
            session.add_all(titles)

            hair_colors = [PERSON_HAIR_COLOR(hair_color="blond."),
                           PERSON_HAIR_COLOR(hair_color="gray"),
                           PERSON_HAIR_COLOR(hair_color="blue"),
                           PERSON_HAIR_COLOR(hair_color="white"),
                           PERSON_HAIR_COLOR(hair_color="black"),
                           PERSON_HAIR_COLOR(hair_color="violet"),
                           PERSON_HAIR_COLOR(hair_color="brunette"),]
            session.add_all(hair_colors)

            eye_colors = [PERSON_EYE_COLOR(eye_color="blue."),
                          PERSON_EYE_COLOR(eye_color="blue-gray"),
                          PERSON_EYE_COLOR(eye_color="green"),
                          PERSON_EYE_COLOR(eye_color="white"),
                          PERSON_EYE_COLOR(eye_color="black"),
                          PERSON_EYE_COLOR(eye_color="violet"),
                          PERSON_EYE_COLOR(eye_color="brunette"),]
            session.add_all(eye_colors)

            religions = [PERSON_RELIGION(religion="Catholic."),
                         PERSON_RELIGION(religion="Protestant"),
                         PERSON_RELIGION(religion="Jew"),
                         PERSON_RELIGION(religion="Muslim"),
                         PERSON_RELIGION(religion="Islam"),
                         PERSON_RELIGION(religion="Hindu"),
                         PERSON_RELIGION(religion="Buddha"),]
            session.add_all(religions)

            relationship_status = [PERSON_RELATIONSHIP_STATUS(relationship_status="Single."),
                                   PERSON_RELATIONSHIP_STATUS(relationship_status="In a relationship"),
                                   PERSON_RELATIONSHIP_STATUS(relationship_status="Married"),
                                   PERSON_RELATIONSHIP_STATUS(relationship_status="In a open relationship"),
                                   PERSON_RELATIONSHIP_STATUS(relationship_status="Engaged"),
                                   PERSON_RELATIONSHIP_STATUS(relationship_status="Divorced"),
                                   PERSON_RELATIONSHIP_STATUS(relationship_status="Separate"),]
            session.add_all(relationship_status)

            session.commit()

    except SQLAlchemyError:
        print "SQLAlchemyError", format_exc(exc_info())
            
def main():
    dbms = raw_input('Enter database type: ')
    dbdriver = raw_input('Enter database driver: ')
    dbuser = raw_input('Enter user name: ')
    dbuser_pwd = raw_input('Enter user password: ')
    db_server_host = raw_input('Enter server host: ')
    dbport = raw_input('Enter port: ')
    db_name = raw_input('Enter database name: ')

    try:
        session_scope = SessionScope(dbms = dbms,
                                     dbdriver = dbdriver,
                                     dbuser = dbuser,
                                     dbuser_pwd = dbuser_pwd,
                                     db_server_host = db_server_host,
                                     dbport = dbport,
                                     db_name = db_name)

        answer = raw_input('Do you want to populate database? Type yes or no: ')
        
        if answer.lower() == 'yes':
            populate_database(sess=session_scope)

        app = QApplication(sys.argv)
        window = MyCustomDialog(scoped_session = session_scope)
        window.show()
        sys.exit(app.exec_())
    except TypeError:
        
        print "ERROR", format_exc(exc_info())
        
if __name__ == "__main__":
    main()
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Hallo Leute, ich konnte den oberen Quelltext wesentlich kürzen, um am Ende auf das gleiche Problem zu stoßen. Anstatt mit 8 Tabellen zu arbeiten habe ich nur eine Tabelle angelegt. Dennoch werden weiterhin 8 Abfragen erstellt, die dann an die Datenbank gesendet werden. Auch hier kommen gleiche Probleme wie beim Programm oben. Außerdem musste ich sys.exit() verwenden, damit das Programm richtig beendet wird. Ich denke, dass ist auch ziemlich unüblich.

Code: Alles auswählen

import sys

from PyQt4.QtCore import QObject, QThread, pyqtSignal, pyqtSlot, QTimer
from PyQt4.QtGui import QApplication, QPushButton, QVBoxLayout, QDialog, \
                        QComboBox, QLabel

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import Table, Column, Integer, String, MetaData

from traceback import format_exc
from sys import exc_info
 
''' setting up root class for declarative declaration '''
Base = declarative_base()

class PERSON_SALUTATION(Base):

    __tablename__ = "person_salutation"

    id = Column(Integer, primary_key=True)
    salutation = Column(String(50), nullable=False, unique=True)


class MasterDataManipulation(object):

    def __init__(self, session_object=None):

    	self._session_scope = session_object

    def select_all(self):

        try:
            with self._session_scope as session:

                for record in session.query(PERSON_SALUTATION):

                    yield record.id, record.salutation
                    
        except AttributeError:

            print "select all, desired_trace",  format_exc(exc_info())

        return

class Worker(QObject):
 
    finish_progress = pyqtSignal()
    populate_item_signal = pyqtSignal(object, object)
   
    def __init__(self,
                 combo_box=None,
                 query_data=None,
                 parent=None):
        QObject.__init__(self, parent)

        self.query_data = query_data
        self.combo_box=combo_box

        ''' Create attributes '''
        self._run_semaphore = 1

    def init_object(self):

        self._element = self.query_data()

        self.timer = QTimer()

        # assoziiert select_all_data() mit TIMEOUT Ereignis
        self.timer.setSingleShot(False)
        self.timer.setInterval(1)
        self.timer.timeout.connect(self.populate_item)
        self.timer.start()
        
    def populate_item(self):
        try:

            if self._run_semaphore == 0:
    
                self._run_semaphore = 1

                return#raise StopIteration

            else:

                self.populate_item_signal.emit(next(self._element), self.combo_box)

        except StopIteration:

            print "StopIteration is raised"

            self.timer.stop()
       
    def stop(self):
        self._run_semaphore == 0
        self.timer.stop()
        
class SessionScope(object):
    def __init__(self, dbms=None, dbdriver=None,
                 dbuser=None, dbuser_pwd=None,
                 db_server_host=None, dbport=None, db_name=None,
                 admin_database=None):
 
        self.dbms = dbms
        self.dbdriver = dbdriver
        self.dbuser = dbuser
        self.dbuser_pwd = dbuser_pwd
        self.db_server_host = db_server_host
        self.dbport = dbport
        self.db_name = db_name
        self.admin_database = admin_database
       
        url = '{}+{}://{}:{}@{}:{}/{}'.format(
           self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)

        self._Engine = create_engine(url, encoding='utf8', echo=True)

        self.session = None
 
        self._session_factory = sessionmaker(bind=self._Engine)

        self._Session = scoped_session(sessionmaker(bind=self._Engine, expire_on_commit=False))

        ''' create tables '''
        Base.metadata.create_all(self._Engine)
 
    def __enter__(self):
        self.session = self._Session()
        return self.session
 
    def __exit__(self, exception, exc_value, traceback):
 
        try:
            if exception:
 
                self.session.rollback()
            else:
 
                self.session.commit()
 
        finally:
 
            self.session.close()

class MyCustomDialog(QDialog):
 
    finish = pyqtSignal()
 
    def __init__(self, scoped_session=None, parent=None):
        QDialog.__init__(self, parent)

        self._session_scope = scoped_session

        self._list_threads = []

        self.init_ui()
        self.start_all_selection()

    def init_ui(self):

        layout = QVBoxLayout(self)
 
        self.combo_person_title = QComboBox(self)
        self.combo_person_salutation = QComboBox(self)
        self.combo_person_gender = QComboBox(self)
        self.combo_person_religion = QComboBox(self)
        self.combo_person_relationship_status = QComboBox(self)
        self.combo_person_nationality = QComboBox(self)
        self.combo_person_eye_color = QComboBox(self)
        self.combo_person_hair_color = QComboBox(self)

        self.pushButton_populate_combo = QPushButton("Re-populate", self)
        self.pushButton_stopp = QPushButton("Stopp", self)
        self.pushButton_close = QPushButton("Close", self)
        layout.addWidget(self.combo_person_title)
        layout.addWidget(self.combo_person_salutation)
        layout.addWidget(self.combo_person_gender)
        layout.addWidget(self.combo_person_religion)
        layout.addWidget(self.combo_person_nationality)
        layout.addWidget(self.combo_person_relationship_status)
        layout.addWidget(self.combo_person_eye_color)
        layout.addWidget(self.combo_person_hair_color)
        layout.addWidget(self.pushButton_populate_combo)
        layout.addWidget(self.pushButton_stopp)
        layout.addWidget(self.pushButton_close)

        self.pushButton_stopp.clicked.connect(self.on_finish)
        self.pushButton_populate_combo.clicked.connect(self.start_all_selection)
        self.pushButton_close.clicked.connect(self.close)

    def start_all_selection(self):

        list_comboxes = self.findChildren(QComboBox)

        for combo_box in list_comboxes:
            combo_box.clear()
            self.start_thread(combo_box=combo_box)
        
    def fill_combo_boxt(self, item, combo_box):
        id, text = item
        combo_box.addItem(text)
 
    def on_label(self, i):
         self.label.setText("Result: {}".format(i))
       
    def start_thread(self, combo_box=None):
        master_data_manipulation = MasterDataManipulation(session_object=self._session_scope)
        query_data=master_data_manipulation.select_all

        task_thread = QThread(self)
        task_thread.work = Worker(query_data=query_data,
                                  combo_box=combo_box,)

        ''' We need to store threads '''
        self._list_threads.append(task_thread)  
        task_thread.work.moveToThread(task_thread)

        task_thread.work.populate_item_signal.connect(self.fill_combo_boxt)

        self.finish.connect(task_thread.work.stop)

        task_thread.started.connect(task_thread.work.init_object)

        task_thread.finished.connect(task_thread.deleteLater)

        ''' This will emit 'started' and start thread's event loop '''
        task_thread.start()

    @pyqtSlot()
    def abort_workers(self):
        self.finish.emit()
        for thread in self._list_threads:
            ''' this will quit **as soon as thread event loop unblocks** '''
            thread.quit()

            ''' so you need to wait for it to *actually* quit'''
            thread.wait()
 
    def on_finish(self):
         self.finish.emit()

    def closeEvent(self, event):
        ''' Re-implementaate to handle with created threads '''
        self.abort_workers()
        
        sys.exit()

def populate_database(sess=None):

    try:
    
        with sess as session:

            salutations = [PERSON_SALUTATION(salutation="Mister"),
                           PERSON_SALUTATION(salutation="Miss"),
                           PERSON_SALUTATION(salutation="Lady"),
                           PERSON_SALUTATION(salutation="Ma'am"),
                           PERSON_SALUTATION(salutation="Sir"),
                           PERSON_SALUTATION(salutation="Queen"),
                           PERSON_SALUTATION(salutation="Grandma"),]
            session.add_all(salutations)
           
            session.commit()

    except SQLAlchemyError:
        print "SQLAlchemyError", format_exc(exc_info())
            
def main():
    dbms = raw_input('Enter database type: ')
    dbdriver = raw_input('Enter database driver: ')
    dbuser = raw_input('Enter user name: ')
    dbuser_pwd = raw_input('Enter user password: ')
    db_server_host = raw_input('Enter server host: ')
    dbport = raw_input('Enter port: ')
    db_name = raw_input('Enter database name: ')

    try:
        session_scope = SessionScope(dbms = dbms,
                                     dbdriver = dbdriver,
                                     dbuser = dbuser,
                                     dbuser_pwd = dbuser_pwd,
                                     db_server_host = db_server_host,
                                     dbport = dbport,
                                     db_name = db_name)

        answer = raw_input('Do you want to populate database? Type yes or no: ')
        
        if answer.lower() == 'yes':
            populate_database(sess=session_scope)

        app = QApplication(sys.argv)
        window = MyCustomDialog(scoped_session = session_scope)
        window.show()
        sys.exit(app.exec_())
    except TypeError:
        
        print "ERROR", format_exc(exc_info())
        
if __name__ == "__main__":
    main()
Melewo
User
Beiträge: 320
Registriert: Mittwoch 3. Mai 2017, 16:30

Sophus hat geschrieben:den Interval des QTimer() von derzeit 1000 auf 1. Wir wollen ja, dass das Programm ein bisschen zügiger die Daten holt.
Vielleicht sollte ich hier lieber nicht antworten, weil ich mich weder mit SQLAlchemy noch mit QThreads auskenne. Nur irgendwie sieht mir das nach einer Antwort wie dieser aus:
Ich laufe nicht 10-mal so schnell auf 100 Metern, nur weil Deine Stoppuhr 10-mal schneller rast, sondern würde bei einem Intervall nach Deiner rasenden Stoppuhr bei 10 Metern abbrechen, ohne das Ziel erreicht zu haben.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@Melewo: Danke für deine Anmerkung. Mit dem Interval des QTimer()-Objektes kann es durchaus was zutun haben. Am Interval muss ich wohl noch ein wenig schrauben.

Allerdings bin auf meine falsche Strukturierung des Quelltextes gestoßen. Kurz gesagt, ich bin durch meiner richtigen Annahme auf die falsche Fährte geraten. Klingt ein wenig Paradox. Für meine Ausführung beziehe ich mich auf meinen letzten Beitrag.

Da ich weiß, dass ich mit QThread()-Objekten arbeiten möchte, habe ich mein Session()-Objekt dementsprechend gestaltet. Ein normales Session()-Objekt ist keineswegs thread-sicher (Is the session thread-safe?). Also bediene ich mich des scoped_session()-Objektes (in Zeile 124). Dieses Objekt ist zumindest thread-sicher (Using Custom Created Scopes). Und mit dieser Annahme habe ich angefangen, das einmalig erstellte Session()-Objekt (in Zeile 280) im Haupt-Thread (die GUI-Anwendung) an die jeweiligen Threads zu verteilen. Kurzum: Ich habe das eine Session()-Objekt zwischen den ganzen Threads (in meinem Beispiel sind es 8 Threads) geteilt. Schließlich nahm ich ja an, dass mein Session()-Objekt von nun an thread-sicher sei. Daher bin ich auf die falsche Fährte geraten.

Demzufolge habe ich einige Änderungen an meinem Beispiel vorgenommen. Zunächst habe ich in der select_all()-Methode der MasterDataManipulation()-Klasse angefangen
.
Anstatt jedesmal die Session erneut zu öffnen...

Code: Alles auswählen

    def select_all(self):
 
        try:
            with self._session_scope as session:
 
                for record in session.query(PERSON_SALUTATION):
 
                    yield record.id, record.salutation
                   
        except AttributeError:
 
            print "select all, desired_trace",  format_exc(exc_info())
 
        return
... habe ich die with-Anweisung entfernt, schließlich wollen wir ja das eine Session()-Objekt nicht mehr zwischen den Threads teilen.

Code: Alles auswählen

    def select_all(self):

        try:

            for record in self._session_scope.query(PERSON_SALUTATION):

                yield record.id, record.salutation
                    
        except AttributeError:

            print "select all, desired_trace",  format_exc(exc_info())

        return
Im nächsten Schritt habe ich in der start_all_selection()-Methode der MyCustomDialog()-Klasse eine Änderung vorgenommen.

Aus..

Code: Alles auswählen

    def start_all_selection(self):
 
        list_comboxes = self.findChildren(QComboBox)
 
        for combo_box in list_comboxes:
            combo_box.clear()
            self.start_thread(combo_box=combo_box)
... habe ich nun folgendes gemacht:

Code: Alles auswählen

    def start_all_selection(self):

        list_comboxes = self.findChildren(QComboBox)
        '''
            use one session per thread, share nothing between threads.  The  
            scoped_session should make this pretty straightforward.
        '''
        with self._session_scope as session:       
            for combo_box in list_comboxes:
                combo_box.clear()
                self.start_thread(combo_box=combo_box,session=session)
Hier wird einmal eine session erstellt, und nur diese wird an den Threads übergeben. Das heißt in meinem Fall, dass die session dann geschlossen wird, wenn die 8 Threads fertig sind. Solange bleibt der Rumpf der with-Anweisung am Leben.

Hier nun der komplette Quelltext, mit einigen englischen Kommentaren meinerseits:

Code: Alles auswählen

from PyQt4.QtCore import QObject, QThread, pyqtSignal, pyqtSlot, QTimer
from PyQt4.QtGui import QApplication, QPushButton, QVBoxLayout, QDialog, \
                        QComboBox, QLabel

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import Table, Column, Integer, String, MetaData

from traceback import format_exc
from sys import exc_info, exit, argv
 
''' setting up root class for declarative declaration '''
Base = declarative_base()

class PERSON_SALUTATION(Base):

    __tablename__ = "person_salutation"

    id = Column(Integer, primary_key=True)
    salutation = Column(String(100), nullable=False, unique=True)


class MasterDataManipulation(object):

    def __init__(self, session_object=None):

    	self._session_scope = session_object

    def select_all(self):

        try:

            for record in self._session_scope.query(PERSON_SALUTATION):

                yield record.id, record.salutation
                    
        except AttributeError:

            print "select all, desired_trace",  format_exc(exc_info())

        return

class Worker(QObject):
 
    finish_progress = pyqtSignal()
    populate_item_signal = pyqtSignal(object, object)
   
    def __init__(self,
                 combo_box=None,
                 new_scope=None,
                 #master_data=None,
                 parent=None):
        QObject.__init__(self, parent)

        self.new_scope=new_scope

        #self.master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
        self.combo_box=combo_box

        ''' Create attributes '''
        self._run_semaphore = 1

        self._element = None

    def init_object(self):

        self.timer = QTimer()
        
        '''
            Storing new generator object, will reuse it.
            That means you have to create one generator.
        '''
        self.master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
        self._element = self.master_data_manipulation.select_all()

        # assoziiert select_all_data() mit TIMEOUT Ereignis
        self.timer.setSingleShot(False)
        self.timer.setInterval(1)
        self.timer.timeout.connect(self.populate_item)
        self.timer.start()
           
    def populate_item(self):
        try:

            if self._run_semaphore == 0:
    
                self._run_semaphore = 1

                raise StopIteration

            else:

                self.populate_item_signal.emit(next(self._element), self.combo_box)

        except StopIteration:

            print "StopIteration is raised"
            self.timer.stop()
       
    def stop(self):
        self.timer.stop()
        self._run_semaphore = 0
        self._element = None
        
class SessionScope(object):
    def __init__(self, dbms=None, dbdriver=None,
                 dbuser=None, dbuser_pwd=None,
                 db_server_host=None, dbport=None, db_name=None,
                 admin_database=None):
 
        self.dbms = dbms
        self.dbdriver = dbdriver
        self.dbuser = dbuser
        self.dbuser_pwd = dbuser_pwd
        self.db_server_host = db_server_host
        self.dbport = dbport
        self.db_name = db_name
        self.admin_database = admin_database
       
        url = '{}+{}://{}:{}@{}:{}/{}'.format(
           self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)
 
        '''
           Currently the echo is turned on to see the auto-generated SQL.
 
           That is, the Engine is a factory for connections as well as a pool of connections,
           not the connection itself. When you say in this case close(),
           the connection is returned to the connection pool within the Engine, not actually closed.
 
           So the self._Engine will not use connection pool if you set poolclass=NullPool.
           So the connection (SQLAlchemy session) will close directly after session.close()
           that means, if you set poolclass=NullPool each call to close() will close the underlying DBAPI connection.
       '''
        self._Engine = create_engine(url, pool_size=20, encoding='utf8', echo=True)
 
        '''   Set up the session and store a sessionmaker for this db connection object  '''
 
        self.session = None

        ''' Create the session factory '''
        self._session_factory = sessionmaker(bind=self._Engine)
 
        ''' Create the scoped_session, now self._Session registry is established '''
        self._Session = scoped_session(sessionmaker(bind=self._Engine, expire_on_commit=False))
        #self._Session = sessionmaker(bind=self._Engine)

        ''' create tables '''
        Base.metadata.create_all(self._Engine)
 
    def __enter__(self):
        '''
           Now all calls to Session() will create a thread-local session.
           That means, you can now use self.session to run multiple queries, etc.
           The registry is *optionally* starts called upon explicitly to create
           a Session local to the thread and/or request. That why we return self.session
       '''
        self.session = self._Session()
        return self.session
 
    def __exit__(self, exception, exc_value, traceback):
 
        try:
            if exception:
 
                self.session.rollback()
            else:
 
                self.session.commit()
 
        finally:
 
            self.session.close()

class MyCustomDialog(QDialog):
 
    finish = pyqtSignal()
 
    def __init__(self, scoped_session=None, parent=None):
        QDialog.__init__(self, parent)

        self._session_scope = scoped_session

        self.master_data_manipulation = MasterDataManipulation(session_object=self._session_scope)

        self._list_threads = []

        self.init_ui()
        self.start_all_selection()

    def init_ui(self):

        layout = QVBoxLayout(self)
 
        self.combo_person_title = QComboBox(self)
        self.combo_person_salutation = QComboBox(self)
        self.combo_person_gender = QComboBox(self)
        self.combo_person_religion = QComboBox(self)
        self.combo_person_relationship_status = QComboBox(self)
        self.combo_person_nationality = QComboBox(self)
        self.combo_person_eye_color = QComboBox(self)
        self.combo_person_hair_color = QComboBox(self)

        self.pushButton_populate_combo = QPushButton("Re-populate", self)
        self.pushButton_stopp = QPushButton("Stopp", self)
        self.pushButton_close = QPushButton("Close", self)
        layout.addWidget(self.combo_person_title)
        layout.addWidget(self.combo_person_salutation)
        layout.addWidget(self.combo_person_gender)
        layout.addWidget(self.combo_person_religion)
        layout.addWidget(self.combo_person_nationality)
        layout.addWidget(self.combo_person_relationship_status)
        layout.addWidget(self.combo_person_eye_color)
        layout.addWidget(self.combo_person_hair_color)
        layout.addWidget(self.pushButton_populate_combo)
        layout.addWidget(self.pushButton_stopp)
        layout.addWidget(self.pushButton_close)

        self.pushButton_stopp.clicked.connect(self.on_finish)
        self.pushButton_populate_combo.clicked.connect(self.start_all_selection)
        self.pushButton_close.clicked.connect(self.close)

    def start_all_selection(self):

        list_comboxes = self.findChildren(QComboBox)

        '''
            use one session per thread, share nothing between threads.  The  
            scoped_session should make this pretty straightforward.
        '''
        with self._session_scope as session:
      
            for combo_box in list_comboxes:
                combo_box.clear()
                self.start_thread(combo_box=combo_box,session=session)
        
    def fill_combo_boxt(self, item, combo_box):
        id, text = item
        combo_box.addItem(text)
       
    def start_thread(self, combo_box=None, session=None):
        master_data_manipulation = MasterDataManipulation(session_object=self._session_scope)
        query_data=master_data_manipulation.select_all

        task_thread = QThread(self)
        task_thread.work = Worker(new_scope=session,
                                  combo_box=combo_box,)

        ''' We need to store threads '''
        self._list_threads.append((task_thread, task_thread.work))
        task_thread.work.moveToThread(task_thread)

        task_thread.work.populate_item_signal.connect(self.fill_combo_boxt)

        self.finish.connect(task_thread.work.stop)

        task_thread.started.connect(task_thread.work.init_object)

        task_thread.finished.connect(task_thread.deleteLater)

        ''' This will emit 'started' and start thread's event loop '''
        task_thread.start()

    @pyqtSlot()
    def abort_workers(self):
        self.finish.emit()
        for thread, _ in self._list_threads:
            ''' this will quit **as soon as thread event loop unblocks** '''
            thread.quit()

            ''' so you need to wait for it to *actually* quit'''
            thread.wait()

        self._list_threads[:] = []
 
    def on_finish(self):
         self.finish.emit()

    def closeEvent(self, event):
        ''' Re-implementaate to handle with created threads '''
        self.abort_workers()
        
        exit()

def populate_database(sess=None):

    try:
    
        with sess as session:

            salutations = [PERSON_SALUTATION(salutation="Mister"),
                           PERSON_SALUTATION(salutation="Miss"),
                           PERSON_SALUTATION(salutation="Lady"),
                           PERSON_SALUTATION(salutation="Ma'am"),
                           PERSON_SALUTATION(salutation="Sir"),
                           PERSON_SALUTATION(salutation="Queen"),
                           PERSON_SALUTATION(salutation="Grandma"),]
            session.add_all(salutations)
           
            session.commit()

    except SQLAlchemyError:
        print "SQLAlchemyError", format_exc(exc_info())
            
def main():
    dbms = raw_input('Enter database type: ')
    dbdriver = raw_input('Enter database driver: ')
    dbuser = raw_input('Enter user name: ')
    dbuser_pwd = raw_input('Enter user password: ')
    db_server_host = raw_input('Enter server host: ')
    dbport = raw_input('Enter port: ')
    db_name = raw_input('Enter database name: ')

    try:
        ''' create_engine and scoped_session once per process (per database). '''
        session_scope = SessionScope(dbms = dbms,
                                     dbdriver = dbdriver,
                                     dbuser = dbuser,
                                     dbuser_pwd = dbuser_pwd,
                                     db_server_host = db_server_host,
                                     dbport = dbport,
                                     db_name = db_name)

        answer = raw_input('Do you want to populate database? Type yes or no: ')

        if answer.lower() == 'yes':
            populate_database(sess=session_scope)

        app = QApplication(argv)
        window = MyCustomDialog(scoped_session = session_scope)
        window.show()
        exit(app.exec_())
    except TypeError:
        
        print "ERROR", format_exc(exc_info())
        
if __name__ == "__main__":
    main()
Woran ich jetzt denken muss, ist an die pool_size()-Methode, die ich der Engine übergeben muss. Denn Standard ist poolsize() auf 5 eingestellt. Dies ist weitestgehend ausreichend. Wenn man allerdings mit Threads arbeitet, können die 5 Verbindungen sehr knapp werden. Da muss ich mal sehen, wie niedrig ich diese Verbindungen halten kann.
BlackJack

@Sophus: Anmerkung zu den Quelltexten: Zeichenketten sind keine Kommentare.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

BlackJack hat geschrieben:@Sophus: Anmerkung zu den Quelltexten: Zeichenketten sind keine Kommentare.
Du meinst die ganzen DocStrings, die ich hier missbraucht habe? Ich muss mich entschuldigen. Irgendwie finde ich die "hübscher" als normale Kommentare, die mit # beginnen.
BlackJack

@Sophus: Da sind keine DocStrings. Also zwei sind technisch gesehen welche aber vom Inhalt her nicht. Wenn Dein Editor Kommentare nicht hübsch genug anzeigt, dann ist das eine Frage der Einstellungen des Editors.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@BlackJack: Ich werde versuchen, mich zu bessern. 8)
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Mir ist soeben ein weiteres, unschönes Problem aufgefallen. Derzeit wird in meiner re-implementierten closeEven()t-Methode die exit()-Funktion des sys-Modules ausgeführt. Ich kann diese Funktion auch weglassen und mein Programm wird sauber geschlossen. Allerdings ist es dann nicht möglich, mit mit der Datenbank mittels des Programmes MySQL Workbench zu arbeiten. Das ist mir aufgefallen, als ich nach dem ausführen und normales Beenden meines Programmes die Tabelle löschen wollte. MySQL Workbench hat sich regelrecht aufgehängt. Erst als ich die exit()-Funktion angewendet habe, scheint alles rein zu sein. Allerdings verwirrt mich das. Denn durch meine Umstrukturierung sollte ja die session, die durch die with-Anweisung einmalig erstellt wird, und nach dem Ende aller Threads wieder freigegeben wird, und somit keine Verbindung zur Datenbank bestehen sollte. Irgendwas hält noch eine Verbindung zur Datenbank fest.

Aktualisierung:
Ich konnte das Problem lösen. Ich habe im SessionScope()-Objekt (Zeile 177 im letzten Beitrag) folgende Methode re-implementiert.

Code: Alles auswählen

    def disconnect(self):
        '''
            Make sure the dbconnection gets closed
        '''
        self.session = self._Session()

        '''
            .close() will give the connection back to the connection 
            pool of Engine and doesn't close the connection.
        '''
        self.session.close()

        '''
            dispose() will close all connections of the connection pool.
        '''
        self._Engine.dispose()

        self._Session = None
        self._Engine = None
In meinem Haupt-Thread wird auf das Session()-Objekt dann die disconnect()-Methode angewendet. Alle Verbindungen werden geschlossen, und zur Sicherheit werden die Referenzen von self._Session und self._Engine beseitigt. Nun brauche ich die exit()-Funktion nicht anwenden, und kann ganz klassisch das Programm beenden und nichts hält zur Datenbank fest. Ich kann direkt nach dem Beenden meines Programmes mit MySQL Workbench arbeiten.

Vollständigkeitshalber sieht mein SessionSpoce()-Objekt wie folgt aus:

Code: Alles auswählen

class SessionScope(object):
    def __init__(self, dbms=None, dbdriver=None,
                 dbuser=None, dbuser_pwd=None,
                 db_server_host=None, dbport=None, db_name=None,
                 admin_database=None):
 
        self.dbms = dbms
        self.dbdriver = dbdriver
        self.dbuser = dbuser
        self.dbuser_pwd = dbuser_pwd
        self.db_server_host = db_server_host
        self.dbport = dbport
        self.db_name = db_name
        self.admin_database = admin_database
       
        url = '{}+{}://{}:{}@{}:{}/{}'.format(
           self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)
 
        '''
           Currently the echo is turned on to see the auto-generated SQL.
 
           That is, the Engine is a factory for connections as well as a pool of connections,
           not the connection itself. When you say in this case close(),
           the connection is returned to the connection pool within the Engine, not actually closed.
 
           So the self._Engine will not use connection pool if you set poolclass=NullPool.
           So the connection (SQLAlchemy session) will close directly after session.close()
           that means, if you set poolclass=NullPool each call to close() will close the underlying DBAPI connection.
       '''
        self._Engine = create_engine(url, pool_size=30, encoding='utf8', echo=True)
 
        '''
           Set up the session and store a sessionmaker for this db connection object
       '''
 
        self.session = None

        ''' Create the session factory '''
        self._session_factory = sessionmaker(bind=self._Engine)
 
        ''' Create the scoped_session, now self._Session registry is established '''
        self._Session = scoped_session(sessionmaker(bind=self._Engine, expire_on_commit=False))
        #self._Session = sessionmaker(bind=self._Engine)

        ''' create tables '''
        Base.metadata.create_all(self._Engine)
 
    def __enter__(self):
        '''
           Now all calls to Session() will create a thread-local session.
           That means, you can now use self.session to run multiple queries, etc.
           The registry is *optionally* starts called upon explicitly to create
           a Session local to the thread and/or request. That why we return self.session
       '''
        self.session = self._Session()
        return self.session
 
    def __exit__(self, exception, exc_value, traceback):
 
        try:
            if exception:
 
                self.session.rollback()
            else:
 
                self.session.commit()
 
        finally:
 
            self.session.close()

    def disconnect(self):
        '''
            Make sure the dbconnection gets closed
        '''
        self.session = self._Session()

        '''
            .close() will give the connection back to the connection 
            pool of Engine and doesn't close the connection.
        '''
        self.session.close()

        '''
            dispose() will close all connections of the connection pool.
        '''
        self._Engine.dispose()

        self._Session = None
        self._Engine = None
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Ich könnte schreien und heulen zugleich. Was bin ich verzweifelt. Meine letzte Lösung funktioniert nur bedingt. Wenn ich das Programm einmal starte, werden alle Anfragen wunderbar abgearbeitet. Alle QComboBox()-Objekte werden gefüllt. Auf der GUI sind insgesatm drei QPushButton()-Objekte. Klicke ich nun auf das QPushButton()-Objekt, mit der Beschriftung Re-populate, dann wird es richtig kritisch. Ich bekomme erst einmal eine ellenlange Fehlermeldung. Diese habe ich auf meinem BitBucket-Konto veröffentlicht. Dazu habe ich auch meinen Quelltext auf BitBucket ausgelagert. Hier: SQLAlchemy and QThread, weil der Code sonst den Rahmen sprengen würde. Der übersichtshalber habe ich das Programm in Modulen aufgeteilt. Gestartet wird das Programm mit dem main_program.py-Modul. In diesem Modul geht es auch um die GUI. Von dort aus werden die QThread()-Objekte verwaltet.

Hat jemand eine Ahnung, woran ich hier kläglich scheitere?
Melewo
User
Beiträge: 320
Registriert: Mittwoch 3. Mai 2017, 16:30

Schneidest Du eigentlich die Anfragen zur Kontrolle mit?
Ich meine, ich benutze nur MySQL, doch wenn ich mir nicht sicher war, ob die Anfragen gut aussehen oder ob ein Script unnötige Quit's verursacht, dann kommentiere ich für einen abschließenden Test eine log.txt ein (# entfernen) und die müsste ja dann bei Dir die letzte Abfrage enthalten, bei oder nach der ein Abbruch erfolgte.

Oder ich verstehe nichts, weil mir das alles zu undurchsichtig ist.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@Melewo: Leider verstehe ich deine Anmerkung nicht. Was genau meinst du mit "mitscheiden"? Alles was ich mache, ist, dass ich mein SessionScope()-Objekt mit der with-Anweisung öffne und dann alle Threads nach und nach starte. Du kannst dir gern meinen Quelltext von BitBucket herunterladen, und es dir selbst anschauen. Beim ersten Laden klappt alles, beim Klicken auf die Schaltfläche "Re-populate" versagt alles komplett. Klicke ich nach dem Versagen noch einmal auf die gleiche Schaltfläche, dann klappt alles wieder wunderbar. Irgendwie klappt das re-populate nur bei jedem zweiten Klick.
Melewo
User
Beiträge: 320
Registriert: Mittwoch 3. Mai 2017, 16:30

Sophus hat geschrieben:Des Weiteren passiert auch hin und wieder mal, dass mir von seiten SQLAlchemy gesagt wird, dass die Verbindung geschlossen wurde. Dazu habe ich eine Fehlermeldung:
Ich bezog mich darauf und nein, ich benutze SQLAlchemy nicht und kann gut mit MySQL leben. Doch wenn ich da Abbrüche hätte, würde ich bei mir diese beiden Zeilen in der mysql/bin/my.ini einkommentieren und schauen, wie die Abfragen aussehen.

[codebox=text file=Unbenannt.txt]# general-log = 1
# general_log_file = "C:/xampp/mysql/anfragen.log"[/code]
Und wie das bei Dir aussieht, weiß ich nicht, verstehe ich nicht und habe auch nicht vor mir etwas zu installieren. Es sollte nur eine einfache Frage sein, ob Du die mitschneidest zur Kontrolle. Wobei ich ja eh nicht verstehe, wie bei Dir alles zusammenhängt.
BlackJack

@Melewo: Die Probleme liegen so wie es aussieht ja auf Python-Seite. Es gehen ja gar keine Anfragen raus, also wird man auf Datenbankseite nicht viel sehen.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@BlackJack: Hast du eine Ahnung, wo es klemmen könnte?
DasIch
User
Beiträge: 2718
Registriert: Montag 19. Mai 2008, 04:21
Wohnort: Berlin

Überleg mal was passiert wenn zwei unterschiedliche Threads nacheinander SessionScope.__enter__() aufrufen bevor einer der beiden __exit__() aufruft.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@DasIch: Nach drei Nächten bin ich auch auf diese Idee gekommen, Erst habe ich geglaubt, ich handhabe die Threads falsch, dann war ich im Glauben, dass ich die Sessions falsch verteile etc. Ich habe nochmals mein SessionScope()-Objekt hierher kopiert. Schaut man in den nachfolgenden Quelltext, so sehen wir in Zeile 56 und Zeile 59, dass in der magischen __enter__()-Methode zwei Session-Varianten zu sehen sind. Die Session, die ausgeklammert wurde, war diejenige Session, mit der ich bisher gearbeitet habe. Und genau da lag der Fehler. Ich habe die ganze Zeit mit der falschen Session gearbeitet. Die Session in Zeile 59 war eine ganz normale Session, die keineswegs thread-sicher ist. Die Session in Zeile 56 ist die Session die ich brauchte. Ich hatte einfach die Funktions-Klammern übersehen. Bis ich mich irgendwann fragte "Sag mal, arbeite ich mit der richtigen Session? und da sah ich, dass es die normale Session war. Also entfernte ich die Funktions-Klammern, et volà nun die jetztige Session durch die scoped_session()-Methode thread-sicher. Ich will nicht vorgreifen und meinen, dies sei die Lösung. Nach einigen Testläufen bekam ich keine Fehlermeldungen in dieser Richtung. Einzig und allein die Fehlermeldungen, dass irgendwann zu viele Verbindungen (Too many connections) vorhanden sind.

Code: Alles auswählen

class ManagedSessionScope(object):
    def __init__(self,
                 dbms=None,
                 dbdriver=None,
                 dbuser=None,
                 dbuser_pwd=None,
                 db_server_host=None,
                 dbport=None,
                 db_name=None,
                 verbose_echo=True,
                 admin_database=None):
 
        self.dbms = dbms
        self.dbdriver = dbdriver
        self.dbuser = dbuser
        self.dbuser_pwd = dbuser_pwd
        self.db_server_host = db_server_host
        self.dbport = dbport
        self.db_name = db_name
        self.admin_database = admin_database
        self.verbose_echo=verbose_echo
       
        url = '{}+{}://{}:{}@{}:{}/{}'.format(
           self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)

        # Currently the echo is turned on to see the auto-generated SQL.

        # That is, the Engine is a factory for connections as well as a pool of connections,
        # not the connection itself. When you say in this case close(),
        # the connection is returned to the connection pool within the Engine, not actually closed.
 
        # So the self._Engine will not use connection pool if you set poolclass=NullPool.
        # So the connection (SQLAlchemy session) will close directly after session.close()
        # that means, if you set poolclass=NullPool each call to close() will close the underlying DBAPI connection.

        self._Engine = create_engine(self.url, pool_size=10, encoding='utf8', echo=self.verbose_echo)
       
        self.session = None
        
        # Set up the session and store a sessionmaker for this db connection object
        # Session registry is established

        self._Session = scoped_session(sessionmaker(bind=self._Engine))
        self._Session.remove()

        # I have to persist all tables and create them
        Base.metadata.create_all(self._Engine)


    def __enter__(self):
        # In this magic function all calls to Session() will create a thread-local session.
        # That means, you can now use self.session to run multiple queries, etc.
        # The registry is *optionally* starts called upon explicitly to create
        # a Session local to the thread and/or request. That why we return self.session
        
        self.session = self._Session # this is now a scoped session
                                     # sqlalchemy.orm.scoping.scoped_session
                                     
        #self.session = self._Session() # this is sqlalchemy.orm.session.Session

        return self.session, self
 
    def __exit__(self, exception, exc_value, traceback):

        try:
            if exception:
 
                self.session.rollback()
            else:
 
                self.session.commit()
 
        finally:
            self.session.close()


    def disconnect(self):
        ''' Make sure the dbconnection gets closed '''
        self.session = self._Session

        # close() will give the connection back to the connection 
        # pool of Engine and doesn't close the connection.
        self.session.close()


        # dispose() will close all connections of the connection pool.
        # Note that a new pool is created when you dispose the engine;
        # the database is not perminantly disconnected, but any open connections
        # are closed, and no new connections are drawn from the new pool by the dispose
        # operation itself.
        # That way the connection pool is flushed out and new connections begin

        self._Engine.dispose()

        self.session = None
        self._Engine = None
Benutzeravatar
snafu
User
Beiträge: 6731
Registriert: Donnerstag 21. Februar 2008, 17:31
Wohnort: Gelsenkirchen

scoped_session() bereitet ein aufrufbares und thread-sicheres Objekt vor. Immer dann wenn eine neue Session gestartet werden soll, ruft man das von scoped_session() gelieferte Objekt auf. Deine __enter__()-Methode sollte also sowas wie self._scoped_session() zurückgeben.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@snafu: Meine jetzt aktualisierte __enter__()-Methode liefert die sqlalchemy.orm.scoping.scoped_session bei jedem Aufruf. Und diese Session wollte ich. Leider hatte ich die Funktionsklammern übersehen 8) Oder wolltest du mich auf einen weiteren Fehler hinweisen, den ich übersehen habe? Denn wenn ich die Funktionsklammern benutzte, dann bekomme ich wieder die normale Session, siehe Kommentar.
Benutzeravatar
Sophus
User
Beiträge: 1109
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

@snafu: Ich muss mich nochmals melden. Meinst du etwa so? Zur Vereinfachung habe ich den gesamten Beispiel extrem gekürzt und ist damit nicht ausführbar und den Context-Manager habe ich mal eben umstrukturiert. Diese Umstrukturierung findest du in Zeile 252-280. Das eigentliche Herzstück meines Anliegen sind die Zeilen 198-207. Da öffne ich den Context Manager einmalig, und verteile die sessions an die Threads, die durch die For-Schleife nach und nach geöffnet werden. Ich hoffe, dass ich bis hierher richtig verfahre. Denn ich habe mich weitestgehend an die SQLAlchemy-Dokumentation gehalten: When do I construct a Session, when do I commit it, and when do I close it?. In diesem Beispiel werden insgesamt drei Beispiele gezeigt, eines davon sollte man auf keinen Fall verwenden, und die anderen beiden Beispiel zeigen, wie man die Session handhabt. Ich habe mich für das zweite Beispiel entschieden - sieht übersichtlicher aus. In in der MasterDataManipulation()-Klasse (Zeile 25-61) findest du derzeit einige Abfragen, für jede Kategorie. Später werden in dieser Klasse mehrere Abfragen vorhanden sein. Und die Worker()-Klasse (Zeilen 63-158) dient als Sub-Klasse, welche später zum QThread() hinzugefügt wird.

Und wie du in der session_scope()-Funtion (Zeilen 252-280) siehst, habe ich die Scoped_session, die durch die scoped_session aufbereitet wird, nun mit der Funktionsklammer aufgerufen, gleich im Zuge der yield. Meinst du das etwa so?

Code: Alles auswählen

# -*- coding: cp1252 -*-
import sys

from contextlib import contextmanager

# Here we have to import all PyQt stuff for working with GUI

import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError, OperationalError, DisconnectionError
from sqlalchemy import exc 
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import Table, Column, Integer, String, MetaData
from sqlalchemy import event

from traceback import format_exc
from sys import exc_info

''' setting up root class for declarative declaration '''
Base = declarative_base()

# Here you can see the models of the tables

class MasterDataManipulation(object):

    def __init__(self, session_object=None):

    	self._session = session_object
                  
    def select_all(self, category):

        dict_store_session_query = {'person_gender':               lambda: self._session.query(PERSON_GENDER),
                                     'person_nationality':          lambda: self._session.query(PERSON_NATIONALITY),
                                     'person_salutation':           lambda: self._session.query(PERSON_SALUTATION),
                                     'person_title':                lambda: self._session.query(PERSON_TITLE),
                                     'person_hair_color':           lambda: self._session.query(PERSON_HAIR_COLOR),
                                     'person_eye_color':            lambda: self._session.query(PERSON_EYE_COLOR),
                                     'person_religion':             lambda: self._session.query(PERSON_RELIGION),
                                     'person_relationship_status':  lambda: self._session.query(PERSON_RELATIONSHIP_STATUS)}      

        for record in dict_store_session_query[category]():
            if category == 'person_gender':
                yield record.id, record.gender
            if category == 'person_nationality':
                yield record.id, record.nationality
            if category == 'person_salutation':
                yield record.id, record.salutation
            if category == 'person_title':
                yield record.id, record.title
            if category == 'person_hair_color':
                yield record.id, record.hair_color
            if category == 'person_eye_color':
                yield record.id, record.eye_color
            if category == 'person_religion':
                yield record.id, record.religion
            if category == 'person_relationship_status':
                yield record.id, record.relationship_status


        return

class Worker(QObject):
 
    finish_progress = pyqtSignal()
    populate_item_signal = pyqtSignal(object, object)

    stop_loop = pyqtSignal(unicode, unicode)
   
    def __init__(self,
                 combo_box=None,
                 new_scope=None,
                 category=None,
                 time_interval=None,
                 operation=None,
                 parent=None):
        QObject.__init__(self, parent)

        self.new_scope=new_scope
        self.category = category
        self.time_interval=time_interval
        self.operation = operation

        if self.time_interval is None:
            self.time_interval = 100

        #self.master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
        self.combo_box=combo_box

        ''' Create attributes '''
        self._run_semaphore = 1

        self._element = None

    def init_object(self):

        if self.operation == "select":

            self.timer = QTimer()
            
            '''
                Storing new generator object, will reuse it.
                That means you have to create one generator.
            '''
            master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
            query_data=master_data_manipulation.select_all
            self._element = query_data(self.category)

            # assoziiert select_all_data() mit TIMEOUT Ereignis
            self.timer.setSingleShot(False)
            self.timer.setInterval(int(self.time_interval))
            self.timer.timeout.connect(self.populate_item)
            self.timer.start()

        if self.operation == 'population':
            print "hier"
            master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
            master_data_manipulation.populate_data()
           
    def populate_item(self):
        try:

            if self._run_semaphore == 0:
    
                self._run_semaphore = 1

                raise StopIteration

            else:

                self.populate_item_signal.emit(next(self._element), self.combo_box)

        except StopIteration:

            self.finish_progress.emit()
            self.timer.stop()

        except SQLAlchemyError as err:
            server_said = "The server said: {server_said}".format(server_said=str(err[0]))
            #print "SQLAlchemyError, populate_item", format_exc(exc_info())
            desired_trace = format_exc(exc_info())
            self.stop_loop.emit(desired_trace, server_said)
            self.finish_progress.emit()
            self.timer.stop()

        except OperationalError as err:
            server_said = "The server said: {server_said}".format(server_said=str(err[0]))
            #print "OperationalError, populate_item", format_exc(exc_info())
            desired_trace = format_exc(exc_info())
            self.timer.stop()
            self.stop_loop.emit(desired_trace, server_said)
            self.finish_progress.emit()
            ##self.timer.stop()
       
    def stop(self):
        self.timer.stop()
        self._run_semaphore = 0
        self._element = None
         
[...]

class MyCustomDialog(QDialog):
 
    finish = pyqtSignal()
 
    def __init__(self, url=None, parent=None):
        QDialog.__init__(self, parent)

        self._url = url

[...]

    def start_all_selection(self):

        list_tuple = [
                    ("person_salutation", self.combo_person_salutation), 
                    ("person_title", self.combo_person_title), 
                    ("person_gender", self.combo_person_gender), 
                    ("person_religion", self.combo_person_religion),
                    ("person_eye_color", self.combo_person_eye_color),
                    ("person_hair_color", self.combo_person_hair_color),
                    ("person_relationship_status", self.combo_person_relationship_status),
                    ("person_nationality", self.combo_person_nationality)
                       ]

        '''
            use one session per thread, share nothing between threads.  The  
            scoped_session should make this pretty straightforward.
        '''
        try:
            # I know each session is thread-local, that means there is a separate session for each thread.
            # So I decide to pass some instances/sessions to another thread,
            # I think they will become "detached" from the session.
            # According to documentation we should use different instance of engine for every subprocess,
            # in our case we have one engine for all subprocesses, because connection pool between subprocesses
            # cannot be shared (as i understand).
            
            with session_scope(dburi=self._url, verbose=False) as session:
                for category, combobox in list_tuple:

                    combobox.clear()

                    self.start_thread(combo_box=combobox,
                                      session=session,
                                      time_interval=10,
                                      operation='select',
                                      category=category)

        except SQLAlchemyError as err:
            # do stuff with this error

        except OperationalError as OpErr:
            # do stuff with this error

       
    def start_thread(self,
                     combo_box=None,
                     session=None,
                     time_interval=None,
                     operation=None,
                     category=None):

        task_thread = QThread(self)

        task_thread.work = Worker(new_scope=session,
                                  time_interval=time_interval,
                                  category=category,
                                  operation=operation,
                                  combo_box=combo_box)

        ''' We need to store threads '''
        #self._list_threads.append(task_thread)  
        task_thread.work.moveToThread(task_thread)
        
        task_thread.work.finish_progress.connect(task_thread.quit)

        task_thread.work.stop_loop.connect(self.message_out)

        task_thread.work.populate_item_signal.connect(self.fill_combo_boxt)

        self.finish.connect(task_thread.work.stop)

        task_thread.started.connect(task_thread.work.init_object)

        task_thread.finished.connect(task_thread.deleteLater)

        ''' This will emit 'started' and start thread's event loop '''
        task_thread.start()

[...]

@contextmanager
def session_scope(dburi=None, echo_verbose=True):
    """
        Provide a transactional scope around a series of operations.
        Creates a context with an open SQLAlchemy session.
    """                                    
    engine = create_engine(dburi,
                           pool_size=10,
                           max_overflow=10,
                           pool_timeout=60,
                           echo=echo_verbose)

    # create a session maker for factory
    session_factory = sessionmaker(bind=engine)
    
    # scoped_session create one connection per each thread
    #Session  = scoped_session(sessionmaker(bind=engine))#, twophase=True))
    Scoped_session = scoped_session(session_factory)
    
    # Now all calls to Session() will create a thread-local session
    try:
        yield Scoped_session()
        Scoped_session.commit()
    except:
        Scoped_session.rollback()
        raise
    finally:
        Scoped_session.close()
[...]
Antworten