SQLAlchemy: Arbeiten mit mehreren QThreads?

Installation und Anwendung von Datenbankschnittstellen wie SQLite, PostgreSQL, MySQL, der DB-API 2.0 und sonstigen Datenbanksystemen.
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Dienstag 8. August 2017, 22:13

Hallo Leute,

der nachfolgende, ausführbare Quelltext wirkt auf dem ersten Blick ein wenig "groß". Aber keine Bange, bestimmte Aspekte könnt ihr getrost ignorieren. Warum ich das Programm so groß gestaltet habe? Auf diese Weise möchte ich meinem Problem ziemlich nahe kommen

Zeilen, die ihr ignorieren könnt:
- Zeile 18 - 72: hierbei handelt es sich nur um Tabellen
- Zeile 180 - 224: hier handelt es sich von einer Session, die ich in ein Context-Manager umgerüstet habe.
- Zeile 336 - 413: hier werden nur Daten eingetragen, wenn der Anwender es sich wünscht. Mit gefüllten Tabellen lässt sich besser arbeiten, als mit leeren. :)
- Zeile 415 - 447: der übliche Start eines programm

Was erwarte ich?
Auf der derzeitigen GUI sind insgesamt acht QComboBox(), für jede Kategorie eine QComboBox().
Wenn ihr einen Blick in die start_all_selection()-Methode (Zeile 272) der MyCustomDialog()-Klasse werft, dann seht ihr, dass ich
in Zeile 295-296 vor habe mehrere Threads zu starten. Warum ich der start_thread()-Methode die QComboBox() und bestimmte String übergebe, wird erst in der select_all()-Methode (Zeile 80) der MasterDataManipulation()-Klasse deutlich. Meine Ausgangsidee war, dass ich die Session einmal mittels der with-Anweisung öffne, nach bestimmten Kategorien suche, und dann die Abfrage erstelle, damit diese dann später über die die jeweilige QComboBox() ausgegeben werden. Noch einmal kurz zurück. Wenn ihr einen Blick in der start_thread()-Methode (Zeile 305) der MyCustomDialog()-Klasse werft, seht ihr, dass nicht nur jedesmal ein neuer Qthread und Worker() erstellt wird, sondern auch gleich dazu die MasterDataManipulation()-Klasse, der die aktuelle Session übergeben wird.
Auf diese Weise habe ich erwartet, dass bei jeder Abfrage eine neue Session eröffnet wird, und dann wieder geschlossen wird. Das heißt, die Abfrage für die Kategorie Gender wird eine neue Session geöffnet, dann für die Kategorie Religion etc. Also für jede Kategorie nicht nur einen neuen Thread, sondern auch neue Sitzung.

Problem
So wie der Quelltext jetzt ist, funktioniert es einwandfrei - zumindest augenscheinlich. Jetzt gehen wir in die Worker()-Klasse und ändern in Zeile 152 den Interval des QTimer() von derzeit 1000 auf 1. Wir wollen ja, dass das Programm ein bisschen zügiger die Daten holt. Wenn ich dann das Programm ausführe, verhält sich mein Programm ein wenig merkwürdig, als ob es ein Eigenleben besitzt.

Manchmal kommt es vor, dass zwar keine Ausnahmen geworfen werden, jedoch werden NICHT ALLE QComboBox()-Objekte gefüllt. Mal bleibt nur eine leer oder auch mehrere.

Des Weiteren passiert auch hin und wieder mal, dass mir von seiten SQLAlchemy gesagt wird, dass die Verbindung geschlossen wurde. Dazu habe ich eine Fehlermeldung:

