Multithreading in QT richtig anstellen

Python und das Qt-Toolkit, erstellen von GUIs mittels des Qt-Designers.
Antworten
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Freitag 19. Januar 2018, 16:20

Hallo liebe Community,

Ich bin zur Zeit etwas skeptisch gegenüber meiner Multithreading Klassen, es fühlt sich im allgemeinen sehr Ressourcenlastig an und das "spawnen" von neuen Threads wird von Zeit zur Zeit auch immer langsamer. Vorgabe ist es Threads zu starten welche eine Liste von URL's abfragen (können Tausende sein) und den Status an die GUI übergeben, dieses sollte pausiert und auch gestoppt werden können. Die Summe an gleichzeitig laufenden Threads sollte einstellbar sein. Ich habe dazu eine vereinfachte GUI mit den notwendigen Thread Klassen erstellt.

Das ganze läuft unter Qt4 mit Python 2.
Ich bin für jeden Rat und Lösungsansatz dankbar!

Es tut mir im Voraus leid, dass ich so viel Code poste. Ich wüsste allerdings nicht, wie ich das ganze vereinfachen sollte so dass man dies auch testen kann.

GUI

Code: Alles auswählen

import sys
import requests

from PyQt4 import QtCore, QtGui
from random import choice

try:
    _fromUtf8 = QtCore.QString.fromUtf8
except AttributeError:
    def _fromUtf8(s):
        return s

try:
    _encoding = QtGui.QApplication.UnicodeUTF8

    def _translate(context, text, disambig):
        return QtGui.QApplication.translate(context, text, disambig, _encoding)
except AttributeError:
    def _translate(context, text, disambig):
        return QtGui.QApplication.translate(context, text, disambig)


class Ui_MainWindow(object):
    def __init__(self):
        self.thread_manager = None
        self.url_list = None

    def setupUi(self, MainWindow):
        MainWindow.setObjectName(_fromUtf8("MainWindow"))
        MainWindow.resize(410, 301)
        self.centralwidget = QtGui.QWidget(MainWindow)
        self.centralwidget.setObjectName(_fromUtf8("centralwidget"))
        self.result_list = QtGui.QListWidget(self.centralwidget)
        self.result_list.setGeometry(QtCore.QRect(10, 10, 391, 221))
        self.result_list.setObjectName(_fromUtf8("result_list"))
        self.widget = QtGui.QWidget(self.centralwidget)
        self.widget.setGeometry(QtCore.QRect(10, 240, 391, 56))
        self.widget.setObjectName(_fromUtf8("widget"))
        self.verticalLayout = QtGui.QVBoxLayout(self.widget)
        self.verticalLayout.setObjectName(_fromUtf8("verticalLayout"))
        self.horizontalLayout = QtGui.QHBoxLayout()
        self.horizontalLayout.setObjectName(_fromUtf8("horizontalLayout"))
        self.start_button = QtGui.QPushButton(self.widget)
        self.start_button.setObjectName(_fromUtf8("start_button"))
        self.horizontalLayout.addWidget(self.start_button)
        self.stop_button = QtGui.QPushButton(self.widget)
        self.stop_button.setObjectName(_fromUtf8("stop_button"))
        self.horizontalLayout.addWidget(self.stop_button)
        self.verticalLayout.addLayout(self.horizontalLayout)
        self.pause_button = QtGui.QPushButton(self.widget)
        self.pause_button.setObjectName(_fromUtf8("pause_button"))
        self.verticalLayout.addWidget(self.pause_button)
        MainWindow.setCentralWidget(self.centralwidget)

        self.retranslateUi(MainWindow)
        self._connect_widgets()
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        MainWindow.setWindowTitle(_translate("MainWindow", "Multithreading Tests", None))
        self.start_button.setText(_translate("MainWindow", "Start", None))
        self.stop_button.setText(_translate("MainWindow", "Stop", None))
        self.pause_button.setText(_translate("MainWindow", "Pause", None))

    def _connect_widgets(self):
        self.start_button.clicked.connect(lambda: self._thread_handler(0))
        self.pause_button.clicked.connect(lambda: self._thread_handler(1))
        self.stop_button.clicked.connect(lambda: self._thread_handler(2))
        # Liste mit vielen, vielen, vielen Url's
        self.url_list = open('urlList.txt', 'rb').read().splitlines()

    def _thread_handler(self, cmd):
        if cmd == 0:
            self.thread_manager = ThreadManager(self.url_list)
            self.thread_manager.connect(self.thread_manager, QtCore.SIGNAL('WORKER_DONE'), self._add_result)
            self.thread_manager.start()
        elif cmd == 1:
            self.thread_manager.pause()
        elif cmd == 2:
            self.thread_manager.stop()

    def _add_result(self, status):
        self.result_list.addItem(status)


