Ist ein Signal eine Art globaler Variable, wo a) alle damit verbundenen Slots bei einer Änderung benachrichtigt werden und b) man an jeder Stelle im Code darauf zugreifen kann?
Gruß
Atalanttore
Zugriff auf Inhalt eines USB-Sticks mittels USB Device ID
- sls
- User
- Beiträge: 480
- Registriert: Mittwoch 13. Mai 2015, 23:52
- Wohnort: Country country = new Zealand();
@Atalanttore: bei deiner GUI-Anwendung gibt es einen Main-Thread der ewig läuft, der empfängt Systemnachrichten (Signale) und kann dann wiederrum andere Threads benachrichtigen. Es kann nicht jeder Thread einfach so auf ein Signal zugreifen.
When we say computer, we mean the electronic computer.
@sls: das Konzept der Signale hat mit dem System erstmal wenig zu tun. Und was hat das mit Threads zu tun?
Oh. Moment. Meinst du Signale wie im signals Modul? Das ist hier nicht gemeint. Sondern signal/slots in Qt. Die Attlantore ja auch munter benutzt.
Oh. Moment. Meinst du Signale wie im signals Modul? Das ist hier nicht gemeint. Sondern signal/slots in Qt. Die Attlantore ja auch munter benutzt.
-
- User
- Beiträge: 407
- Registriert: Freitag 6. August 2010, 17:03
Ich habe nun im `LinuxDeviceMonitor` das Signal `devices_changed` erstellt.
Ausführen lässt sich der Code damit immer noch, aber beim Anstecken eines USB-Sticks erscheint nun folgende Fehlermeldung:
Ich wollte eigentlich nichts konvertieren, sondern nur ein Signal erstellen.
main.py:
Gruß
Atalanttore
Ausführen lässt sich der Code damit immer noch, aber beim Anstecken eines USB-Sticks erscheint nun folgende Fehlermeldung:
Code: Alles auswählen
Traceback (most recent call last):
File "/home/ata/source/usb-detect/main.py", line 61, in process_incoming
self.devices_changed.emit(self.devices)
TypeError: LinuxDeviceMonitor cannot be converted to PyQt5.QtCore.QObject in this context
main.py:
Code: Alles auswählen
from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
loadUi("mainwindow.ui", self)
def update_path(self, path):
self.label_usb_mass_storage_url.setText(path)
class LinuxDeviceMonitor:
devices_changed = pyqtSignal()
def __init__(self, main_window):
import pyudev
self._context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(self._context)
self._monitor.start()
self._devices = set()
self._listeners = []
self.rescan()
self.main_window = main_window
def rescan(self):
self._process_devices(
self._context.list_devices(),
action="add"
)
def add_listener(self, listener):
self._listeners.append(listener)
def remove_listener(self, listener):
self._listeners.remove(listener)
def fileno(self):
return self._monitor.fileno()
@property
def devices(self):
import pyudev
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self, *_args):
from pyudev._util import eintr_retry_call
read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
self._process_devices(iter(read_device, None))
self.devices_changed.emit(self.devices)
self.main_window.update_path(str(self.devices))
def _process_devices(self, devices, action=None):
old = set(self._devices)
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path)
elif "remove" == action and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
if self._devices != old:
devices = self.devices
for listener in self._listeners:
listener(devices)
def _is_usb_mass_storage_device(self, device):
def dev_path(device, name):
return "{}/{}".format(device.sys_path, name)
def removable(device):
removable_path = dev_path(device, "removable")
try:
return os.path.exists(removable_path) and int(open(removable_path).read())
except ValueError:
return False
def has_size(device):
size_path = dev_path(device, "size")
try:
return os.path.exists(size_path) and int(open(size_path).read())
except ValueError:
return False
def has_usb(device):
return device.get("ID_BUS") == "usb"
def has_no_disc(device):
return device.get("ID_CDROM") is None
return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
linux_device_monitor = LinuxDeviceMonitor(main_window)
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Atalanttore
-
- User
- Beiträge: 407
- Registriert: Freitag 6. August 2010, 17:03
@__deets__: Deine Vorschläge wollte ich umsetzen, aber es endete mit folgenden Fehlermeldungen:
main.py:
Gruß
Atalanttore
Code: Alles auswählen
Traceback (most recent call last):
File "/home/ata/source/usb-detect/main.py", line 130, in <module>
main()
File "/home/ata/source/usb-detect/main.py", line 122, in main
linux_device_monitor = LinuxDeviceMonitor(main_window)
File "/home/ata/source/usb-detect/main.py", line 43, in __init__
self.main_window.update_devices.connect(MainWindow.update_devices_slot)
AttributeError: 'MainWindow' object has no attribute 'update_devices'
main.py:
Code: Alles auswählen
from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot
class Signals(QObject):
update_devices = pyqtSignal(str)
def __init__(self):
super().__init__()
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
loadUi("mainwindow.ui", self)
@pyqtSlot()
def update_devices_slot(self, devices):
self.label_usb_mass_storage_url.setText(str(devices))
class LinuxDeviceMonitor:
def __init__(self, main_window):
import pyudev
self._context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(self._context)
self._monitor.start()
self._devices = set()
self._listeners = []
self.rescan()
self.main_window = main_window
self.main_window.update_devices.connect(MainWindow.update_devices_slot)
def rescan(self):
self._process_devices(
self._context.list_devices(),
action="add"
)
def add_listener(self, listener):
self._listeners.append(listener)
def remove_listener(self, listener):
self._listeners.remove(listener)
def fileno(self):
return self._monitor.fileno()
@property
def devices(self):
import pyudev
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self, *_args):
from pyudev._util import eintr_retry_call
read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
self._process_devices(iter(read_device, None))
#self.devices_changed.emit(self.devices)
self.update_devices.emit(self.devices)
#self.main_window.update_path(str(self.devices))
def _process_devices(self, devices, action=None):
old = set(self._devices)
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path)
elif "remove" == action and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
if self._devices != old:
devices = self.devices
for listener in self._listeners:
listener(devices)
def _is_usb_mass_storage_device(self, device):
def dev_path(device, name):
return "{}/{}".format(device.sys_path, name)
def removable(device):
removable_path = dev_path(device, "removable")
try:
return os.path.exists(removable_path) and int(open(removable_path).read())
except ValueError:
return False
def has_size(device):
size_path = dev_path(device, "size")
try:
return os.path.exists(size_path) and int(open(size_path).read())
except ValueError:
return False
def has_usb(device):
return device.get("ID_BUS") == "usb"
def has_no_disc(device):
return device.get("ID_CDROM") is None
return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)
def main():
app = QApplication(sys.argv)
signals = Signals()
main_window = MainWindow()
main_window.show()
linux_device_monitor = LinuxDeviceMonitor(main_window)
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Atalanttore
Wie du von “LinuxDeviceMonitor muss von qobject ableiten” zu dem kommst was du da getan hast habe ich keine Ahnung. Und wie main_window die Instanz mit MainWindow der Klasse mit sich selbst irgendwas irgendwie connecten soll noch weniger.
Ich glaube du musst mal ein paar ganz einfache, grundlegende Beispiele der OO Programmierung in Python im allgemeinen und dann mit Qt und seinem signal slot Mechanismus im speziellen durcharbeiten. 10 Zeilen. Mehr nicht. Aber jede verstanden.
Ich glaube du musst mal ein paar ganz einfache, grundlegende Beispiele der OO Programmierung in Python im allgemeinen und dann mit Qt und seinem signal slot Mechanismus im speziellen durcharbeiten. 10 Zeilen. Mehr nicht. Aber jede verstanden.
-
- User
- Beiträge: 407
- Registriert: Freitag 6. August 2010, 17:03
@__deets__: Welche 10 Zeilen meinst du?
-
- User
- Beiträge: 407
- Registriert: Freitag 6. August 2010, 17:03
@__deets__: Ich habe einen Beispielcode in der Doku probiert, aber nicht mal dieser Code tut was er soll.
Es wird gar kein Text auf der Konsole ausgegeben.
Warum ist mein ursprünglicher Code "FALSCH", obwohl er funktioniert?
Gruß
Atalanttore
Code: Alles auswählen
from PyQt5.QtCore import QObject, pyqtSignal
class Foo(QObject):
# Define a new signal called 'trigger' that has no arguments.
trigger = pyqtSignal()
def connect_and_emit_trigger(self):
# Connect the trigger signal to a slot.
self.trigger.connect(self.handle_trigger)
# Emit the signal.
self.trigger.emit()
def handle_trigger(self):
# Show that the slot has been called.
print("trigger signal received")
Warum ist mein ursprünglicher Code "FALSCH", obwohl er funktioniert?
Gruß
Atalanttore
Er ist falsch, weil er Entwurfsprinzipien verletzt, wie zB zu enge Kopplung der Objekte aneinander, str(collection) um eine String-Repraesentation einer Collection zu bekommen statt vernuenftig damit zu arbeiten, etc. Das ist also kein absolutes falsch. Genauso wie es nicht absolut falsch ist, ein Auto einfach mit 5 Leuten zu schieben. Es ist nur nicht so benutzt, wie gedacht. Wenn du das aber so lieber machst, bitte gerne. Nimm den Code, der funktioniert, und sei zufrieden.
Der Code da oben tut nichts, also tut er auch nichts. Ich zumindest sehe keine Erstellung eines Foo-Objektes. Du? Ich sehe auch keinen Aufruf der connect_and_emit_trigger-Methode. Du? Wie soll also etwas passieren?
Der Code da oben tut nichts, also tut er auch nichts. Ich zumindest sehe keine Erstellung eines Foo-Objektes. Du? Ich sehe auch keinen Aufruf der connect_and_emit_trigger-Methode. Du? Wie soll also etwas passieren?
-
- User
- Beiträge: 407
- Registriert: Freitag 6. August 2010, 17:03
Der Code wurde beschrieben mit
Unter "demonstrate" verstehe ich als Praktiker, dass auch etwas passiert.
Mit folgender Erweiterung scheint es dann zu funktionieren:
Wenn ich den Beispielcode wie im ursprünglichen Programm in zwei Klassen aufteile, passiert leider nichts mehr:
Gruß
Atalanttore
Code: Alles auswählen
The following code demonstrates the definition, connection and emit of a signal without arguments:
Mit folgender Erweiterung scheint es dann zu funktionieren:
Code: Alles auswählen
foo = Foo()
foo.connect_and_emit_trigger()
Code: Alles auswählen
from PyQt5.QtCore import QObject, pyqtSignal
class Window:
def handle_trigger(self):
# Show that the slot has been called.
print("trigger signal received")
class Device(QObject):
# Define a new signal called 'trigger' that has no arguments.
trigger = pyqtSignal()
def connect_and_emit_trigger(self):
# Emit the signal.
self.trigger.emit()
device = Device()
window = Window()
device.connect_and_emit_trigger()
# Connect the trigger signal to a slot.
device.trigger.connect(window.handle_trigger)
Atalanttore
Zu wissen, dass ein Objekt erstellt werden muss, scheint halt vorausgesetzt zu werden. Und genau das meine ich mit meinen Anmerkungen: es muss dir klar sein, WARUM da ohne weiteres nichts passiert ist. Das scheint mir nicht der Fall, und somit funktionieren deine Versuche zur Erweiterung nicht.
Und was deine Modifikation angeht: in unserem Universum gibt es üblicherweise eine Eigenschaft, die eine Wirkung von einer Ursache abhängig macht. Nicht umgekehrt. In deinem Code gilt das auch. Welche Wirkung und welche Ursache hast du da wohl vertauscht?
Und was deine Modifikation angeht: in unserem Universum gibt es üblicherweise eine Eigenschaft, die eine Wirkung von einer Ursache abhängig macht. Nicht umgekehrt. In deinem Code gilt das auch. Welche Wirkung und welche Ursache hast du da wohl vertauscht?
-
- User
- Beiträge: 407
- Registriert: Freitag 6. August 2010, 17:03
`device.trigger.connect(window.handle_trigger)` und `device.connect_and_emit_trigger()` war vertauscht. In umgedrehter Reihenfolge funktioniert es jetzt.
Die Kopplung zwischen den Objekten `device` und `window` findet nun außerhalb der beiden Klassen statt. Ist das nach den Entwurfsprinzipien so korrekt?
Mittlerweile habe ich es endlich geschafft, eine Liste mit dem Signal zu übergeben und erst danach in einen String zu konvertieren.
Code:
So habe ich das mit Signal und Slots jetzt verstanden:
Ist das so korrekt (kommentiert)?
Gruß
Atalanttore
Die Kopplung zwischen den Objekten `device` und `window` findet nun außerhalb der beiden Klassen statt. Ist das nach den Entwurfsprinzipien so korrekt?
Mittlerweile habe ich es endlich geschafft, eine Liste mit dem Signal zu übergeben und erst danach in einen String zu konvertieren.
Code:
Code: Alles auswählen
from PyQt5.QtCore import QObject, pyqtSignal
class Window:
def handle_trigger(self, text):
# Show that the slot has been called.
text_as_string = ' '.join(text)
print(text_as_string)
class Device(QObject):
# Define a new signal called 'trigger' that has no arguments.
trigger = pyqtSignal([list])
def __init__(self):
super().__init__()
def connect_and_emit_trigger(self):
text = ["trigger", "signal", "received"]
# Emit the signal.
self.trigger.emit(text)
if __name__ == "__main__":
device = Device()
window = Window()
# Connect the trigger signal to a slot.
device.trigger.connect(window.handle_trigger)
device.connect_and_emit_trigger()
So habe ich das mit Signal und Slots jetzt verstanden:
Code: Alles auswählen
from PyQt5.QtCore import QObject, pyqtSignal
class Receiver:
def slot(self, local_variable):
print(local_variable)
class Sender(QObject): # muss von QObject abgeleitet werden
signal = pyqtSignal([str]) # Datentyp des mit dem Signal übergebenen Arguments angeben
def __init__(self):
super().__init__()
def emit_signal(self):
self.signal.emit("Hello World")
def main():
receiver = Receiver()
sender = Sender()
sender.signal.connect(receiver.slot)
sender.emit_signal()
if __name__ == "__main__":
main()
Gruß
Atalanttore
Schöner Fortschritt. Du kannst dir AFAIK beim Signal die Liste um die Typen sparen, du hast ja nur ein Argument. Dann sollte eigentlich list bzw str direkt funktionieren.
Und ja, das ist vom Entwurf her besser. Denn so müssen Receiver und Sender nichts voneinander wissen, und können trotzdem kollaborieren. Gleiches gilt auch für den DeviceMonitor, und das Window. Ersterer sollte darum auch NICHT das Window als Argument bekommen. Denn er könnte ja auch in einem Fensterlosen Kontext benutzt werden. Stattdessen kannst du die beiden auch von außen verschalten.
Und ja, das ist vom Entwurf her besser. Denn so müssen Receiver und Sender nichts voneinander wissen, und können trotzdem kollaborieren. Gleiches gilt auch für den DeviceMonitor, und das Window. Ersterer sollte darum auch NICHT das Window als Argument bekommen. Denn er könnte ja auch in einem Fensterlosen Kontext benutzt werden. Stattdessen kannst du die beiden auch von außen verschalten.
-
- User
- Beiträge: 407
- Registriert: Freitag 6. August 2010, 17:03
Danke. Der Fortschritt war eine ganz schwere Geburt.
Die Übergabe eines Typs beim Signal funktioniert bei mir auch ohne Liste. Also weg mit der Liste.
Nun würde ich ganz gerne das Problem mit dem Fehler `DeviceNotFoundAtPathError` lösen, der erscheint, wenn man einen USB-Stick, ohne ihn vorher auszuhängen, einfach aus der USB-Buchse zieht. Dafür muss ich aber den Code erst mal komplett verstehen. Manche Codestellen verstehe ich aber nach wie vor nicht. Zum Code habe ich noch mehrere Fragen:
- Wozu sind die Methoden zum Hinzufügen und Entfernen eines `listener` in der Klasse `LinuxDeviceMonitor` gut?
- Wofür braucht man überhaupt einen `listener`?
- Was wird mit `from pyudev._util import eintr_retry_call` in der Methode `process_incoming()` importiert?
- Was bewirkt die Anweisung `partial(eintr_retry_call, self._monitor.poll, timeout=0)` in der Methode `process_incoming()`?
- Was hat es mit dem Vergleich `"remove" == action` in der Methode `_process_devices()` auf sich?
- Welchen Zweck erfüllt `int(open(removable_path).read())` in der lokalen Funktion `removable` der Methode `_is_usb_mass_storage_device`?
- Welchen Zweck erfüllt `int(open(size_path).read())` in der lokalen Funktion `has_size` der Methode `_is_usb_mass_storage_device`?
Code: Alles auswählen
from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
loadUi("mainwindow.ui", self)
def update_path(self, name):
self.label_usb_mass_storage_url.setText(str(name))
class LinuxDeviceMonitor(QObject):
devices_changed = pyqtSignal(list)
def __init__(self):
import pyudev
super().__init__()
self._context = pyudev.Context() # Für pyudev notwendiges Context-Objekt erstellen
self._monitor = pyudev.Monitor.from_netlink(self._context) # Mit Kernel daemon verbinden
self._monitor.start() # Verbindung aufbauen
self._devices = set()
self._listeners = []
# Iterator (genannt Enumerator) mit allen udev-Geräten und Keyword Argument "add" an Methode "self._process_devices()" übergeben
self._process_devices(self._context.list_devices(), action="add")
def add_listener(self, listener):
self._listeners.append(listener)
def remove_listener(self, listener):
self._listeners.remove(listener)
def fileno(self):
return self._monitor.fileno() # Dateideskriptor zurückgeben
@property
def device_names(self):
import pyudev
# Pfade zu Geräten in self._devices zurückgeben
# !!! DeviceNotFoundAtPathError entsteht hier !!!
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self):
from pyudev._util import eintr_retry_call # ?
read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0) # ?
self._process_devices(iter(read_device, None)) # Iterator mit Sentinel "None" an Methode "self._process_devices()" übergeben
self.devices_changed.emit(self.device_names) # Signal mit Namen der Geräte emitieren
def _process_devices(self, devices, action=None):
old = set(self._devices)
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path) # Einhängepunkt einzelner USB-Geräte zu Liste "self._devices" hinzufügen
elif "remove" == action and device.sys_path in self._devices: # Warum '"remove" == action'?
self._devices.remove(device.sys_path)
if self.device_names != old:
devices = self._devices
for listener in self._listeners:
listener(devices)
def _is_usb_mass_storage_device(self, device):
def device_path(device, name):
return "{}/{}".format(device.sys_path, name) # Zu Gerätenamen passenden Pfad zurückgeben
def removable(device):
removable_path = device_path(device, "removable")
try:
return os.path.exists(removable_path) and int(open(removable_path).read()) # Warum 'int(open(removable_path).read())'?
except ValueError:
return False
def has_size(device):
size_path = device_path(device, "size")
try:
return os.path.exists(size_path) and int(open(size_path).read()) # Warum 'int(open(size_path).read())'?
except ValueError:
return False
def has_usb(device):
return device.get("ID_BUS") == "usb"
def has_no_disc(device):
return device.get("ID_CDROM") is None
return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
linux_device_monitor = LinuxDeviceMonitor()
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
linux_device_monitor.devices_changed.connect(main_window.update_path)
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Atalanttore