Traceback (most recent call last):
File "D:\Dan\Python\Xarphus\xarphus\subclass_master_data_load_data_item.py", line 151, in populate_item
self.populate_item_signal.emit(next(self._element))
File "D:\Dan\Python\Xarphus\xarphus\core\manage_data_manipulation_master_data.py", line 232, in select_all
yield record.id, record.relationship
File "D:\Dan\Python\Xarphus\xarphus\core\manage_db_connection.py", line 245, in __exit__
self.session.commit()
File "C:\Python27\lib\site-packages\sqlalchemy\orm\session.py", line 906, in commit
self.transaction.commit()
File "C:\Python27\lib\site-packages\sqlalchemy\orm\session.py", line 465, in commit
t[1].commit()
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 1632, in commit
self._do_commit()
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 1663, in _do_commit
self.connection._commit_impl()
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 726, in _commit_impl
self.connection._reset_agent is self.__transaction:
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 351, in connection
self._handle_dbapi_exception(e, None, None, None, None)
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 1405, in _handle_dbapi_exception
util.reraise(*exc_info)
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 349, in connection
return self._revalidate_connection()
File "C:\Python27\lib\site-packages\sqlalchemy\engine\base.py", line 429, in _revalidate_connection
raise exc.ResourceClosedError("This Connection is closed")
ResourceClosedError: This Connection is closed


  1. import sys
  2.  
  3. from PyQt4.QtCore import QObject, QThread, pyqtSignal, pyqtSlot, QTimer
  4. from PyQt4.QtGui import QApplication, QPushButton, QVBoxLayout, QDialog, \
  5.                         QComboBox, QLabel
  6.  
  7. from sqlalchemy.ext.declarative import declarative_base
  8. from sqlalchemy import create_engine
  9. from sqlalchemy.exc import SQLAlchemyError
  10. from sqlalchemy.orm import sessionmaker, scoped_session
  11. from sqlalchemy import Table, Column, Integer, String, MetaData
  12.  
  13. from traceback import format_exc
  14. from sys import exc_info
  15.  
  16. Base = declarative_base()
  17.  
  18. class PERSON_GENDER(Base):
  19.  
  20.     __tablename__ = "person_gender"
  21.  
  22.     id = Column(Integer, primary_key=True)
  23.     gender = Column(String(50), nullable=False, unique=True)
  24.  
  25. class PERSON_NATIONALITY(Base):
  26.  
  27.     __tablename__ = "person_nationality"
  28.  
  29.     id = Column(Integer, primary_key=True)
  30.     nationality = Column(String(100), nullable=False, unique=True)
  31.  
  32. class PERSON_SALUTATION(Base):
  33.  
  34.     __tablename__ = "person_salutation"
  35.  
  36.     id = Column(Integer, primary_key=True)
  37.     salutation = Column(String(50), nullable=False, unique=True)
  38.  
  39. class PERSON_TITLE(Base):
  40.  
  41.     __tablename__ = "person_title"
  42.  
  43.     id = Column(Integer, primary_key=True)
  44.     title = Column(String(50), nullable=False, unique=True)
  45.  
  46. class PERSON_HAIR_COLOR(Base):
  47.  
  48.     __tablename__ = "person_hair_color"
  49.  
  50.     id = Column(Integer, primary_key=True)
  51.     hair_color = Column(String(50), nullable=False, unique=True)
  52.  
  53. class PERSON_EYE_COLOR(Base):
  54.  
  55.     __tablename__ = "person_eye_color"
  56.  
  57.     id = Column(Integer, primary_key=True)
  58.     eye_color = Column(String(50), nullable=False, unique=True)
  59.  
  60. class PERSON_RELIGION(Base):
  61.  
  62.     __tablename__ = "person_religion"
  63.  
  64.     id = Column(Integer, primary_key=True)
  65.     religion = Column(String(50), nullable=False, unique=True)
  66.  
  67. class PERSON_RELATIONSHIP_STATUS(Base):
  68.  
  69.     __tablename__ = "person_relationship_status"
  70.  
  71.     id = Column(Integer, primary_key=True)
  72.     relationship_status = Column(String(100), nullable=False, unique=True)
  73.  
  74. class MasterDataManipulation(object):
  75.  
  76.     def __init__(self, session_object=None):
  77.  
  78.         self._session_scope = session_object
  79.  
  80.     def select_all(self, category):
  81.  
  82.         try:
  83.             with self._session_scope as session:
  84.  
  85.                 dict_store_session_query = {'person_gender':               lambda: session.query(PERSON_GENDER),
  86.                                              'person_nationality':          lambda: session.query(PERSON_NATIONALITY),
  87.                                              'person_salutation':           lambda: session.query(PERSON_SALUTATION),
  88.                                              'person_title':                lambda: session.query(PERSON_TITLE),
  89.                                              'person_hair_color':           lambda: session.query(PERSON_HAIR_COLOR),
  90.                                              'person_eye_color':            lambda: session.query(PERSON_EYE_COLOR),
  91.                                              'person_religion':             lambda: session.query(PERSON_RELIGION),
  92.                                              'person_relationship_status':  lambda: session.query(PERSON_RELATIONSHIP_STATUS)}
  93.  
  94.                 for record in dict_store_session_query[category]():
  95.                     if category == 'person_gender':
  96.                         yield record.id, record.gender
  97.                     if category == 'person_nationality':
  98.                         yield record.id, record.nationality
  99.                     if category == 'person_salutation':
  100.                         yield record.id, record.salutation
  101.                     if category == 'person_title':
  102.                         yield record.id, record.title
  103.                     if category == 'person_hair_color':
  104.                         yield record.id, record.hair_color
  105.                     if category == 'person_eye_color':
  106.                         yield record.id, record.eye_color
  107.                     if category == 'person_religion':
  108.                         yield record.id, record.religion
  109.                     if category == 'person_relationship_status':
  110.                         yield record.id, record.relationship_status
  111.  
  112.         except AttributeError:
  113.  
  114.             print "select all, desired_trace",  format_exc(exc_info())
  115.  
  116.         return
  117.  
  118. class Worker(QObject):
  119.  
  120.     finish_progress = pyqtSignal()
  121.     populate_item_signal = pyqtSignal(object, object, object)
  122.    
  123.     def __init__(self,
  124.                  category=None,
  125.                  combo_box=None,
  126.                  query_data=None,
  127.                  parent=None):
  128.         QObject.__init__(self, parent)
  129.  
  130.         self.category = category
  131.         self.query_data = query_data
  132.         self.combo_box=combo_box
  133.  
  134.         ''' Create attributes '''
  135.         self._run_semaphore = 1
  136.  
  137.     def init_object(self):
  138.  
  139.         '''
  140.            Storing new generator object, will reuse it.
  141.            That means you have to create one generator.
  142.        '''
  143.         self._element = self.query_data(self.category)
  144.  
  145.         self.timer = QTimer()
  146.  
  147.         # assoziiert select_all_data() mit TIMEOUT Ereignis
  148.         self.timer.setSingleShot(False)
  149.         self.timer.setInterval(1000)
  150.         self.timer.timeout.connect(self.populate_item)
  151.         self.timer.start()
  152.        
  153.     def populate_item(self):
  154.         try:
  155.  
  156.             if self._run_semaphore == 0:
  157.    
  158.                 self._run_semaphore = 1
  159.  
  160.                 raise StopIteration
  161.  
  162.             else:
  163.  
  164.                 self.populate_item_signal.emit(next(self._element), self.category, self.combo_box)
  165.  
  166.         except StopIteration:
  167.  
  168.             print "StopIteration is raised"
  169.  
  170.             self.timer.stop()
  171.        
  172.     def stop(self):
  173.         self._run_semaphore == 0
  174.         #self.timer.stop()
  175.        
  176.        
  177. class SessionScope(object):
  178.     def __init__(self, dbms=None, dbdriver=None,
  179.                  dbuser=None, dbuser_pwd=None,
  180.                  db_server_host=None, dbport=None, db_name=None,
  181.                  admin_database=None):
  182.  
  183.         self.dbms = dbms
  184.         self.dbdriver = dbdriver
  185.         self.dbuser = dbuser
  186.         self.dbuser_pwd = dbuser_pwd
  187.         self.db_server_host = db_server_host
  188.         self.dbport = dbport
  189.         self.db_name = db_name
  190.         self.admin_database = admin_database
  191.        
  192.         url = '{}+{}://{}:{}@{}:{}/{}'.format(
  193.            self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)
  194.  
  195.         self._Engine = create_engine(url, encoding='utf8', echo=True)
  196.  
  197.         self.session = None
  198.  
  199.         self._session_factory = sessionmaker(bind=self._Engine)
  200.  
  201.         self._Session = scoped_session(sessionmaker(bind=self._Engine, expire_on_commit=False))
  202.  
  203.         Base.metadata.create_all(self._Engine)
  204.  
  205.     def __enter__(self):
  206.         self.session = self._Session()
  207.         return self.session
  208.  
  209.     def __exit__(self, exception, exc_value, traceback):
  210.  
  211.         try:
  212.             if exception:
  213.  
  214.                 self.session.rollback()
  215.             else:
  216.  
  217.                 self.session.commit()
  218.  
  219.         finally:
  220.  
  221.             self.session.close()
  222.  
  223. class MyCustomDialog(QDialog):
  224.  
  225.     finish = pyqtSignal()
  226.  
  227.     def __init__(self, scoped_session=None, parent=None):
  228.         QDialog.__init__(self, parent)
  229.  
  230.         self._session_scope = scoped_session
  231.  
  232.         self._list_threads = []
  233.  
  234.         self.init_ui()
  235.         self.start_all_selection()
  236.  
  237.     def init_ui(self):
  238.  
  239.         layout = QVBoxLayout(self)
  240.  
  241.         self.combo_person_title = QComboBox(self)
  242.         self.combo_person_salutation = QComboBox(self)
  243.         self.combo_person_gender = QComboBox(self)
  244.         self.combo_person_religion = QComboBox(self)
  245.         self.combo_person_relationship_status = QComboBox(self)
  246.         self.combo_person_nationality = QComboBox(self)
  247.         self.combo_person_eye_color = QComboBox(self)
  248.         self.combo_person_hair_color = QComboBox(self)
  249.  
  250.         self.pushButton_populate_combo = QPushButton("Re-populate", self)
  251.         self.pushButton_stopp = QPushButton("Stopp", self)
  252.         self.pushButton_close = QPushButton("Close", self)
  253.         layout.addWidget(self.combo_person_title)
  254.         layout.addWidget(self.combo_person_salutation)
  255.         layout.addWidget(self.combo_person_gender)
  256.         layout.addWidget(self.combo_person_religion)
  257.         layout.addWidget(self.combo_person_nationality)
  258.         layout.addWidget(self.combo_person_relationship_status)
  259.         layout.addWidget(self.combo_person_eye_color)
  260.         layout.addWidget(self.combo_person_hair_color)
  261.         layout.addWidget(self.pushButton_populate_combo)
  262.         layout.addWidget(self.pushButton_stopp)
  263.         layout.addWidget(self.pushButton_close)
  264.  
  265.         self.pushButton_stopp.clicked.connect(self.on_finish)
  266.         self.pushButton_populate_combo.clicked.connect(self.start_all_selection)
  267.         self.pushButton_close.clicked.connect(self.close)
  268.  
  269.     def start_all_selection(self):
  270.  
  271.         list_comboxes = self.findChildren(QComboBox)
  272.  
  273.         for combox in list_comboxes:
  274.             combox.clear()
  275.        
  276.         list_start_threads = [('person_gender',self.combo_person_gender),
  277.                               ('person_nationality', self.combo_person_nationality),
  278.                               ('person_salutation', self.combo_person_salutation),
  279.                               ('person_title', self.combo_person_title),
  280.                               ('person_hair_color',self.combo_person_hair_color),
  281.                               ('person_eye_color', self.combo_person_eye_color),
  282.                               ('person_religion', self.combo_person_religion),
  283.                               ('person_relationship_status', self.combo_person_relationship_status)]
  284.        
  285.         for category, combo_box in list_start_threads:
  286.             self.start_thread(category=category, combo_box=combo_box)
  287.        
  288.     def fill_combo_boxt(self, item, category, combo_box):
  289.         _, text = item
  290.         combo_box.addItem(text)
  291.        
  292.     def on_label(self, i):
  293.          self.label.setText("Result: {}".format(i))
  294.        
  295.     def start_thread(self, category=None, combo_box=None):
  296.         master_data_manipulation = MasterDataManipulation(session_object=self._session_scope)
  297.         query_data=master_data_manipulation.select_all
  298.  
  299.         task_thread = QThread(self)
  300.         task_thread.work = Worker(query_data=query_data,
  301.                                   combo_box=combo_box,
  302.                                   category=category)
  303.  
  304.         ''' We need to store threads '''
  305.         self._list_threads.append(task_thread)  
  306.         task_thread.work.moveToThread(task_thread)
  307.  
  308.         task_thread.work.populate_item_signal.connect(self.fill_combo_boxt)
  309.  
  310.         self.finish.connect(task_thread.work.stop)
  311.  
  312.         task_thread.started.connect(task_thread.work.init_object)
  313.  
  314.         task_thread.finished.connect(task_thread.deleteLater)
  315.  
  316.         ''' This will emit 'started' and start thread's event loop '''
  317.         task_thread.start()
  318.  
  319.     @pyqtSlot()
  320.     def abort_workers(self):
  321.         self.finish.emit()
  322.         for thread in self._list_threads:
  323.             ''' this will quit **as soon as thread event loop unblocks** '''
  324.             thread.quit()
  325.  
  326.             ''' so you need to wait for it to *actually* quit'''
  327.             thread.wait()
  328.  
  329.     def on_finish(self):
  330.          self.finish.emit()
  331.  
  332.     def closeEvent(self, event):
  333.         ''' Re-implementaate to handle with created threads '''
  334.         self.abort_workers()
  335.  
  336. def populate_database(sess=None):
  337.  
  338.     try:
  339.    
  340.         with sess as session:
  341.             genders = [PERSON_GENDER(gender="male"),
  342.                        PERSON_GENDER(gender="female"),
  343.                        PERSON_GENDER(gender="hybrid"),
  344.                        PERSON_GENDER(gender="trans")]
  345.             session.add_all(genders)
  346.  
  347.             nationalitys = [PERSON_NATIONALITY(nationality="german"),
  348.                            PERSON_NATIONALITY(nationality="english"),
  349.                            PERSON_NATIONALITY(nationality="french"),
  350.                            PERSON_NATIONALITY(nationality="spanish"),
  351.                            PERSON_NATIONALITY(nationality="greek"),
  352.                            PERSON_NATIONALITY(nationality="mexican"),
  353.                            PERSON_NATIONALITY(nationality="sweden"),]
  354.             session.add_all(nationalitys)
  355.  
  356.             salutations = [PERSON_SALUTATION(salutation="Mister"),
  357.                            PERSON_SALUTATION(salutation="Miss"),
  358.                            PERSON_SALUTATION(salutation="Lady"),
  359.                            PERSON_SALUTATION(salutation="Ma'am"),
  360.                            PERSON_SALUTATION(salutation="Sir"),
  361.                            PERSON_SALUTATION(salutation="Queen"),
  362.                            PERSON_SALUTATION(salutation="Grandma"),]
  363.             session.add_all(salutations)
  364.  
  365.             titles = [PERSON_TITLE(title="Prof."),
  366.                       PERSON_TITLE(title="Dr."),
  367.                       PERSON_TITLE(title="Sir"),
  368.                       PERSON_TITLE(title="B.A."),
  369.                       PERSON_TITLE(title="M.A."),
  370.                       PERSON_TITLE(title="Bishop"),
  371.                       PERSON_TITLE(title="God"),]
  372.             session.add_all(titles)
  373.  
  374.             hair_colors = [PERSON_HAIR_COLOR(hair_color="blond."),
  375.                            PERSON_HAIR_COLOR(hair_color="gray"),
  376.                            PERSON_HAIR_COLOR(hair_color="blue"),
  377.                            PERSON_HAIR_COLOR(hair_color="white"),
  378.                            PERSON_HAIR_COLOR(hair_color="black"),
  379.                            PERSON_HAIR_COLOR(hair_color="violet"),
  380.                            PERSON_HAIR_COLOR(hair_color="brunette"),]
  381.             session.add_all(hair_colors)
  382.  
  383.             eye_colors = [PERSON_EYE_COLOR(eye_color="blue."),
  384.                           PERSON_EYE_COLOR(eye_color="blue-gray"),
  385.                           PERSON_EYE_COLOR(eye_color="green"),
  386.                           PERSON_EYE_COLOR(eye_color="white"),
  387.                           PERSON_EYE_COLOR(eye_color="black"),
  388.                           PERSON_EYE_COLOR(eye_color="violet"),
  389.                           PERSON_EYE_COLOR(eye_color="brunette"),]
  390.             session.add_all(eye_colors)
  391.  
  392.             religions = [PERSON_RELIGION(religion="Catholic."),
  393.                          PERSON_RELIGION(religion="Protestant"),
  394.                          PERSON_RELIGION(religion="Jew"),
  395.                          PERSON_RELIGION(religion="Muslim"),
  396.                          PERSON_RELIGION(religion="Islam"),
  397.                          PERSON_RELIGION(religion="Hindu"),
  398.                          PERSON_RELIGION(religion="Buddha"),]
  399.             session.add_all(religions)
  400.  
  401.             relationship_status = [PERSON_RELATIONSHIP_STATUS(relationship_status="Single."),
  402.                                    PERSON_RELATIONSHIP_STATUS(relationship_status="In a relationship"),
  403.                                    PERSON_RELATIONSHIP_STATUS(relationship_status="Married"),
  404.                                    PERSON_RELATIONSHIP_STATUS(relationship_status="In a open relationship"),
  405.                                    PERSON_RELATIONSHIP_STATUS(relationship_status="Engaged"),
  406.                                    PERSON_RELATIONSHIP_STATUS(relationship_status="Divorced"),
  407.                                    PERSON_RELATIONSHIP_STATUS(relationship_status="Separate"),]
  408.             session.add_all(relationship_status)
  409.  
  410.             session.commit()
  411.  
  412.     except SQLAlchemyError:
  413.         print "SQLAlchemyError", format_exc(exc_info())
  414.            
  415. def main():
  416.     dbms = raw_input('Enter database type: ')
  417.     dbdriver = raw_input('Enter database driver: ')
  418.     dbuser = raw_input('Enter user name: ')
  419.     dbuser_pwd = raw_input('Enter user password: ')
  420.     db_server_host = raw_input('Enter server host: ')
  421.     dbport = raw_input('Enter port: ')
  422.     db_name = raw_input('Enter database name: ')
  423.  
  424.     try:
  425.         session_scope = SessionScope(dbms = dbms,
  426.                                      dbdriver = dbdriver,
  427.                                      dbuser = dbuser,
  428.                                      dbuser_pwd = dbuser_pwd,
  429.                                      db_server_host = db_server_host,
  430.                                      dbport = dbport,
  431.                                      db_name = db_name)
  432.  
  433.         answer = raw_input('Do you want to populate database? Type yes or no: ')
  434.        
  435.         if answer.lower() == 'yes':
  436.             populate_database(sess=session_scope)
  437.  
  438.         app = QApplication(sys.argv)
  439.         window = MyCustomDialog(scoped_session = session_scope)
  440.         window.show()
  441.         sys.exit(app.exec_())
  442.     except TypeError:
  443.        
  444.         print "ERROR", format_exc(exc_info())
  445.        
  446. if __name__ == "__main__":
  447.     main()
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Dienstag 8. August 2017, 23:07