def main():
    app = QtGui.QApplication(sys.argv)
    MainWindow = QtGui.QMainWindow()
    ui = Ui_MainWindow()
    ui.setupUi(MainWindow)
    MainWindow.show()
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()

Lösung mit QThread's - Threads lassen sich abrupt terminieren. Allerdings ist die Erstellung einer Threadliste wahrscheinlich nicht die eleganteste Lösung.

Code: Alles auswählen

class ThreadManager(QtCore.QThread):
    def __init__(self, url_list):
        QtCore.QThread.__init__(self)

        self.url_list = url_list
        self.thread_pool = []
        self.max_threads = 5

        self.paused = False
        self.stopped = False

    def run(self):
        while True:
            if not self.paused:
                if len(self.thread_pool) < self.max_threads:
                    thread = WorkerThread(choice(self.url_list))
                    thread.connect(thread, QtCore.SIGNAL('DONE'), self.return_result)
                    thread.start()
                    self.thread_pool.append(thread)

            if self.stopped:
                break

    def return_result(self, result):
        self.emit(QtCore.SIGNAL('WORKER_DONE'), result[0])
        self.thread_pool.remove(result[1])

    def stop(self):
        self.stopped = False if self.stopped else True
        # Terminate running QThreads
        [t.terminate() for t in self.thread_pool]

    def pause(self):
        self.paused = False if self.stopped else True


class WorkerThread(QtCore.QThread):
    def __init__(self, url):
        QtCore.QThread.__init__(self)
        self.url = url

    def run(self):
        try:
            a = requests.get(self.url).status_code
            self.emit(QtCore.SIGNAL('DONE'), [str(a), self])
        except:
            # Richtiger Weg um ein Thread mit den gleichen Konditionen neu zu starten?
            self.run()
Lösung mit QRunnable's - Threads die einmal im QThreadPool sind können nicht terminiert werden. Somit muss das hinzufügen neuer Threads in den ThreadPool limitiert werden um ein möglichst schnelles pausieren und stoppen zu ermöglichen

Code: Alles auswählen

class ThreadManager(QtCore.QThread):
    def __init__(self, url_list):
        QtCore.QThread.__init__(self)

        self.url_list = url_list
        self.thread_pool = QtCore.QThreadPool()
        self.thread_pool.setMaxThreadCount(5)
        self.queue = 0

        self.paused = False
        self.stopped = False

    def run(self):
        while True:
            if not self.paused:
                if self.queue <= 30:  # Limitieren des ThreadPools
                    thread = WorkerThread(choice(self.url_list))
                    thread.signals.result.connect(self.return_result)
                    self.queue += 1
                    self.thread_pool.start(thread)

            if self.stopped:
                break

        self.thread_pool.waitForDone()

    def return_result(self, result):
        self.queue -= 1
        self.emit(QtCore.SIGNAL('WORKER_DONE'), result)

    def stop(self):
        self.stopped = False if self.stopped else True

    def pause(self):
        self.paused = False if self.stopped else True


class WorkerThread(QtCore.QRunnable):
    def __init__(self, url):
        QtCore.QRunnable.__init__(self)
        self.url = url
        self.signals = WorkerSignals()

    def run(self):
        try:
            a = requests.get(self.url).status_code
            self.signals.result.emit(str(a))
        except:
            # Richtiger Weg um ein Thread mit den gleichen Konditionen neu zu starten?
            self.run()


class WorkerSignals(QtCore.QObject):
    finished = QtCore.pyqtSignal()
    error = QtCore.pyqtSignal(tuple)
    result = QtCore.pyqtSignal(object)
    progress = QtCore.pyqtSignal(int)
__deets__
User
Beiträge: 3299
Registriert: Mittwoch 14. Oktober 2015, 14:29

Freitag 19. Januar 2018, 16:51

Der Selbst-Aufruf von run in WorkeThread ist definitiv ein Bug, weil du damit eine Rekursion ausloest. So etwas waere normalerweise eine while-Schleife mit Abbruchbedingung

Generell ist dein Ansatz ziemlich knoedelig. Und wenn das Ziel der vielen Threads mehr Durchsatz ist, dann auch nicht vor Erfolg gekroent, da Python nicht wirklich multi-threaded ist. Stichwort: global interpreter lock, GIL.

