Im Wesentlichen: Der Code funktioniert, wenn eine .hdf5-Datei zum Ordner hinzugefügt wird, aber wenn andere Dateien hinzugefügt werden, werden sie vom Code nicht erkannt.
Hier ist der Code, der den Watchdog-Code auslöst (dieser befindet sich in einer anderen .py-Datei):
Code: Alles auswählen
# Connect this object to the file-watcher listener. This is a fragment from a
# method whose header is not shown; indentation was lost in the paste.
try:
print('try to connect to event service ...')
# Fetch the listener object from watchdog_search (presumably the module
# shown in the second snippet -- confirm).
self.listener = watchdog_search.get_qt_listener()
# Forward the inner listener's finishedRun signal to self.on_finished_run.
self.listener.listener.finishedRun.connect(self.on_finished_run)
except Exception as e:
# Any failure while connecting is only printed; execution continues.
print(e)
Code: Alles auswählen
import time
import traceback
import os
import h5py
import queue
from typing import Union
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, DirCreatedEvent, FileCreatedEvent
from .tools.qt import QtCore
class NewFileHandler(FileSystemEventHandler):
    """Watchdog handler that queues the paths of newly created hdf5 files.

    The paths are published through ``file_queue`` so that a consumer running
    in another thread can pick them up one at a time.
    """

    def __init__(self):
        super().__init__()
        # A Queue (instead of a plain list) lets the consumer block in get()
        # until the first file shows up, so an empty start state is not an
        # error.
        self.file_queue = queue.Queue()

    def on_created(self, event: Union[DirCreatedEvent, FileCreatedEvent]):
        """Queue the path of a newly created file whose name ends in ``hdf5``.

        Called by the Observer for every file/directory creation event.
        Directory events are ignored because a directory can never be opened
        as an hdf5 file, even if its name happens to end in ``hdf5``.
        """
        if event.is_directory:
            return
        # watchdog may deliver src_path as bytes; normalise to str so the
        # suffix comparison works and consumers always receive a str path.
        path = os.fsdecode(event.src_path)
        if path.endswith("hdf5"):
            self.file_queue.put(path)
class ObserverWrapper(QtCore.QObject):  # New LabBusSubscriber
    """Encapsulated Observer boilerplate.

    Owns a watchdog ``Observer`` plus a ``NewFileHandler`` and emits the Qt
    signal ``finishedRun(file_path, file_name)`` once a newly created hdf5
    file can be opened for reading.
    """

    # Use QString for the signal signature when the Qt binding provides it,
    # plain str otherwise.
    if hasattr(QtCore, "QString"):
        finishedRun = QtCore.Signal(QtCore.QString, QtCore.QString)
    else:
        finishedRun = QtCore.Signal(str, str)
    can_listen = True

    def __init__(self, path: str):
        """Schedule a recursive watch on ``path`` and start observing."""
        super().__init__()
        self.path = path
        self.observer = Observer()
        self.handler = NewFileHandler()
        self.observer.schedule(self.handler, path=path, recursive=True)
        self.start()

    def start(self):
        """Start the watchdog observer (called from ``__init__``)."""
        self.observer.start()

    def stop(self):
        """Stop the observer and wait for it to finish."""
        self.observer.stop()
        self.observer.join()

    def event(self, event):
        """Here we define what to do at which signal.

        In general we will transmit a QT signal to which the other components
        connect. ``event`` is either a ``QEvent`` (forwarded to ``QObject``)
        or a ``[file_path, file_name]`` pair that is emitted via
        ``finishedRun``.
        """
        print("EVENT", event)
        if isinstance(event, QtCore.QEvent):
            # make sure the QObject code reacts on QEvents
            return QtCore.QObject.event(self, event)
        self.finishedRun.emit(event[0], event[1])

    def wait_for_file(self):
        """Wait for one newly created file and announce it once it is readable.

        Blocks until the handler queues a path, then repeatedly tries to open
        the file with h5py. On success ``finishedRun`` is emitted. The file is
        skipped (and the method returns) when the retry budget is exhausted or
        an unexpected error occurs, so one bad file cannot stall the caller.
        """
        # Upper bound on verifying that a file is finished: 3500 tries at
        # 0.01 s apart, i.e. about 35 seconds of waiting in total.
        max_retry_count = 3500
        retry_interval_seconds = .01

        file_path = self.handler.file_queue.get(block=True)
        file_name = os.path.basename(file_path)

        retry_count = 0
        while True:
            try:
                # The context manager closes the handle even if closing or
                # anything in between raises.
                with h5py.File(file_path, "r"):
                    pass
            except OSError:
                if retry_count >= max_retry_count:
                    print(f"h5 file <{file_path}> reached max retry count, skipping")
                    return
                retry_count += 1
                print(f"h5 file <{file_path}> is locked, retrying {retry_count}/{max_retry_count}")
                time.sleep(retry_interval_seconds)
            except Exception as err:
                print(f"Got unexpected Error <{type(err).__name__}> while opening <{file_path}> ")
                traceback.print_exc()
                return
            else:
                # Emit outside the try so an error raised while emitting is
                # not mistaken for a locked file.
                self.event([file_path, file_name])
                return
class QtEventSubscriber(QtCore.QThread):
    """The listener thread.

    Creates an ``ObserverWrapper`` for ``path``, moves it to this thread and,
    once started, keeps processing newly created files for as long as the
    listener's ``can_listen`` flag is true.
    """

    def __init__(self, path="/home/xxx/Desktop/xxx/MPQ/test_image_analyzer_files/Test_Data/"):
        """Create the listener for ``path`` (defaults to the test-data folder)."""
        QtCore.QThread.__init__(self)
        self.listener = ObserverWrapper(path)
        self.listener.moveToThread(self)

    def run(self):
        """Thread body: handle created files one after another."""
        if not self.listener.can_listen:
            print("can't listen: no listener thread")
            return
        print("start listener thread")
        # wait_for_file() handles a single file per call; calling it only once
        # would end the thread after the first file and every later file would
        # go unnoticed, so keep calling it.
        while self.listener.can_listen:
            self.listener.wait_for_file()
# Module-wide listener thread, created on first request.
_qtlistener = None


def get_qt_listener():
    """Return the shared ``QtEventSubscriber``, creating and starting it once.

    The first call constructs the thread, starts it and prints the thread the
    inner listener lives in together with the subscriber itself; every later
    call returns the same object without side effects.
    """
    global _qtlistener
    if _qtlistener is None:
        _qtlistener = QtEventSubscriber()
        _qtlistener.start()
        print(_qtlistener.listener.thread(), _qtlistener)
    return _qtlistener