Hallo Leute, ich konnte den oberen Quelltext wesentlich kürzen, um am Ende auf das gleiche Problem zu stoßen. Anstatt mit 8 Tabellen zu arbeiten habe ich nur eine Tabelle angelegt. Dennoch werden weiterhin 8 Abfragen erstellt, die dann an die Datenbank gesendet werden. Auch hier kommen gleiche Probleme wie beim Programm oben. Außerdem musste ich sys.exit() verwenden, damit das Programm richtig beendet wird. Ich denke, dass ist auch ziemlich unüblich.

  1. import sys
  2.  
  3. from PyQt4.QtCore import QObject, QThread, pyqtSignal, pyqtSlot, QTimer
  4. from PyQt4.QtGui import QApplication, QPushButton, QVBoxLayout, QDialog, \
  5.                         QComboBox, QLabel
  6.  
  7. from sqlalchemy.ext.declarative import declarative_base
  8. from sqlalchemy import create_engine
  9. from sqlalchemy.exc import SQLAlchemyError
  10. from sqlalchemy.orm import sessionmaker, scoped_session
  11. from sqlalchemy import Table, Column, Integer, String, MetaData
  12.  
  13. from traceback import format_exc
  14. from sys import exc_info
  15.  
  16. ''' setting up root class for declarative declaration '''
  17. Base = declarative_base()
  18.  
  19. class PERSON_SALUTATION(Base):
  20.  
  21.     __tablename__ = "person_salutation"
  22.  
  23.     id = Column(Integer, primary_key=True)
  24.     salutation = Column(String(50), nullable=False, unique=True)
  25.  
  26.  
  27. class MasterDataManipulation(object):
  28.  
  29.     def __init__(self, session_object=None):
  30.  
  31.         self._session_scope = session_object
  32.  
  33.     def select_all(self):
  34.  
  35.         try:
  36.             with self._session_scope as session:
  37.  
  38.                 for record in session.query(PERSON_SALUTATION):
  39.  
  40.                     yield record.id, record.salutation
  41.                    
  42.         except AttributeError:
  43.  
  44.             print "select all, desired_trace",  format_exc(exc_info())
  45.  
  46.         return
  47.  
  48. class Worker(QObject):
  49.  
  50.     finish_progress = pyqtSignal()
  51.     populate_item_signal = pyqtSignal(object, object)
  52.    
  53.     def __init__(self,
  54.                  combo_box=None,
  55.                  query_data=None,
  56.                  parent=None):
  57.         QObject.__init__(self, parent)
  58.  
  59.         self.query_data = query_data
  60.         self.combo_box=combo_box
  61.  
  62.         ''' Create attributes '''
  63.         self._run_semaphore = 1
  64.  
  65.     def init_object(self):
  66.  
  67.         self._element = self.query_data()
  68.  
  69.         self.timer = QTimer()
  70.  
  71.         # assoziiert select_all_data() mit TIMEOUT Ereignis
  72.         self.timer.setSingleShot(False)
  73.         self.timer.setInterval(1)
  74.         self.timer.timeout.connect(self.populate_item)
  75.         self.timer.start()
  76.        
  77.     def populate_item(self):
  78.         try:
  79.  
  80.             if self._run_semaphore == 0:
  81.    
  82.                 self._run_semaphore = 1
  83.  
  84.                 return#raise StopIteration
  85.  
  86.             else:
  87.  
  88.                 self.populate_item_signal.emit(next(self._element), self.combo_box)
  89.  
  90.         except StopIteration:
  91.  
  92.             print "StopIteration is raised"
  93.  
  94.             self.timer.stop()
  95.        
  96.     def stop(self):
  97.         self._run_semaphore == 0
  98.         self.timer.stop()
  99.        
  100. class SessionScope(object):
  101.     def __init__(self, dbms=None, dbdriver=None,
  102.                  dbuser=None, dbuser_pwd=None,
  103.                  db_server_host=None, dbport=None, db_name=None,
  104.                  admin_database=None):
  105.  
  106.         self.dbms = dbms
  107.         self.dbdriver = dbdriver
  108.         self.dbuser = dbuser
  109.         self.dbuser_pwd = dbuser_pwd
  110.         self.db_server_host = db_server_host
  111.         self.dbport = dbport
  112.         self.db_name = db_name
  113.         self.admin_database = admin_database
  114.        
  115.         url = '{}+{}://{}:{}@{}:{}/{}'.format(
  116.            self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)
  117.  
  118.         self._Engine = create_engine(url, encoding='utf8', echo=True)
  119.  
  120.         self.session = None
  121.  
  122.         self._session_factory = sessionmaker(bind=self._Engine)
  123.  
  124.         self._Session = scoped_session(sessionmaker(bind=self._Engine, expire_on_commit=False))
  125.  
  126.         ''' create tables '''
  127.         Base.metadata.create_all(self._Engine)
  128.  
  129.     def __enter__(self):
  130.         self.session = self._Session()
  131.         return self.session
  132.  
  133.     def __exit__(self, exception, exc_value, traceback):
  134.  
  135.         try:
  136.             if exception:
  137.  
  138.                 self.session.rollback()
  139.             else:
  140.  
  141.                 self.session.commit()
  142.  
  143.         finally:
  144.  
  145.             self.session.close()
  146.  
  147. class MyCustomDialog(QDialog):
  148.  
  149.     finish = pyqtSignal()
  150.  
  151.     def __init__(self, scoped_session=None, parent=None):
  152.         QDialog.__init__(self, parent)
  153.  
  154.         self._session_scope = scoped_session
  155.  
  156.         self._list_threads = []
  157.  
  158.         self.init_ui()
  159.         self.start_all_selection()
  160.  
  161.     def init_ui(self):
  162.  
  163.         layout = QVBoxLayout(self)
  164.  
  165.         self.combo_person_title = QComboBox(self)
  166.         self.combo_person_salutation = QComboBox(self)
  167.         self.combo_person_gender = QComboBox(self)
  168.         self.combo_person_religion = QComboBox(self)
  169.         self.combo_person_relationship_status = QComboBox(self)
  170.         self.combo_person_nationality = QComboBox(self)
  171.         self.combo_person_eye_color = QComboBox(self)
  172.         self.combo_person_hair_color = QComboBox(self)
  173.  
  174.         self.pushButton_populate_combo = QPushButton("Re-populate", self)
  175.         self.pushButton_stopp = QPushButton("Stopp", self)
  176.         self.pushButton_close = QPushButton("Close", self)
  177.         layout.addWidget(self.combo_person_title)
  178.         layout.addWidget(self.combo_person_salutation)
  179.         layout.addWidget(self.combo_person_gender)
  180.         layout.addWidget(self.combo_person_religion)
  181.         layout.addWidget(self.combo_person_nationality)
  182.         layout.addWidget(self.combo_person_relationship_status)
  183.         layout.addWidget(self.combo_person_eye_color)
  184.         layout.addWidget(self.combo_person_hair_color)
  185.         layout.addWidget(self.pushButton_populate_combo)
  186.         layout.addWidget(self.pushButton_stopp)
  187.         layout.addWidget(self.pushButton_close)
  188.  
  189.         self.pushButton_stopp.clicked.connect(self.on_finish)
  190.         self.pushButton_populate_combo.clicked.connect(self.start_all_selection)
  191.         self.pushButton_close.clicked.connect(self.close)
  192.  
  193.     def start_all_selection(self):
  194.  
  195.         list_comboxes = self.findChildren(QComboBox)
  196.  
  197.         for combo_box in list_comboxes:
  198.             combo_box.clear()
  199.             self.start_thread(combo_box=combo_box)
  200.        
  201.     def fill_combo_boxt(self, item, combo_box):
  202.         id, text = item
  203.         combo_box.addItem(text)
  204.  
  205.     def on_label(self, i):
  206.          self.label.setText("Result: {}".format(i))
  207.        
  208.     def start_thread(self, combo_box=None):
  209.         master_data_manipulation = MasterDataManipulation(session_object=self._session_scope)
  210.         query_data=master_data_manipulation.select_all
  211.  
  212.         task_thread = QThread(self)
  213.         task_thread.work = Worker(query_data=query_data,
  214.                                   combo_box=combo_box,)
  215.  
  216.         ''' We need to store threads '''
  217.         self._list_threads.append(task_thread)  
  218.         task_thread.work.moveToThread(task_thread)
  219.  
  220.         task_thread.work.populate_item_signal.connect(self.fill_combo_boxt)
  221.  
  222.         self.finish.connect(task_thread.work.stop)
  223.  
  224.         task_thread.started.connect(task_thread.work.init_object)
  225.  
  226.         task_thread.finished.connect(task_thread.deleteLater)
  227.  
  228.         ''' This will emit 'started' and start thread's event loop '''
  229.         task_thread.start()
  230.  
  231.     @pyqtSlot()
  232.     def abort_workers(self):
  233.         self.finish.emit()
  234.         for thread in self._list_threads:
  235.             ''' this will quit **as soon as thread event loop unblocks** '''
  236.             thread.quit()
  237.  
  238.             ''' so you need to wait for it to *actually* quit'''
  239.             thread.wait()
  240.  
  241.     def on_finish(self):
  242.          self.finish.emit()
  243.  
  244.     def closeEvent(self, event):
  245.         ''' Re-implementaate to handle with created threads '''
  246.         self.abort_workers()
  247.        
  248.         sys.exit()
  249.  
  250. def populate_database(sess=None):
  251.  
  252.     try:
  253.    
  254.         with sess as session:
  255.  
  256.             salutations = [PERSON_SALUTATION(salutation="Mister"),
  257.                            PERSON_SALUTATION(salutation="Miss"),
  258.                            PERSON_SALUTATION(salutation="Lady"),
  259.                            PERSON_SALUTATION(salutation="Ma'am"),
  260.                            PERSON_SALUTATION(salutation="Sir"),
  261.                            PERSON_SALUTATION(salutation="Queen"),
  262.                            PERSON_SALUTATION(salutation="Grandma"),]
  263.             session.add_all(salutations)
  264.            
  265.             session.commit()
  266.  
  267.     except SQLAlchemyError:
  268.         print "SQLAlchemyError", format_exc(exc_info())
  269.            
  270. def main():
  271.     dbms = raw_input('Enter database type: ')
  272.     dbdriver = raw_input('Enter database driver: ')
  273.     dbuser = raw_input('Enter user name: ')
  274.     dbuser_pwd = raw_input('Enter user password: ')
  275.     db_server_host = raw_input('Enter server host: ')
  276.     dbport = raw_input('Enter port: ')
  277.     db_name = raw_input('Enter database name: ')
  278.  
  279.     try:
  280.         session_scope = SessionScope(dbms = dbms,
  281.                                      dbdriver = dbdriver,
  282.                                      dbuser = dbuser,
  283.                                      dbuser_pwd = dbuser_pwd,
  284.                                      db_server_host = db_server_host,
  285.                                      dbport = dbport,
  286.                                      db_name = db_name)
  287.  
  288.         answer = raw_input('Do you want to populate database? Type yes or no: ')
  289.        
  290.         if answer.lower() == 'yes':
  291.             populate_database(sess=session_scope)
  292.  
  293.         app = QApplication(sys.argv)
  294.         window = MyCustomDialog(scoped_session = session_scope)
  295.         window.show()
  296.         sys.exit(app.exec_())
  297.     except TypeError:
  298.        
  299.         print "ERROR", format_exc(exc_info())
  300.        
  301. if __name__ == "__main__":
  302.     main()
