Qt verwenden, um eine Klasse offen zu lassen und ein Signal zu senden

Python und das Qt-Toolkit, erstellen von GUIs mittels des Qt-Designers.
Antworten
pyhill00
User
Beiträge: 30
Registriert: Mittwoch 20. März 2019, 22:39

Ich versuche eine `.hdf5` Datei zu einer Methode in der Hauptpythondatei zu senden:

Code: Alles auswählen

    class DesignerMainWindow(QtGui.QMainWindow, Ui_MainWindow):
        """Customization for Qt Designer created window"""
    
        signal_output_log = QtCore.Signal("QString")
        sig_clear_log = QtCore.Signal()
    
        def __init__(self, parent=None):
            super(DesignerMainWindow, self).__init__(parent)
            self.setupUi(self)
            self.image_analyzer = ImageAnalyzer(self)
            self.listener = watchdog_search.ObserverWrapper("/home/Test_Data/")
            self.on_finished_run(self.listener.wait_for_file()) 
       
        def on_finished_run(self, tuple: ()):
            self.image_analyzer.load_image(str(tuple[0]), str(tuple[1]), from_remote=True)
und die `.hdf5` Datei kommt von `watchdog_search.py`:

Code: Alles auswählen

    import time
    import traceback
    import os

    import h5py
    import queue
    from typing import Union

    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent


    class NewFileHandler(FileSystemEventHandler):
        """h5 file creation handler for Watchdog"""

        def __init__(self):
            self.file_queue = queue.Queue()

        # callback for File/Directory created event, called by Observer.
        def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
            if event.src_path[-4:] == "hdf5":
                # run callback with path string
                self.file_queue.put(event.src_path)


    class ObserverWrapper:
        """Encapsulated Observer boilerplate"""

        def __init__(self, path: str, recursive=True):
            self.path = path
            self.recursive = recursive

            self.observer = Observer()
            self.handler = NewFileHandler()

            self.observer.schedule(self.handler, path=path, recursive=recursive)

            self.start()

        def start(self):
            """
            Starts observing for filesystem events. Runs self.routine() every 1 second.

            :param blocking: If true, blocks main thread until keyboard interrupt.
            """

            self.observer.start()
        def stop(self):
            """
            Stops the observer. When running self.start(blocking=True) then you don't need to call this.
            """

            self.observer.stop()
            self.observer.join()

        def wait_for_file(self):
            """
            Wait and Process newly created files
            """

            max_retry_count = 3500 # for test purposes now but want to set an upper bound on verifying a file is finished.
            # will try h5 file for a max of 35 seconds (upper bound) to see if the file is finished.
            # Files are usually finished within 20-30 seconds
            #
            retry_interval_seconds = .01 # every hundreth it will try the file to see if it finished writing

            # wait for file to be added
            #print(self.handler.file_queue.get(block=True))
            file_path = self.handler.file_queue.get(block=True)
            file_name = os.path.basename(file_path)

            # try to open the file
            retry_count = 0
            while True:
                try:
                    file = h5py.File(file_path, "r")
                    file.close()
                    return file_path, file_name
                except OSError:
                    if retry_count < max_retry_count:
                        retry_count += 1
                        print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
                        time.sleep(retry_interval_seconds)
                    else:
                        print(f"h5 file <{file_path}> reached max retry count, skipping")

                except Exception as err:
                    print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
                    traceback.print_exc()

Die Datei wird in `main.py` so aufgerufen:

Code: Alles auswählen

    self.listener = watchdog_search.ObserverWrapper("/path/to/folder/of/interest")