Und normalerweise arbeitet man so, das man eine Reihe von Threads spawnt, die sich eine *Datenstruktur* teilen, ueblicherweise eine Queu. Und da steckt man Arbeitsauftraege rein, und die Ergebnisse in eine zweite, ebenfalls geteilte Datenstruktur. Auf die Art und weise holen sich die Threads ihr Arbeitspaket, und liefern es wieder ab. Statt fuer jede URL einen Thread zu starten (so zumindest sieht das aus).

Generell wuerde ich das anders machen:

- benutz Pythons eigene Threading oder Multiprocessing Mechanismen
- wenn der Durchsatz wirklich maximal sein soll, nutze asyncio
- das der ThreadManager selbst ein Thread ist, ist unnoetig.
- wechsel auf Python3 und vor allem Qt5, Qt4 ist schon jenseits seiner Lebenszeit.
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Freitag 19. Januar 2018, 18:25

Super, vielen dank das du dir die Zeit genommen hast dir den Code anzuschauen.
Ich dachte mir schon dass ich das ganze falsch angehe. Ich spawne in der Tat für jede URL ein neuen Thread.

- benutz Pythons eigene Threading oder Multiprocessing Mechanismen
Spricht etwas prinzipiell gegen QThreads bzw. QRunnables? Oder sprichst du da aus erfahrung?
- wenn der Durchsatz wirklich maximal sein soll, nutze asyncio
Asyncio ist mir bis jetzt noch neu. Werde mich einlesen
- das der ThreadManager selbst ein Thread ist, ist unnoetig.
Kann ich das ganze denn von der GUI Klasse steuern? Friert mir da die GUI nicht ein?
- wechsel auf Python3 und vor allem Qt5, Qt4 ist schon jenseits seiner Lebenszeit.
Tatsache? Habe etwas Angst vor den Wechsel da ich doch größere Veränderungen erwarte.

Da ich auch mit der GIL nicht wirklich vertraut bin. Lockt diese denn nicht nur C code? Purer python code sollte doch threadbar sein oder? Ich meine immerhin sind die Threads ja deutlich schneller als wenn ich nach und nach die URL's im Hauptscript abfrage.

Ich werde mich definitiv mal einlesen und demnächst hier nochmal ein Code posten.
__deets__
User
Beiträge: 3299
Registriert: Mittwoch 14. Oktober 2015, 14:29

Freitag 19. Januar 2018, 18:37

Dein denken ist genau andersherum als die Realitaet - C-Code ist *nicht* gelockt, Python-Code ist immer nur serialisiert ausfuehrbar.

Die Unterschiede zwischen Qt4 und Qt5 sind jetzt nicht so gewaltig, aber Qt4 wird halt nicht mehr supported. Fuer ein in der Entwicklung befindliches Programm darum die falsche Wahl.

Und das schedulen neuer Threads ist nicht schwergewichtig, darum braucht der Thread-Manager selbst auch nicht ein Thread sein.

Die bordeigenen Mittel zu nutzen hat den Vorteil, das du das ganze auch ohne weiteres standalone testen kannst, und hoehere Portabilitaet hast. Und wenn du *wirklich* nebenlaeufig sein willst, ist multiprocessing deutlich einfacher als drop-in fuer threading als etwas, das auf Qt beruht. Da hast du dann alles von vorne zu machen.
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Freitag 19. Januar 2018, 23:17

Nochmal vielen Dank. Habe nun dank deiner tips und einigen Tutorials aus dem Netz folgendes gebastelt. Fühlt sich viel leichter und besser im handling an.

Code: Alles auswählen

import sys
import asyncio

from aiohttp import ClientSession
from PyQt5 import QtCore, QtGui, QtWidgets
from random import choice