Melewo
User
Beiträge: 320
Registriert: Mittwoch 3. Mai 2017, 16:30

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Melewo » Mittwoch 9. August 2017, 06:19

Sophus hat geschrieben:den Interval des QTimer() von derzeit 1000 auf 1. Wir wollen ja, dass das Programm ein bisschen zügiger die Daten holt.

Vielleicht sollte ich hier lieber nicht antworten, weil ich mich weder mit SQLAlchemy noch mit QThreads auskenne. Nur irgendwie sieht mir das nach einer Antwort wie dieser aus:
Ich laufe nicht 10-mal so schnell auf 100 Metern, nur weil Deine Stoppuhr 10-mal schneller rast, sondern würde bei einem Intervall nach Deiner rasenden Stoppuhr bei 10 Metern abbrechen, ohne das Ziel erreicht zu haben.
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Mittwoch 9. August 2017, 15:46

@Melewo: Danke für deine Anmerkung. Mit dem Interval des QTimer()-Objektes kann es durchaus was zutun haben. Am Interval muss ich wohl noch ein wenig schrauben.

Allerdings bin auf meine falsche Strukturierung des Quelltextes gestoßen. Kurz gesagt, ich bin durch meiner richtigen Annahme auf die falsche Fährte geraten. Klingt ein wenig Paradox. Für meine Ausführung beziehe ich mich auf meinen letzten Beitrag.