aber nur eine hdf5 Datei wird gesendet und danach hört es auf. Watchdog muss offen bleiben und muss neue hdf5 Dateien zu `main.py``immer wieder senden. Weißt jemand, wie das mit Qt geht? Es ist eher eine Frage von asynchronous programming and Qt. Am Ende muss ich `file_path` and `file_name` der`.hdf5` zu `on_finished_run()` oder vielleicht wird `on_finished_run()` als der subscriber des Signals aber ich bin mir nicht sicher wie ich es machen kann vor allem, weil es ein `try` gibt, das kein .emit/Signal gibt. Kann mir jemand helfen?
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Indem man ein qt Signal schickt. Und dabei eine queued connection verwendet, weil du aus einem anderen thread arbeitest.
Benutzeravatar
__blackjack__
User
Beiträge: 13004
Registriert: Samstag 2. Juni 2018, 10:21
Wohnort: 127.0.0.1
Kontaktdaten:

@pyhill00: Es passiert ja genau das was Du mit dem Code sagst was passieren soll.

Asynchron Nachrichten austauschen geht bei Qt über Signal/Slot. Man müsste also ein `QObject` erstellen, das bei jeder neuen Datei ein Signal sendet. Und darauf kannst Du dann, wo auch immer Du das verbindest, drauf reagieren.

`wait_for_file()` hat übrigens eine komische API. Es werden entweder Pfad mit Dateiname und dann noch mal der Dateiname, der ja schon in Pfad mit Dateiname enthalten ist, zurückgegeben, oder *implizit* `None`. Letzteres sollte explizit sein und die Redundanz ist komisch. Oder vielleicht besser eine Ausnahme statt `None` gemeldet werden, mit passender Information *warum* es denn nicht geklappt hat. Die `print()`-Aufrufe dort wären auch eher etwas für `logging` oder etwas vergleichbares.

Und für die Interaktion mit Qt möchte man für so ein Problem vielleicht auch ein Signal zur Verfügung stellen, damit man das auch irgendwie an eine GUI melden kann.

`pathlib` wäre auch einen Blick wert. Ich würde Pfade so bald wie möglich in `Path`-Objekte verpacken.
“Most people find the concept of programming obvious, but the doing impossible.” — Alan J. Perlis
pyhill00
User
Beiträge: 30
Registriert: Mittwoch 20. März 2019, 22:39

@__blackjack__ danke für die Hilfe.
__blackjack__ hat geschrieben: Dienstag 5. Oktober 2021, 15:58 Asynchron Nachrichten austauschen geht bei Qt über Signal/Slot. Man müsste also ein `QObject` erstellen, das bei jeder neuen Datei ein Signal sendet. Und darauf kannst Du dann, wo auch immer Du das verbindest, drauf reagieren.
Empfiehlst du dann, dass ich Signal/Slot verwende? Ich habe es ausprobiert aber ich kriege es nicht hin im Laufe zu bringen:

Code: Alles auswählen

    class DesignerMainWindow(QtGui.QMainWindow, Ui_MainWindow):
        """Customization for Qt Designer created window"""
    
        signal_output_log = QtCore.Signal("QString")
        sig_clear_log = QtCore.Signal()
    
        def __init__(self, parent=None):
            super(DesignerMainWindow, self).__init__(parent)
            self.setupUi(self)
            self.image_analyzer = ImageAnalyzer(self)
            self.listener = watchdog_search.ObserverWrapper("/Test_Data/")
            self.handler.bridge.created.connect(self.on_finished_run)
        def on_finished_run(self, tuple: ()):
            self.image_analyzer.load_image(str(tuple[0]), str(tuple[1]), from_remote=True)

und von `watchdog_search.py`

Code: Alles auswählen

import time
import traceback
import os

import h5py
import queue
from typing import Union

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent

from .tools.qt import QtCore


class Bridge(QtCore.QObject):
    created = QtCore.Signal(FileCreatedEvent)

class NewFileHandler(FileSystemEventHandler):
    """h5 file creation handler for Watchdog"""

    def __init__(self):
        super().__init__()
        #self.file_queue = queue.Queue()
        self.bridge = Bridge()

    # callback for File/Directory created event, called by Observer.
    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        if event.src_path[-4:] == "hdf5":
            # run callback with path string
            #self.file_queue.put(event.src_path)
            self.bridge.created.emit(event.src_path)


class ObserverWrapper:
    """Encapsulated Observer boilerplate"""

    def __init__(self, path: str):#, recursive=True):
        self.path = path
        #self.recursive = recursive

        self.observer = Observer()
        self.handler = NewFileHandler()

        self.observer.schedule(self.handler, path=path, recursive=True)

        self.start()


    def start(self):
        """
        Starts observing for filesystem events. Runs self.routine() every 1 second.

        :param blocking: If true, blocks main thread until keyboard interrupt.
        """

        self.observer.start()
    def stop(self):
        """
        Stops the observer. When running self.start(blocking=True) then you don't need to call this.
        """

        self.observer.stop()
        self.observer.join()

    def wait_for_file(self):
        """
        Wait and Process newly created files
        """

        max_retry_count = 3500 # for test purposes now but want to set an upper bound on verifying a file is finished.
        # will try h5 file for a max of 35 seconds (upper bound) to see if the file is finished.
        # Files are usually finished within 20-30 seconds
        #
        retry_interval_seconds = .01 # every hundreth it will try the file to see if it finished writing

        # wait for file to be added
        #print(self.handler.file_queue.get(block=True))

        #print(self.handler.on_created(self.path))
        #file_path =
        #print(self.handler.file_queue.get(block=True))
        #file_name = os.path.basename(file_path)

        # try to open the file
        retry_count = 0
        while True:
            try:
                file = h5py.File(file_path, "r")
                file.close()
                return file_path, file_name
            except OSError:
                if retry_count < max_retry_count:
                    retry_count += 1
                    print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
                    time.sleep(retry_interval_seconds)
                else:
                    print(f"h5 file <{file_path}> reached max retry count, skipping")

            except Exception as err:
                print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
                traceback.print_exc()
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wie ich schon sagte, das MUSS eine queued connection sein. Sonst rufst du aus dem Arbeitsthread direkt in die GUI, mit potentiell katastrophalen Folgen.

Ich würde auch erstmal ein simples Beispiel bauen, das nur (am besten sogar mit einem QThread) etwas an die GUI meldet. Und dann im nächsten Schritt deinen FS-Watchdog einbauen in die bestehende Mechanik.
pyhill00
User
Beiträge: 30
Registriert: Mittwoch 20. März 2019, 22:39

Und wie mache ich eine queued connection?

Das Programm (die GUI) ist leider legacy code, der gerade mit python3 nicht funktioniert. Der watchdog code war teilweise eine Lösung, sodass es mit python3 kompatibel sein kann und ich teste Sachen, sodass es am Ende mit python3 funktioniert. Ich kann leider kein simples Beispiel bauen.
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Indem du das mal recherchierst? Ich verstehe auch nicht, wieso du kein Beispiel bauen kannst - du kannst doch einfach eine neue Python Datei anlegen, und damit arbeiten. Das hat doch erstmal nichts mit dem bestehenden Projekt zu tun. Als beruflicher Entwickler gehe ich andauernd so vor - eine neue Technik wird erstmal fuer sich studiert, bevor man sie integriert.

Das habe ich zB hier mal gemacht: viewtopic.php?f=24&t=44250&start=15#p335559

Und den Hintergrund stellt https://doc.qt.io/qt-5/thread-basics.html dar.

Ja, ist viel Zeug. Aber Nebenlaeufigkeit ist eben auch schwierig.
pyhill00
User
Beiträge: 30
Registriert: Mittwoch 20. März 2019, 22:39

Hier ist was ich jetzt habe. Die GUI öffnet von der `main.py` und alles aber die `try` Schleife läuft nicht. Hier ist der aktuelle Code:
Meine Vemutung ist dass es vielleicht etwas mit Queue zu tun?

Code: Alles auswählen

import time
import traceback
import os

import h5py
import queue
from typing import Union

from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent

from .tools.qt import QtCore

import sys
from PyQt5.QtWidgets import *
from PyQt5.QtCore import *


class NewFileHandler(FileSystemEventHandler):
    """h5 file creation handler for Watchdog"""

    def __init__(self):
        super().__init__()
        self.file_queue = queue.Queue()
        #self.bridge = Bridge()

    # callback for File/Directory created event, called by Observer.
    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        if event.src_path[-4:] == "hdf5":
            # run callback with path string
            self.file_queue.put(event.src_path)
            # self.bridge.created.emit(event.src_path)

class ObserverWrapper(QtCore.QObject): # New LabBusSubscriber
    """Encapsulated Observer boilerplate"""
    if hasattr(QtCore, "QString"):
        finishedRun = QtCore.Signal(QtCore.QString, QtCore.QString)
    else:
        finishedRun = QtCore.Signal(str, str)

    def __init__(self, path: str):#, recursive=True):
        super().__init__()
        self.path = path

        self.observer = Observer()
        self.handler = NewFileHandler()

        self.observer.schedule(self.handler, path=path, recursive=True)

        self.start()


    def start(self):
        """
        Starts observing for filesystem events. Runs self.routine() every 1 second.

        :param blocking: If true, blocks main thread until keyboard interrupt.
        """

        self.observer.start()
    def stop(self):
        """
        Stops the observer. When running self.start(blocking=True) then you don't need to call this.
        """

        self.observer.stop()
        self.observer.join()

    def event(self, event):
        """Here we define what to do at which signal.
        In general we will transmit a QT signal to which the other components connect.
        """
        print("EVENT", event)
        if isinstance(event, QtCore.QEvent):
            # make sure the QObject code reacts on QEvents
            return QtCore.QObject.event(self, event)
        max_retry_count = 350 # for test purposes now but want to set an upper bound on verifying a file is finished.
        # will try h5 file for a max of 35 seconds (upper bound) to see if the file is finished.
        # Files are usually finished within 20-30 seconds
        #
        retry_interval_seconds = .1 # every hundreth it will try the file to see if it finished writing


        file_path = self.handler.file_queue.get(block=True)
        file_name = os.path.basename(file_path)

        # try to open the file
        retry_count = 0
        while True:
            try:
                file = h5py.File(file_path, "r")
                file.close()
                self.finishedRun.emit(file_path, file_name)
                break # return file_path, file_name
            except OSError:
                if retry_count < max_retry_count:
                    retry_count += 1
                    print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
                    time.sleep(retry_interval_seconds)
                else:
                    print(f"h5 file <{file_path}> reached max retry count, skipping")

            except Exception as err:
                print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
                traceback.print_exc()

class QtEventSubscriber(QtCore.QThread):
    """The listener thread"""

    def __init__(self):
        QtCore.QThread.__init__(self)
        self.listener = ObserverWrapper()
        self.listener.moveToThread(self)

    def run(self):
        if self.listener.can_listen:
            print("start listener thread")
            self.listener.setThreading(0)
            self.listener.listen()
        else:
            print("can't listen: no listener thread")
            pass


_qtlistener = None


def get_qt_listener():
    global _qtlistener
    if _qtlistener is not None:
        return _qtlistener
    _qtlistener = QtEventSubscriber()
    _qtlistener.start()
    print(_qtlistener.listener.thread(), _qtlistener)
    return _qtlistener
__deets__
User
Beiträge: 14493
Registriert: Mittwoch 14. Oktober 2015, 14:29

Es gibt da keine Connection. Darum kann auch nichts ankommen. Leider ist watchdog nicht so besonders gut programmiert, so dass du im Qthread, von dem man auch nicht ableiten sollte, sondern den *worker* benutzt (dazu ist der da), auf Eintraege in die Queue warten musst mit queue.get(), um dann eben ein emit zu machen.

Ausserdem habe ich dir geraten, ein eigenstaendiges Beispiel zu schaffen, um diesen Mechanismus zu begreifen. Das du darauf bestehst, das ganze in den bestehenden Code einzuhaemmern, fuehrt lediglich dazu, dass hier niemand das nachvollziehen kann, und das Problem auch noch komplizierter wird.
Antworten