@snafu: Ich muss mich nochmals melden. Meinst du es etwa so? Zur Vereinfachung habe ich das gesamte Beispiel extrem gekürzt; es ist damit nicht ausführbar. Den Context-Manager habe ich mal eben umstrukturiert. Diese Umstrukturierung findest du in
Zeile 252-280. Das eigentliche Herzstück meines Anliegens sind die
Zeilen 198-207. Da öffne ich den Context-Manager einmalig und verteile die
Sessions an die Threads, die durch die For-Schleife nach und nach gestartet werden. Ich hoffe, dass ich bis hierher richtig verfahre. Denn ich habe mich weitestgehend an die SQLAlchemy-Dokumentation gehalten:
„When do I construct a Session, when do I commit it, and when do I close it?“. In diesem Abschnitt werden insgesamt drei Beispiele gezeigt: Eines davon sollte man auf keinen Fall verwenden, und die anderen beiden Beispiele zeigen, wie man die Session handhabt. Ich habe mich für das zweite Beispiel entschieden – es sieht übersichtlicher aus. In der
MasterDataManipulation()-Klasse (
Zeile 25-61) findest du derzeit einige Abfragen für jede Kategorie. Später werden in dieser Klasse weitere Abfragen hinzukommen. Und die
Worker()-Klasse (Zeilen 63-158) dient als Worker-Objekt (eine Unterklasse von QObject), das später in den
QThread() verschoben wird.
Und wie du in der
session_scope()-Funktion (
Zeilen 252-280) siehst, habe ich die
Scoped_session, die durch
scoped_session erzeugt wird, nun mit Funktionsklammern aufgerufen, und zwar direkt in der
yield-Anweisung. Meinst du das etwa so?
Code: Alles auswählen
# -*- coding: cp1252 -*-
import sys
from contextlib import contextmanager
# Here we have to import all PyQt stuff for working with GUI
import sqlalchemy
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import create_engine
from sqlalchemy.exc import SQLAlchemyError, OperationalError, DisconnectionError
from sqlalchemy import exc
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy import Table, Column, Integer, String, MetaData
from sqlalchemy import event
from traceback import format_exc
from sys import exc_info
''' setting up root class for declarative declaration '''
# Base class returned by declarative_base(); presumably the table models
# (not shown in this excerpt) derive from it -- TODO confirm.
Base = declarative_base()
# Here you can see the models of the tables
class MasterDataManipulation(object):
    """Run the master-data queries against a session supplied by the caller.

    The session is only stored and queried here; this class never opens,
    commits or closes it.
    """

    # category -> (callable returning the mapped model, attribute to yield).
    # The models are wrapped in callables so that a model name is looked up
    # only when its category is actually requested.
    _CATEGORIES = {
        'person_gender': (lambda: PERSON_GENDER, 'gender'),
        'person_nationality': (lambda: PERSON_NATIONALITY, 'nationality'),
        'person_salutation': (lambda: PERSON_SALUTATION, 'salutation'),
        'person_title': (lambda: PERSON_TITLE, 'title'),
        'person_hair_color': (lambda: PERSON_HAIR_COLOR, 'hair_color'),
        'person_eye_color': (lambda: PERSON_EYE_COLOR, 'eye_color'),
        'person_religion': (lambda: PERSON_RELIGION, 'religion'),
        'person_relationship_status': (lambda: PERSON_RELATIONSHIP_STATUS,
                                       'relationship_status'),
    }

    def __init__(self, session_object=None):
        """Remember *session_object*; it is used for every query."""
        self._session = session_object

    def select_all(self, category):
        """Yield ``(record.id, value)`` for every record of *category*.

        *category* is one of the keys of ``_CATEGORIES``; the yielded value
        is the attribute named in that table entry.

        This is a generator: the query is issued, and a ``KeyError`` for an
        unknown category is raised, only when the first item is requested.
        """
        resolve_model, value_attribute = self._CATEGORIES[category]
        for record in self._session.query(resolve_model()):
            yield record.id, getattr(record, value_attribute)