Da ich weiß, dass ich mit QThread()-Objekten arbeiten möchte, habe ich mein Session()-Objekt dementsprechend gestaltet. Ein normales Session()-Objekt ist keineswegs thread-sicher (Is the session thread-safe?). Also bediene ich mich des scoped_session()-Objektes (in Zeile 124). Dieses Objekt ist zumindest thread-sicher (Using Custom Created Scopes). Und mit dieser Annahme habe ich angefangen, das einmalig erstellte Session()-Objekt (in Zeile 280) im Haupt-Thread (die GUI-Anwendung) an die jeweiligen Threads zu verteilen. Kurzum: Ich habe das eine Session()-Objekt zwischen den ganzen Threads (in meinem Beispiel sind es 8 Threads) geteilt. Schließlich nahm ich ja an, dass mein Session()-Objekt von nun an thread-sicher sei. Daher bin ich auf die falsche Fährte geraten.

Demzufolge habe ich einige Änderungen an meinem Beispiel vorgenommen. Zunächst habe ich in der select_all()-Methode der MasterDataManipulation()-Klasse angefangen
.
Anstatt jedesmal die Session erneut zu öffnen...
  1.     def select_all(self):
  2.  
  3.         try:
  4.             with self._session_scope as session:
  5.  
  6.                 for record in session.query(PERSON_SALUTATION):
  7.  
  8.                     yield record.id, record.salutation
  9.                    
  10.         except AttributeError:
  11.  
  12.             print "select all, desired_trace",  format_exc(exc_info())
  13.  
  14.         return


... habe ich die with-Anweisung entfernt, schließlich wollen wir ja das eine Session()-Objekt nicht mehr zwischen den Threads teilen.

  1.     def select_all(self):
  2.  
  3.         try:
  4.  
  5.             for record in self._session_scope.query(PERSON_SALUTATION):
  6.  
  7.                 yield record.id, record.salutation
  8.                    
  9.         except AttributeError:
  10.  
  11.             print "select all, desired_trace",  format_exc(exc_info())
  12.  
  13.         return


Im nächsten Schritt habe ich in der start_all_selection()-Methode der MyCustomDialog()-Klasse eine Änderung vorgenommen.

Aus..
  1.     def start_all_selection(self):
  2.  
  3.         list_comboxes = self.findChildren(QComboBox)
  4.  
  5.         for combo_box in list_comboxes:
  6.             combo_box.clear()
  7.             self.start_thread(combo_box=combo_box)


... habe ich nun folgendes gemacht:
  1.     def start_all_selection(self):
  2.  
  3.         list_comboxes = self.findChildren(QComboBox)
  4.         '''
  5.            use one session per thread, share nothing between threads.  The  
  6.            scoped_session should make this pretty straightforward.
  7.        '''
  8.         with self._session_scope as session:      
  9.             for combo_box in list_comboxes:
  10.                 combo_box.clear()
  11.                 self.start_thread(combo_box=combo_box,session=session)

Hier wird einmal eine session erstellt, und nur diese wird an den Threads übergeben. Das heißt in meinem Fall, dass die session dann geschlossen wird, wenn die 8 Threads fertig sind. Solange bleibt der Rumpf der with-Anweisung am Leben.