class MainUI(object):
    def setupUi(self, main_ui):
        main_ui.setObjectName("main_ui")
        main_ui.resize(410, 301)
        self.centralwidget = QtWidgets.QWidget(main_ui)
        self.centralwidget.setObjectName("centralwidget")
        self.result_list = QtWidgets.QListWidget(self.centralwidget)
        self.result_list.setGeometry(QtCore.QRect(10, 10, 391, 221))
        self.result_list.setObjectName("result_list")
        self.widget = QtWidgets.QWidget(self.centralwidget)
        self.widget.setGeometry(QtCore.QRect(10, 240, 391, 56))
        self.widget.setObjectName("widget")
        self.verticalLayout = QtWidgets.QVBoxLayout(self.widget)
        self.verticalLayout.setContentsMargins(0, 0, 0, 0)
        self.verticalLayout.setObjectName("verticalLayout")
        self.horizontalLayout = QtWidgets.QHBoxLayout()
        self.horizontalLayout.setObjectName("horizontalLayout")
        self.start_button = QtWidgets.QPushButton(self.widget)
        self.start_button.setObjectName("start_button")
        self.horizontalLayout.addWidget(self.start_button)
        self.stop_button = QtWidgets.QPushButton(self.widget)
        self.stop_button.setObjectName("stop_button")
        self.horizontalLayout.addWidget(self.stop_button)
        self.verticalLayout.addLayout(self.horizontalLayout)
        self.pause_button = QtWidgets.QPushButton(self.widget)
        self.pause_button.setObjectName("pause_button")
        self.verticalLayout.addWidget(self.pause_button)
        main_ui.setCentralWidget(self.centralwidget)

        self.retranslateUi(main_ui)
        
    def retranslateUi(self, main_ui):
        _translate = QtCore.QCoreApplication.translate
        main_ui.setWindowTitle(_translate("main_ui", "Multithreading Tests"))
        self.start_button.setText(_translate("main_ui", "Start"))
        self.stop_button.setText(_translate("main_ui", "Stop"))
        self.pause_button.setText(_translate("main_ui", "Pause"))
        self.bind_widgets()

    def bind_widgets(self):
        self.start_button.clicked.connect(lambda: self.thread_handler(0))
        self.pause_button.clicked.connect(lambda: self.thread_handler(1))
        self.stop_button.clicked.connect(lambda: self.thread_handler(2))

    def thread_handler(self, cmd):
        if cmd == 0:
            self.thr_manager = ThreadManager()
            self.thr_manager.add_status.connect(self.add_status)
            self.thr_manager.start()
            self.start_button.setEnabled(False)
        elif cmd == 1:
            self.thr_manager.pause()
        elif cmd == 2:
            self.thr_manager.stop()

    def add_status(self, s):
        self.result_list.addItem(s)


class ThreadManager(QtCore.QThread):
    add_status = QtCore.pyqtSignal(str)

    def __init__(self):
        QtCore.QThread.__init__(self)
        self.urls = open('urlList.txt').read().splitlines()
        self.tasks = []
        self.paused = False
        self.stopped = False
        self.loop = asyncio.new_event_loop()

        self.max_threads = 5

    async def fetch(self, url, sem):
        async with ClientSession() as session:
            async with sem:
                if not self.stopped:
                    while self.paused:
                        asyncio.sleep(0.5)
                        pass

                    try:
                        async with session.get(url) as c:
                            self.add_status.emit(str(c.status))
                    except Exception as e:
                        print(e)
                        # Diesmal richtig?
                        self.append_task(url, sem)

    async def thread_runner(self, r):
        url = choice(self.urls)
        sem = asyncio.BoundedSemaphore(self.max_threads)

        [self.append_task(url, sem) for _ in range(r)]
        await asyncio.gather(*self.tasks)
        # Fertig.

    def append_task(self, url, sem):
        task = asyncio.ensure_future(self.fetch(url, sem))
        self.tasks.append(task)

    def run(self):
        number = 10000
        asyncio.set_event_loop(self.loop)
        future = asyncio.ensure_future(self.thread_runner(number))
        self.loop.run_until_complete(future)

    def pause(self):
        self.paused = False if self.paused else True

    def stop(self):
        self.stopped = True


def main():
    app = QtWidgets.QApplication(sys.argv)
    main_ui = QtWidgets.QMainWindow()
    ui = MainUI()
    ui.setupUi(main_ui)
    main_ui.show()
    sys.exit(app.exec_())


if __name__ == "__main__":
    main()
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Samstag 20. Januar 2018, 18:01

Nochmal einen Nachtrag zu dem obigen code:
Das erstellen einer aiohttp session für jeden asynchronen Aufruf ist eine schlechte Idee. Das ist sehr.. sehr ... seeehr speicher lastig.

Lösung:
Eine Session für alle requests benutzen. Funktioniert bis jetzt ganz gut.
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Samstag 20. Januar 2018, 19:28

Erneut vor einem neuen Problem zum oben Beispiel.
Die Taskliste die generiert wird, wird unglaublich groß und futtert GB an Speicher.

Bin ich denn gezwungen eine Taskliste zu generieren? Oder gibt es da andere Möglichkeiten?
Sansch
User
Beiträge: 10
Registriert: Freitag 19. Januar 2018, 15:41

Sonntag 21. Januar 2018, 02:41

Und noch mal ich selbst.

Queue war die Lösung. Genau wie deets bereits gesagt hat. Scheint so als hätte ich jetzt den Dreh raus.
Antworten