class Worker(QObject):
    """Worker object that emits the records of one category, one per timer tick.

    Signals:
        finish_progress: emitted when the iteration ends, whether the
            generator is exhausted, stop() was requested or a database
            error occurred.
        populate_item_signal(object, object): one item produced by
            MasterDataManipulation.select_all() plus the target combo box.
        stop_loop(unicode, unicode): formatted traceback and a
            "The server said: ..." message after an SQLAlchemy error.
    """

    finish_progress = pyqtSignal()
    populate_item_signal = pyqtSignal(object, object)
    stop_loop = pyqtSignal(unicode, unicode)

    def __init__(self,
                 combo_box=None,
                 new_scope=None,
                 category=None,
                 time_interval=None,
                 operation=None,
                 parent=None):
        """Store the job description; no query is started here.

        combo_box     -- passed back unchanged with every populate_item_signal
        new_scope     -- session object handed to MasterDataManipulation
        category      -- category name given to select_all()
        time_interval -- value for QTimer.setInterval(); defaults to 100
        operation     -- "select" or 'population', evaluated in init_object()
        parent        -- forwarded to QObject
        """
        QObject.__init__(self, parent)
        self.new_scope = new_scope
        self.category = category
        self.time_interval = 100 if time_interval is None else time_interval
        self.operation = operation
        self.combo_box = combo_box

        # The timer is created in init_object(); keeping the attribute at
        # None until then lets stop() be called at any time.
        self.timer = None
        # 1 = keep running, 0 = stop() was requested.
        self._run_semaphore = 1
        # Generator returned by select_all(); created in init_object().
        self._element = None

    def init_object(self):
        """Set up the work selected by ``self.operation``.

        "select": create the record generator once and start a repeating
        timer that calls populate_item() on every timeout.
        'population': call populate_data() on the manipulation object.
        """
        if self.operation == "select":
            self.timer = QTimer()
            # Create the generator once; every timer tick pulls the next
            # item from this same generator.
            master_data_manipulation = MasterDataManipulation(
                session_object=self.new_scope)
            self._element = master_data_manipulation.select_all(self.category)
            # Call populate_item() on every timeout of the repeating timer.
            self.timer.setSingleShot(False)
            self.timer.setInterval(int(self.time_interval))
            self.timer.timeout.connect(self.populate_item)
            self.timer.start()
        elif self.operation == 'population':
            print("hier")
            master_data_manipulation = MasterDataManipulation(
                session_object=self.new_scope)
            master_data_manipulation.populate_data()

    def populate_item(self):
        """Emit the next record, or finish when there is none or on error."""
        try:
            if self._run_semaphore == 0:
                # stop() was requested: reset the flag and leave through
                # the same path as an exhausted generator.
                self._run_semaphore = 1
                raise StopIteration
            self.populate_item_signal.emit(next(self._element), self.combo_box)
        except StopIteration:
            self.finish_progress.emit()
            self.timer.stop()
        except SQLAlchemyError as err:
            # OperationalError derives from SQLAlchemyError, so this single
            # handler covers both; a separate OperationalError clause placed
            # after this one could never run.
            # format_exc() formats the exception currently being handled;
            # its only parameter is a traceback depth limit.
            desired_trace = format_exc()
            detail = err.args[0] if err.args else err
            server_said = "The server said: {server_said}".format(
                server_said=str(detail))
            # Stop the timer before notifying anybody, so no further tick
            # touches the failed query.
            self.timer.stop()
            self.stop_loop.emit(desired_trace, server_said)
            self.finish_progress.emit()

    def stop(self):
        """Request the end of the iteration and drop the generator."""
        if self.timer is not None:
            self.timer.stop()
        self._run_semaphore = 0
        self._element = None
[...]
class MyCustomDialog(QDialog):
finish = pyqtSignal()
def __init__(self, url=None, parent=None):
    """Initialise the dialog and keep the database URL.

    url    -- stored as ``self._url``
    parent -- forwarded to the QDialog constructor
    """
    super(MyCustomDialog, self).__init__(parent)
    self._url = url