Hier nun der komplette Quelltext, mit einigen englischen Kommentaren meinerseits:
  1. from PyQt4.QtCore import QObject, QThread, pyqtSignal, pyqtSlot, QTimer
  2. from PyQt4.QtGui import QApplication, QPushButton, QVBoxLayout, QDialog, \
  3.                         QComboBox, QLabel
  4.  
  5. from sqlalchemy.ext.declarative import declarative_base
  6. from sqlalchemy import create_engine
  7. from sqlalchemy.exc import SQLAlchemyError
  8. from sqlalchemy.orm import sessionmaker, scoped_session
  9. from sqlalchemy import Table, Column, Integer, String, MetaData
  10.  
  11. from traceback import format_exc
  12. from sys import exc_info, exit, argv
  13.  
  14. ''' setting up root class for declarative declaration '''
  15. Base = declarative_base()
  16.  
  17. class PERSON_SALUTATION(Base):
  18.  
  19.     __tablename__ = "person_salutation"
  20.  
  21.     id = Column(Integer, primary_key=True)
  22.     salutation = Column(String(100), nullable=False, unique=True)
  23.  
  24.  
  25. class MasterDataManipulation(object):
  26.  
  27.     def __init__(self, session_object=None):
  28.  
  29.         self._session_scope = session_object
  30.  
  31.     def select_all(self):
  32.  
  33.         try:
  34.  
  35.             for record in self._session_scope.query(PERSON_SALUTATION):
  36.  
  37.                 yield record.id, record.salutation
  38.                    
  39.         except AttributeError:
  40.  
  41.             print "select all, desired_trace",  format_exc(exc_info())
  42.  
  43.         return
  44.  
  45. class Worker(QObject):
  46.  
  47.     finish_progress = pyqtSignal()
  48.     populate_item_signal = pyqtSignal(object, object)
  49.    
  50.     def __init__(self,
  51.                  combo_box=None,
  52.                  new_scope=None,
  53.                  #master_data=None,
  54.                  parent=None):
  55.         QObject.__init__(self, parent)
  56.  
  57.         self.new_scope=new_scope
  58.  
  59.         #self.master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
  60.         self.combo_box=combo_box
  61.  
  62.         ''' Create attributes '''
  63.         self._run_semaphore = 1
  64.  
  65.         self._element = None
  66.  
  67.     def init_object(self):
  68.  
  69.         self.timer = QTimer()
  70.        
  71.         '''
  72.            Storing new generator object, will reuse it.
  73.            That means you have to create one generator.
  74.        '''
  75.         self.master_data_manipulation = MasterDataManipulation(session_object=self.new_scope)
  76.         self._element = self.master_data_manipulation.select_all()
  77.  
  78.         # assoziiert select_all_data() mit TIMEOUT Ereignis
  79.         self.timer.setSingleShot(False)
  80.         self.timer.setInterval(1)
  81.         self.timer.timeout.connect(self.populate_item)
  82.         self.timer.start()
  83.            
  84.     def populate_item(self):
  85.         try:
  86.  
  87.             if self._run_semaphore == 0:
  88.    
  89.                 self._run_semaphore = 1
  90.  
  91.                 raise StopIteration
  92.  
  93.             else:
  94.  
  95.                 self.populate_item_signal.emit(next(self._element), self.combo_box)
  96.  
  97.         except StopIteration:
  98.  
  99.             print "StopIteration is raised"
  100.             self.timer.stop()
  101.        
  102.     def stop(self):
  103.         self.timer.stop()
  104.         self._run_semaphore = 0
  105.         self._element = None
  106.        
  107. class SessionScope(object):
  108.     def __init__(self, dbms=None, dbdriver=None,
  109.                  dbuser=None, dbuser_pwd=None,
  110.                  db_server_host=None, dbport=None, db_name=None,
  111.                  admin_database=None):
  112.  
  113.         self.dbms = dbms
  114.         self.dbdriver = dbdriver
  115.         self.dbuser = dbuser
  116.         self.dbuser_pwd = dbuser_pwd
  117.         self.db_server_host = db_server_host
  118.         self.dbport = dbport
  119.         self.db_name = db_name
  120.         self.admin_database = admin_database
  121.        
  122.         url = '{}+{}://{}:{}@{}:{}/{}'.format(
  123.            self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)
  124.  
  125.         '''
  126.           Currently the echo is turned on to see the auto-generated SQL.
  127.  
  128.           That is, the Engine is a factory for connections as well as a pool of connections,
  129.           not the connection itself. When you say in this case close(),
  130.           the connection is returned to the connection pool within the Engine, not actually closed.
  131.  
  132.           So the self._Engine will not use connection pool if you set poolclass=NullPool.
  133.           So the connection (SQLAlchemy session) will close directly after session.close()
  134.           that means, if you set poolclass=NullPool each call to close() will close the underlying DBAPI connection.
  135.       '''
  136.         self._Engine = create_engine(url, pool_size=20, encoding='utf8', echo=True)
  137.  
  138.         '''   Set up the session and store a sessionmaker for this db connection object  '''
  139.  
  140.         self.session = None
  141.  
  142.         ''' Create the session factory '''
  143.         self._session_factory = sessionmaker(bind=self._Engine)
  144.  
  145.         ''' Create the scoped_session, now self._Session registry is established '''
  146.         self._Session = scoped_session(sessionmaker(bind=self._Engine, expire_on_commit=False))
  147.         #self._Session = sessionmaker(bind=self._Engine)
  148.  
  149.         ''' create tables '''
  150.         Base.metadata.create_all(self._Engine)
  151.  
  152.     def __enter__(self):
  153.         '''
  154.           Now all calls to Session() will create a thread-local session.
  155.           That means, you can now use self.session to run multiple queries, etc.
  156.           The registry is *optionally* starts called upon explicitly to create
  157.           a Session local to the thread and/or request. That why we return self.session
  158.       '''
  159.         self.session = self._Session()
  160.         return self.session
  161.  
  162.     def __exit__(self, exception, exc_value, traceback):
  163.  
  164.         try:
  165.             if exception:
  166.  
  167.                 self.session.rollback()
  168.             else:
  169.  
  170.                 self.session.commit()
  171.  
  172.         finally:
  173.  
  174.             self.session.close()
  175.  
  176. class MyCustomDialog(QDialog):
  177.  
  178.     finish = pyqtSignal()
  179.  
  180.     def __init__(self, scoped_session=None, parent=None):
  181.         QDialog.__init__(self, parent)
  182.  
  183.         self._session_scope = scoped_session
  184.  
  185.         self.master_data_manipulation = MasterDataManipulation(session_object=self._session_scope)
  186.  
  187.         self._list_threads = []
  188.  
  189.         self.init_ui()
  190.         self.start_all_selection()
  191.  
  192.     def init_ui(self):
  193.  
  194.         layout = QVBoxLayout(self)
  195.  
  196.         self.combo_person_title = QComboBox(self)
  197.         self.combo_person_salutation = QComboBox(self)
  198.         self.combo_person_gender = QComboBox(self)
  199.         self.combo_person_religion = QComboBox(self)
  200.         self.combo_person_relationship_status = QComboBox(self)
  201.         self.combo_person_nationality = QComboBox(self)
  202.         self.combo_person_eye_color = QComboBox(self)
  203.         self.combo_person_hair_color = QComboBox(self)
  204.  
  205.         self.pushButton_populate_combo = QPushButton("Re-populate", self)
  206.         self.pushButton_stopp = QPushButton("Stopp", self)
  207.         self.pushButton_close = QPushButton("Close", self)
  208.         layout.addWidget(self.combo_person_title)
  209.         layout.addWidget(self.combo_person_salutation)
  210.         layout.addWidget(self.combo_person_gender)
  211.         layout.addWidget(self.combo_person_religion)
  212.         layout.addWidget(self.combo_person_nationality)
  213.         layout.addWidget(self.combo_person_relationship_status)
  214.         layout.addWidget(self.combo_person_eye_color)
  215.         layout.addWidget(self.combo_person_hair_color)
  216.         layout.addWidget(self.pushButton_populate_combo)
  217.         layout.addWidget(self.pushButton_stopp)
  218.         layout.addWidget(self.pushButton_close)
  219.  
  220.         self.pushButton_stopp.clicked.connect(self.on_finish)
  221.         self.pushButton_populate_combo.clicked.connect(self.start_all_selection)
  222.         self.pushButton_close.clicked.connect(self.close)
  223.  
  224.     def start_all_selection(self):
  225.  
  226.         list_comboxes = self.findChildren(QComboBox)
  227.  
  228.         '''
  229.            use one session per thread, share nothing between threads.  The  
  230.            scoped_session should make this pretty straightforward.
  231.        '''
  232.         with self._session_scope as session:
  233.      
  234.             for combo_box in list_comboxes:
  235.                 combo_box.clear()
  236.                 self.start_thread(combo_box=combo_box,session=session)
  237.        
  238.     def fill_combo_boxt(self, item, combo_box):
  239.         id, text = item
  240.         combo_box.addItem(text)
  241.        
  242.     def start_thread(self, combo_box=None, session=None):
  243.         master_data_manipulation = MasterDataManipulation(session_object=self._session_scope)
  244.         query_data=master_data_manipulation.select_all
  245.  
  246.         task_thread = QThread(self)
  247.         task_thread.work = Worker(new_scope=session,
  248.                                   combo_box=combo_box,)
  249.  
  250.         ''' We need to store threads '''
  251.         self._list_threads.append((task_thread, task_thread.work))
  252.         task_thread.work.moveToThread(task_thread)
  253.  
  254.         task_thread.work.populate_item_signal.connect(self.fill_combo_boxt)
  255.  
  256.         self.finish.connect(task_thread.work.stop)
  257.  
  258.         task_thread.started.connect(task_thread.work.init_object)
  259.  
  260.         task_thread.finished.connect(task_thread.deleteLater)
  261.  
  262.         ''' This will emit 'started' and start thread's event loop '''
  263.         task_thread.start()
  264.  
  265.     @pyqtSlot()
  266.     def abort_workers(self):
  267.         self.finish.emit()
  268.         for thread, _ in self._list_threads:
  269.             ''' this will quit **as soon as thread event loop unblocks** '''
  270.             thread.quit()
  271.  
  272.             ''' so you need to wait for it to *actually* quit'''
  273.             thread.wait()
  274.  
  275.         self._list_threads[:] = []
  276.  
  277.     def on_finish(self):
  278.          self.finish.emit()
  279.  
  280.     def closeEvent(self, event):
  281.         ''' Re-implementaate to handle with created threads '''
  282.         self.abort_workers()
  283.        
  284.         exit()
  285.  
  286. def populate_database(sess=None):
  287.  
  288.     try:
  289.    
  290.         with sess as session:
  291.  
  292.             salutations = [PERSON_SALUTATION(salutation="Mister"),
  293.                            PERSON_SALUTATION(salutation="Miss"),
  294.                            PERSON_SALUTATION(salutation="Lady"),
  295.                            PERSON_SALUTATION(salutation="Ma'am"),
  296.                            PERSON_SALUTATION(salutation="Sir"),
  297.                            PERSON_SALUTATION(salutation="Queen"),
  298.                            PERSON_SALUTATION(salutation="Grandma"),]
  299.             session.add_all(salutations)
  300.            
  301.             session.commit()
  302.  
  303.     except SQLAlchemyError:
  304.         print "SQLAlchemyError", format_exc(exc_info())
  305.            
  306. def main():
  307.     dbms = raw_input('Enter database type: ')
  308.     dbdriver = raw_input('Enter database driver: ')
  309.     dbuser = raw_input('Enter user name: ')
  310.     dbuser_pwd = raw_input('Enter user password: ')
  311.     db_server_host = raw_input('Enter server host: ')
  312.     dbport = raw_input('Enter port: ')
  313.     db_name = raw_input('Enter database name: ')
  314.  
  315.     try:
  316.         ''' create_engine and scoped_session once per process (per database). '''
  317.         session_scope = SessionScope(dbms = dbms,
  318.                                      dbdriver = dbdriver,
  319.                                      dbuser = dbuser,
  320.                                      dbuser_pwd = dbuser_pwd,
  321.                                      db_server_host = db_server_host,
  322.                                      dbport = dbport,
  323.                                      db_name = db_name)
  324.  
  325.         answer = raw_input('Do you want to populate database? Type yes or no: ')
  326.  
  327.         if answer.lower() == 'yes':
  328.             populate_database(sess=session_scope)
  329.  
  330.         app = QApplication(argv)
  331.         window = MyCustomDialog(scoped_session = session_scope)
  332.         window.show()
  333.         exit(app.exec_())
  334.     except TypeError:
  335.        
  336.         print "ERROR", format_exc(exc_info())
  337.        
  338. if __name__ == "__main__":
  339.     main()