[...]
def start_all_selection(self):
    """Clear every master-data combo box and start one worker per category.

    For each (category, combo box) pair the combo box is emptied and
    start_thread() is called with operation 'select' and a time interval
    of 10.
    """
    list_tuple = [
        ("person_salutation", self.combo_person_salutation),
        ("person_title", self.combo_person_title),
        ("person_gender", self.combo_person_gender),
        ("person_religion", self.combo_person_religion),
        ("person_eye_color", self.combo_person_eye_color),
        ("person_hair_color", self.combo_person_hair_color),
        ("person_relationship_status", self.combo_person_relationship_status),
        ("person_nationality", self.combo_person_nationality),
    ]
    try:
        # NOTE(review): start_thread() returns as soon as the QThread has
        # been started, so this with-block -- and with it the commit/close
        # performed by session_scope() -- ends while the workers are still
        # using the session. The same session object is also handed to all
        # eight workers, although the SQLAlchemy documentation says a
        # Session is not meant to be shared between concurrent threads.
        # Each worker should obtain and close its own session inside its
        # thread; that needs a change to Worker/session_scope as well.
        #
        # session_scope() names its flag ``echo_verbose``; passing
        # ``verbose`` raises a TypeError.
        with session_scope(dburi=self._url, echo_verbose=False) as session:
            for category, combobox in list_tuple:
                combobox.clear()
                self.start_thread(combo_box=combobox,
                                  session=session,
                                  time_interval=10,
                                  operation='select',
                                  category=category)
    except OperationalError:
        # OperationalError derives from SQLAlchemyError, so it has to be
        # handled first to be reachable.
        # TODO: do stuff with this error; re-raised so it is not swallowed.
        raise
    except SQLAlchemyError:
        # TODO: do stuff with this error; re-raised so it is not swallowed.
        raise
def start_thread(self,
                 combo_box=None,
                 session=None,
                 time_interval=None,
                 operation=None,
                 category=None):
    """Create a QThread with a Worker for one category and start it.

    All arguments are forwarded to the Worker; *session* is passed as its
    ``new_scope``.
    """
    task_thread = QThread(self)
    worker = Worker(combo_box=combo_box,
                    new_scope=session,
                    category=category,
                    time_interval=time_interval,
                    operation=operation)
    # The worker is stored on the thread object as ``work``.
    task_thread.work = worker
    worker.moveToThread(task_thread)

    # Worker -> thread / dialog
    worker.finish_progress.connect(task_thread.quit)
    worker.stop_loop.connect(self.message_out)
    worker.populate_item_signal.connect(self.fill_combo_boxt)

    # Dialog -> worker
    self.finish.connect(worker.stop)

    # Thread life cycle
    task_thread.started.connect(worker.init_object)
    task_thread.finished.connect(task_thread.deleteLater)

    # Emits 'started' and runs the thread's event loop.
    task_thread.start()
[...]
@contextmanager
def session_scope(dburi=None, echo_verbose=True):
    """Provide a transactional scope around a series of operations.

    A new engine and a scoped_session registry are created for *dburi*.
    The session of the thread that enters the with-block is yielded,
    committed when the block ends normally and rolled back (with the
    exception re-raised) otherwise.

    dburi        -- database URL passed to create_engine()
    echo_verbose -- passed to create_engine() as ``echo``

    NOTE(review): the yielded object is the session belonging to the
    calling thread. Handing it to other threads bypasses the per-thread
    behaviour of scoped_session.
    """
    engine = create_engine(dburi,
                           pool_size=10,
                           max_overflow=10,
                           pool_timeout=60,
                           echo=echo_verbose)
    # scoped_session hands out one session per thread that calls it.
    session_registry = scoped_session(sessionmaker(bind=engine))
    try:
        yield session_registry()
        session_registry.commit()
    except:
        # Bare except on purpose: roll back whatever interrupted the block,
        # then re-raise it unchanged.
        session_registry.rollback()
        raise
    finally:
        try:
            # remove() closes the current session and discards it from the
            # registry instead of leaving the closed session registered.
            session_registry.remove()
        finally:
            # Every call creates its own engine, so its connection pool must
            # be released here; otherwise each call leaves a pool behind.
            engine.dispose()
[...]