Woran ich jetzt denken muss, ist an die pool_size()-Methode, die ich der Engine übergeben muss. Denn Standard ist poolsize() auf 5 eingestellt. Dies ist weitestgehend ausreichend. Wenn man allerdings mit Threads arbeitet, können die 5 Verbindungen sehr knapp werden. Da muss ich mal sehen, wie niedrig ich diese Verbindungen halten kann.
BlackJack

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon BlackJack » Mittwoch 9. August 2017, 15:51

@Sophus: Anmerkung zu den Quelltexten: Zeichenketten sind keine Kommentare.
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Mittwoch 9. August 2017, 15:58

BlackJack hat geschrieben:@Sophus: Anmerkung zu den Quelltexten: Zeichenketten sind keine Kommentare.


Du meinst die ganzen DocStrings, die ich hier missbraucht habe? Ich muss mich entschuldigen. Irgendwie finde ich die "hübscher" als normale Kommentare, die mit # beginnen.
BlackJack

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon BlackJack » Mittwoch 9. August 2017, 16:23

@Sophus: Da sind keine DocStrings. Also zwei sind technisch gesehen welche aber vom Inhalt her nicht. Wenn Dein Editor Kommentare nicht hübsch genug anzeigt, dann ist das eine Frage der Einstellungen des Editors.
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Mittwoch 9. August 2017, 16:34

@BlackJack: Ich werde versuchen, mich zu bessern. 8)
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Mittwoch 9. August 2017, 16:44

Mir ist soeben ein weiteres, unschönes Problem aufgefallen. Derzeit wird in meiner re-implementierten closeEven()t-Methode die exit()-Funktion des sys-Modules ausgeführt. Ich kann diese Funktion auch weglassen und mein Programm wird sauber geschlossen. Allerdings ist es dann nicht möglich, mit mit der Datenbank mittels des Programmes MySQL Workbench zu arbeiten. Das ist mir aufgefallen, als ich nach dem ausführen und normales Beenden meines Programmes die Tabelle löschen wollte. MySQL Workbench hat sich regelrecht aufgehängt. Erst als ich die exit()-Funktion angewendet habe, scheint alles rein zu sein. Allerdings verwirrt mich das. Denn durch meine Umstrukturierung sollte ja die session, die durch die with-Anweisung einmalig erstellt wird, und nach dem Ende aller Threads wieder freigegeben wird, und somit keine Verbindung zur Datenbank bestehen sollte. Irgendwas hält noch eine Verbindung zur Datenbank fest.

Aktualisierung:
Ich konnte das Problem lösen. Ich habe im SessionScope()-Objekt (Zeile 177 im letzten Beitrag) folgende Methode re-implementiert.

  1.     def disconnect(self):
  2.         '''
  3.            Make sure the dbconnection gets closed
  4.        '''
  5.         self.session = self._Session()
  6.  
  7.         '''
  8.            .close() will give the connection back to the connection
  9.            pool of Engine and doesn't close the connection.
  10.        '''
  11.         self.session.close()
  12.  
  13.         '''
  14.            dispose() will close all connections of the connection pool.
  15.        '''
  16.         self._Engine.dispose()
  17.  
  18.         self._Session = None
  19.         self._Engine = None


In meinem Haupt-Thread wird auf das Session()-Objekt dann die disconnect()-Methode angewendet. Alle Verbindungen werden geschlossen, und zur Sicherheit werden die Referenzen von self._Session und self._Engine beseitigt. Nun brauche ich die exit()-Funktion nicht anwenden, und kann ganz klassisch das Programm beenden und nichts hält zur Datenbank fest. Ich kann direkt nach dem Beenden meines Programmes mit MySQL Workbench arbeiten.

Vollständigkeitshalber sieht mein SessionSpoce()-Objekt wie folgt aus:
  1. class SessionScope(object):
  2.     def __init__(self, dbms=None, dbdriver=None,
  3.                  dbuser=None, dbuser_pwd=None,
  4.                  db_server_host=None, dbport=None, db_name=None,
  5.                  admin_database=None):
  6.  
  7.         self.dbms = dbms
  8.         self.dbdriver = dbdriver
  9.         self.dbuser = dbuser
  10.         self.dbuser_pwd = dbuser_pwd
  11.         self.db_server_host = db_server_host
  12.         self.dbport = dbport
  13.         self.db_name = db_name
  14.         self.admin_database = admin_database
  15.        
  16.         url = '{}+{}://{}:{}@{}:{}/{}'.format(
  17.            self.dbms, self.dbdriver, self.dbuser, self.dbuser_pwd, self.db_server_host, self.dbport, self.db_name)
  18.  
  19.         '''
  20.           Currently the echo is turned on to see the auto-generated SQL.
  21.  
  22.           That is, the Engine is a factory for connections as well as a pool of connections,
  23.           not the connection itself. When you say in this case close(),
  24.           the connection is returned to the connection pool within the Engine, not actually closed.
  25.  
  26.           So the self._Engine will not use connection pool if you set poolclass=NullPool.
  27.           So the connection (SQLAlchemy session) will close directly after session.close()
  28.           that means, if you set poolclass=NullPool each call to close() will close the underlying DBAPI connection.
  29.       '''
  30.         self._Engine = create_engine(url, pool_size=30, encoding='utf8', echo=True)
  31.  
  32.         '''
  33.           Set up the session and store a sessionmaker for this db connection object
  34.       '''
  35.  
  36.         self.session = None
  37.  
  38.         ''' Create the session factory '''
  39.         self._session_factory = sessionmaker(bind=self._Engine)
  40.  
  41.         ''' Create the scoped_session, now self._Session registry is established '''
  42.         self._Session = scoped_session(sessionmaker(bind=self._Engine, expire_on_commit=False))
  43.         #self._Session = sessionmaker(bind=self._Engine)
  44.  
  45.         ''' create tables '''
  46.         Base.metadata.create_all(self._Engine)
  47.  
  48.     def __enter__(self):
  49.         '''
  50.           Now all calls to Session() will create a thread-local session.
  51.           That means, you can now use self.session to run multiple queries, etc.
  52.           The registry is *optionally* starts called upon explicitly to create
  53.           a Session local to the thread and/or request. That why we return self.session
  54.       '''
  55.         self.session = self._Session()
  56.         return self.session
  57.  
  58.     def __exit__(self, exception, exc_value, traceback):
  59.  
  60.         try:
  61.             if exception:
  62.  
  63.                 self.session.rollback()
  64.             else:
  65.  
  66.                 self.session.commit()
  67.  
  68.         finally:
  69.  
  70.             self.session.close()
  71.  
  72.     def disconnect(self):
  73.         '''
  74.            Make sure the dbconnection gets closed
  75.        '''
  76.         self.session = self._Session()
  77.  
  78.         '''
  79.            .close() will give the connection back to the connection
  80.            pool of Engine and doesn't close the connection.
  81.        '''
  82.         self.session.close()
  83.  
  84.         '''
  85.            dispose() will close all connections of the connection pool.
  86.        '''
  87.         self._Engine.dispose()
  88.  
  89.         self._Session = None
  90.         self._Engine = None
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Donnerstag 10. August 2017, 17:36

Ich könnte schreien und heulen zugleich. Was bin ich verzweifelt. Meine letzte Lösung funktioniert nur bedingt. Wenn ich das Programm einmal starte, werden alle Anfragen wunderbar abgearbeitet. Alle QComboBox()-Objekte werden gefüllt. Auf der GUI sind insgesatm drei QPushButton()-Objekte. Klicke ich nun auf das QPushButton()-Objekt, mit der Beschriftung Re-populate, dann wird es richtig kritisch. Ich bekomme erst einmal eine ellenlange Fehlermeldung. Diese habe ich auf meinem BitBucket-Konto veröffentlicht. Dazu habe ich auch meinen Quelltext auf BitBucket ausgelagert. Hier: SQLAlchemy and QThread, weil der Code sonst den Rahmen sprengen würde. Der übersichtshalber habe ich das Programm in Modulen aufgeteilt. Gestartet wird das Programm mit dem main_program.py-Modul. In diesem Modul geht es auch um die GUI. Von dort aus werden die QThread()-Objekte verwaltet.

Hat jemand eine Ahnung, woran ich hier kläglich scheitere?
Melewo
User
Beiträge: 320
Registriert: Mittwoch 3. Mai 2017, 16:30

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Melewo » Donnerstag 10. August 2017, 22:11

Schneidest Du eigentlich die Anfragen zur Kontrolle mit?
Ich meine, ich benutze nur MySQL, doch wenn ich mir nicht sicher war, ob die Anfragen gut aussehen oder ob ein Script unnötige Quit's verursacht, dann kommentiere ich für einen abschließenden Test eine log.txt ein (# entfernen) und die müsste ja dann bei Dir die letzte Abfrage enthalten, bei oder nach der ein Abbruch erfolgte.

Oder ich verstehe nichts, weil mir das alles zu undurchsichtig ist.
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Donnerstag 10. August 2017, 22:24

@Melewo: Leider verstehe ich deine Anmerkung nicht. Was genau meinst du mit "mitscheiden"? Alles was ich mache, ist, dass ich mein SessionScope()-Objekt mit der with-Anweisung öffne und dann alle Threads nach und nach starte. Du kannst dir gern meinen Quelltext von BitBucket herunterladen, und es dir selbst anschauen. Beim ersten Laden klappt alles, beim Klicken auf die Schaltfläche "Re-populate" versagt alles komplett. Klicke ich nach dem Versagen noch einmal auf die gleiche Schaltfläche, dann klappt alles wieder wunderbar. Irgendwie klappt das re-populate nur bei jedem zweiten Klick.
Melewo
User
Beiträge: 320
Registriert: Mittwoch 3. Mai 2017, 16:30

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Melewo » Donnerstag 10. August 2017, 22:41

Sophus hat geschrieben:Des Weiteren passiert auch hin und wieder mal, dass mir von seiten SQLAlchemy gesagt wird, dass die Verbindung geschlossen wurde. Dazu habe ich eine Fehlermeldung:

Ich bezog mich darauf und nein, ich benutze SQLAlchemy nicht und kann gut mit MySQL leben. Doch wenn ich da Abbrüche hätte, würde ich bei mir diese beiden Zeilen in der mysql/bin/my.ini einkommentieren und schauen, wie die Abfragen aussehen.

  1. # general-log = 1
  2. # general_log_file = "C:/xampp/mysql/anfragen.log"

Und wie das bei Dir aussieht, weiß ich nicht, verstehe ich nicht und habe auch nicht vor mir etwas zu installieren. Es sollte nur eine einfache Frage sein, ob Du die mitschneidest zur Kontrolle. Wobei ich ja eh nicht verstehe, wie bei Dir alles zusammenhängt.
BlackJack

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon BlackJack » Freitag 11. August 2017, 00:02

@Melewo: Die Probleme liegen so wie es aussieht ja auf Python-Seite. Es gehen ja gar keine Anfragen raus, also wird man auf Datenbankseite nicht viel sehen.
Benutzeravatar
Sophus
User
Beiträge: 1035
Registriert: Freitag 25. April 2014, 12:46
Wohnort: Osnabrück

Re: SQLAlchemy: Arbeiten mit mehreren QThreads?

Beitragvon Sophus » Freitag 11. August 2017, 00:10

@BlackJack: Hast du eine Ahnung, wo es klemmen könnte?

Wer ist online?

Mitglieder in diesem Forum: 0 Mitglieder