Zugriff auf Inhalt eines USB-Sticks mittels USB Device ID

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

Ist ein Signal eine Art globaler Variable, wo a) alle damit verbundenen Slots bei einer Änderung benachrichtigt werden und b) man an jeder Stelle im Code darauf zugreifen kann?

Gruß
Atalanttore
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Nein. Warum global? Du benutzt doch andauernd Signale und Slots in Qt. Sind die global? Hast du also nur ein clicked für alle buttons die in deinem Programm sind?
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

@Atalanttore: bei deiner GUI-Anwendung gibt es einen Main-Thread der ewig läuft, der empfängt Systemnachrichten (Signale) und kann dann wiederrum andere Threads benachrichtigen. Es kann nicht jeder Thread einfach so auf ein Signal zugreifen.
When we say computer, we mean the electronic computer.
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

@sls: das Konzept der Signale hat mit dem System erstmal wenig zu tun. Und was hat das mit Threads zu tun?

Oh. Moment. Meinst du Signale wie im signals Modul? Das ist hier nicht gemeint. Sondern signal/slots in Qt. Die Attlantore ja auch munter benutzt.
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

@__deets__: richtig, sorry, hätte mir erst deinen letzten Post auf Seite 3 durchlesen sollen. Leider falscher Kontext.
When we say computer, we mean the electronic computer.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

Ich habe nun im `LinuxDeviceMonitor` das Signal `devices_changed` erstellt.

Ausführen lässt sich der Code damit immer noch, aber beim Anstecken eines USB-Sticks erscheint nun folgende Fehlermeldung:

Code: Alles auswählen

Traceback (most recent call last):
  File "/home/ata/source/usb-detect/main.py", line 61, in process_incoming
    self.devices_changed.emit(self.devices)
TypeError: LinuxDeviceMonitor cannot be converted to PyQt5.QtCore.QObject in this context
Ich wollte eigentlich nichts konvertieren, sondern nur ein Signal erstellen.


main.py:

Code: Alles auswählen

from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal


class MainWindow(QMainWindow):

    def __init__(self, parent=None):
        super().__init__(parent)
        loadUi("mainwindow.ui", self)

    def update_path(self, path):
        self.label_usb_mass_storage_url.setText(path)


class LinuxDeviceMonitor:
    devices_changed = pyqtSignal()

    def __init__(self, main_window):
        import pyudev
        self._context = pyudev.Context()

        self._monitor = pyudev.Monitor.from_netlink(self._context)
        self._monitor.start()

        self._devices = set()
        self._listeners = []
        self.rescan()

        self.main_window = main_window

    def rescan(self):
        self._process_devices(
            self._context.list_devices(),
            action="add"
        )

    def add_listener(self, listener):
        self._listeners.append(listener)

    def remove_listener(self, listener):
        self._listeners.remove(listener)

    def fileno(self):
        return self._monitor.fileno()

    @property
    def devices(self):
        import pyudev
        return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]

    def process_incoming(self, *_args):
        from pyudev._util import eintr_retry_call
        read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
        self._process_devices(iter(read_device, None))

        self.devices_changed.emit(self.devices)

        self.main_window.update_path(str(self.devices))

    def _process_devices(self, devices, action=None):
        old = set(self._devices)
        for device in devices:
            action = device.action if action is None else action
            if action in ("add", "change") and self._is_usb_mass_storage_device(device):
                self._devices.add(device.sys_path)
            elif "remove" == action and device.sys_path in self._devices:
                self._devices.remove(device.sys_path)
        if self._devices != old:
            devices = self.devices
            for listener in self._listeners:
                listener(devices)

    def _is_usb_mass_storage_device(self, device):
        def dev_path(device, name):
            return "{}/{}".format(device.sys_path, name)

        def removable(device):
            removable_path = dev_path(device, "removable")
            try:
                return os.path.exists(removable_path) and int(open(removable_path).read())
            except ValueError:
                return False

        def has_size(device):
            size_path = dev_path(device, "size")
            try:
                return os.path.exists(size_path) and int(open(size_path).read())
            except ValueError:
                return False

        def has_usb(device):
            return device.get("ID_BUS") == "usb"

        def has_no_disc(device):
            return device.get("ID_CDROM") is None

        return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)


def main():
    app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    linux_device_monitor = LinuxDeviceMonitor(main_window)

    notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
    notifier.activated.connect(linux_device_monitor.process_incoming)

    sys.exit(app.exec_())

if __name__ == '__main__':
    main()
Gruß
Atalanttore
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Du musst für Qts Mechanismus der Signale und slots von QObject ableiten. super().__init__() nicht vergessen.
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Und ich glaube du musst auch einen Typ angeben in deinem Signal. Und nochmal: die Konvertierung zu einem String ist da FALSCH. Schreib einen extra Slot in deinem Fenster (oder ein lambda) das das tut.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

@__deets__: Deine Vorschläge wollte ich umsetzen, aber es endete mit folgenden Fehlermeldungen:

Code: Alles auswählen

Traceback (most recent call last):
  File "/home/ata/source/usb-detect/main.py", line 130, in <module>
    main()
  File "/home/ata/source/usb-detect/main.py", line 122, in main
    linux_device_monitor = LinuxDeviceMonitor(main_window)
  File "/home/ata/source/usb-detect/main.py", line 43, in __init__
    self.main_window.update_devices.connect(MainWindow.update_devices_slot)
AttributeError: 'MainWindow' object has no attribute 'update_devices'

main.py:

Code: Alles auswählen

from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot


class Signals(QObject):
    update_devices = pyqtSignal(str)

    def __init__(self):
        super().__init__()


class MainWindow(QMainWindow):

    def __init__(self, parent=None):
        super().__init__(parent)
        loadUi("mainwindow.ui", self)

    @pyqtSlot()
    def update_devices_slot(self, devices):
        self.label_usb_mass_storage_url.setText(str(devices))


class LinuxDeviceMonitor:

    def __init__(self, main_window):
        import pyudev
        self._context = pyudev.Context()

        self._monitor = pyudev.Monitor.from_netlink(self._context)
        self._monitor.start()

        self._devices = set()
        self._listeners = []
        self.rescan()

        self.main_window = main_window

        self.main_window.update_devices.connect(MainWindow.update_devices_slot)

    def rescan(self):
        self._process_devices(
            self._context.list_devices(),
            action="add"
        )

    def add_listener(self, listener):
        self._listeners.append(listener)

    def remove_listener(self, listener):
        self._listeners.remove(listener)

    def fileno(self):
        return self._monitor.fileno()

    @property
    def devices(self):
        import pyudev
        return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]

    def process_incoming(self, *_args):
        from pyudev._util import eintr_retry_call
        read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
        self._process_devices(iter(read_device, None))

        #self.devices_changed.emit(self.devices)

        self.update_devices.emit(self.devices)

        #self.main_window.update_path(str(self.devices))

    def _process_devices(self, devices, action=None):
        old = set(self._devices)
        for device in devices:
            action = device.action if action is None else action
            if action in ("add", "change") and self._is_usb_mass_storage_device(device):
                self._devices.add(device.sys_path)
            elif "remove" == action and device.sys_path in self._devices:
                self._devices.remove(device.sys_path)
        if self._devices != old:
            devices = self.devices
            for listener in self._listeners:
                listener(devices)

    def _is_usb_mass_storage_device(self, device):
        def dev_path(device, name):
            return "{}/{}".format(device.sys_path, name)

        def removable(device):
            removable_path = dev_path(device, "removable")
            try:
                return os.path.exists(removable_path) and int(open(removable_path).read())
            except ValueError:
                return False

        def has_size(device):
            size_path = dev_path(device, "size")
            try:
                return os.path.exists(size_path) and int(open(size_path).read())
            except ValueError:
                return False

        def has_usb(device):
            return device.get("ID_BUS") == "usb"

        def has_no_disc(device):
            return device.get("ID_CDROM") is None

        return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)


def main():
    app = QApplication(sys.argv)
    signals = Signals()

    main_window = MainWindow()
    main_window.show()
    linux_device_monitor = LinuxDeviceMonitor(main_window)

    notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
    notifier.activated.connect(linux_device_monitor.process_incoming)

    sys.exit(app.exec_())

if __name__ == '__main__':
    main()
Gruß
Atalanttore
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wie du von “LinuxDeviceMonitor muss von qobject ableiten” zu dem kommst was du da getan hast habe ich keine Ahnung. Und wie main_window die Instanz mit MainWindow der Klasse mit sich selbst irgendwas irgendwie connecten soll noch weniger.

Ich glaube du musst mal ein paar ganz einfache, grundlegende Beispiele der OO Programmierung in Python im allgemeinen und dann mit Qt und seinem signal slot Mechanismus im speziellen durcharbeiten. 10 Zeilen. Mehr nicht. Aber jede verstanden.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

@__deets__: Welche 10 Zeilen meinst du?
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Die sind selbst zu schreiben oder nachzuschlagen. Wie definiert man Signale, wie benutzt man sie. Etc. ohne gui ohne alles. Nur wenige Zeilen Code.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

@__deets__: Ich habe einen Beispielcode in der Doku probiert, aber nicht mal dieser Code tut was er soll.

Code: Alles auswählen

from PyQt5.QtCore import QObject, pyqtSignal


class Foo(QObject):

    # Define a new signal called 'trigger' that has no arguments.
    trigger = pyqtSignal()

    def connect_and_emit_trigger(self):
        # Connect the trigger signal to a slot.
        self.trigger.connect(self.handle_trigger)

        # Emit the signal.
        self.trigger.emit()

    def handle_trigger(self):
        # Show that the slot has been called.

        print("trigger signal received")
Es wird gar kein Text auf der Konsole ausgegeben.

Warum ist mein ursprünglicher Code "FALSCH", obwohl er funktioniert?

Gruß
Atalanttore
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Er ist falsch, weil er Entwurfsprinzipien verletzt, wie zB zu enge Kopplung der Objekte aneinander, str(collection) um eine String-Repraesentation einer Collection zu bekommen statt vernuenftig damit zu arbeiten, etc. Das ist also kein absolutes falsch. Genauso wie es nicht absolut falsch ist, ein Auto einfach mit 5 Leuten zu schieben. Es ist nur nicht so benutzt, wie gedacht. Wenn du das aber so lieber machst, bitte gerne. Nimm den Code, der funktioniert, und sei zufrieden.

Der Code da oben tut nichts, also tut er auch nichts. Ich zumindest sehe keine Erstellung eines Foo-Objektes. Du? Ich sehe auch keinen Aufruf der connect_and_emit_trigger-Methode. Du? Wie soll also etwas passieren?
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

Der Code wurde beschrieben mit

Code: Alles auswählen

The following code demonstrates the definition, connection and emit of a signal without arguments:
Unter "demonstrate" verstehe ich als Praktiker, dass auch etwas passiert.

Mit folgender Erweiterung scheint es dann zu funktionieren:

Code: Alles auswählen

foo = Foo()
foo.connect_and_emit_trigger()
Wenn ich den Beispielcode wie im ursprünglichen Programm in zwei Klassen aufteile, passiert leider nichts mehr:

Code: Alles auswählen

from PyQt5.QtCore import QObject, pyqtSignal


class Window:

    def handle_trigger(self):
        # Show that the slot has been called.

        print("trigger signal received")


class Device(QObject):

    # Define a new signal called 'trigger' that has no arguments.
    trigger = pyqtSignal()

    def connect_and_emit_trigger(self):
        # Emit the signal.
        self.trigger.emit()


device = Device()
window = Window()

device.connect_and_emit_trigger()

# Connect the trigger signal to a slot.
device.trigger.connect(window.handle_trigger)
Gruß
Atalanttore
Sirius3
User
Beiträge: 17738
Registriert: Sonntag 21. Oktober 2012, 17:20

Wo ist der connect-Teil von `connect_and_emit_trigger`?
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Zu wissen, dass ein Objekt erstellt werden muss, scheint halt vorausgesetzt zu werden. Und genau das meine ich mit meinen Anmerkungen: es muss dir klar sein, WARUM da ohne weiteres nichts passiert ist. Das scheint mir nicht der Fall, und somit funktionieren deine Versuche zur Erweiterung nicht.

Und was deine Modifikation angeht: in unserem Universum gibt es üblicherweise eine Eigenschaft, die eine Wirkung von einer Ursache abhängig macht. Nicht umgekehrt. In deinem Code gilt das auch. Welche Wirkung und welche Ursache hast du da wohl vertauscht?
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

`device.trigger.connect(window.handle_trigger)` und `device.connect_and_emit_trigger()` war vertauscht. In umgedrehter Reihenfolge funktioniert es jetzt.

Die Kopplung zwischen den Objekten `device` und `window` findet nun außerhalb der beiden Klassen statt. Ist das nach den Entwurfsprinzipien so korrekt?


Mittlerweile habe ich es endlich geschafft, eine Liste mit dem Signal zu übergeben und erst danach in einen String zu konvertieren.

Code:

Code: Alles auswählen

from PyQt5.QtCore import QObject, pyqtSignal


class Window:

    def handle_trigger(self, text):
        # Show that the slot has been called.
        text_as_string = ' '.join(text)
        print(text_as_string)


class Device(QObject):

    # Define a new signal called 'trigger' that has no arguments.
    trigger = pyqtSignal([list])

    def __init__(self):
        super().__init__()

    def connect_and_emit_trigger(self):
        text = ["trigger", "signal", "received"]
        # Emit the signal.
        self.trigger.emit(text)


if __name__ == "__main__":
    device = Device()
    window = Window()

    # Connect the trigger signal to a slot.
    device.trigger.connect(window.handle_trigger)

    device.connect_and_emit_trigger()

So habe ich das mit Signal und Slots jetzt verstanden:

Code: Alles auswählen

from PyQt5.QtCore import QObject, pyqtSignal


class Receiver:

    def slot(self, local_variable):
        print(local_variable)


class Sender(QObject):  # muss von QObject abgeleitet werden

    signal = pyqtSignal([str])  # Datentyp des mit dem Signal übergebenen Arguments angeben

    def __init__(self):
        super().__init__()

    def emit_signal(self):
        self.signal.emit("Hello World")


def main():
    receiver = Receiver()
    sender = Sender()

    sender.signal.connect(receiver.slot)
    sender.emit_signal()


if __name__ == "__main__":
    main()
Ist das so korrekt (kommentiert)?


Gruß
Atalanttore
__deets__
User
Beiträge: 14523
Registriert: Mittwoch 14. Oktober 2015, 14:29

Schöner Fortschritt. Du kannst dir AFAIK beim Signal die Liste um die Typen sparen, du hast ja nur ein Argument. Dann sollte eigentlich list bzw str direkt funktionieren.

Und ja, das ist vom Entwurf her besser. Denn so müssen Receiver und Sender nichts voneinander wissen, und können trotzdem kollaborieren. Gleiches gilt auch für den DeviceMonitor, und das Window. Ersterer sollte darum auch NICHT das Window als Argument bekommen. Denn er könnte ja auch in einem Fensterlosen Kontext benutzt werden. Stattdessen kannst du die beiden auch von außen verschalten.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

__deets__ hat geschrieben: Sonntag 17. März 2019, 01:01 Schöner Fortschritt. Du kannst dir AFAIK beim Signal die Liste um die Typen sparen, du hast ja nur ein Argument. Dann sollte eigentlich list bzw str direkt funktionieren.
Danke. Der Fortschritt war eine ganz schwere Geburt.
Die Übergabe eines Typs beim Signal funktioniert bei mir auch ohne Liste. Also weg mit der Liste.


Nun würde ich ganz gerne das Problem mit dem Fehler `DeviceNotFoundAtPathError` lösen, der erscheint, wenn man einen USB-Stick, ohne ihn vorher auszuhängen, einfach aus der USB-Buchse zieht. Dafür muss ich aber den Code erst mal komplett verstehen. Manche Codestellen verstehe ich aber nach wie vor nicht. Zum Code habe ich noch mehrere Fragen:
  1. Wozu sind die Methoden zum Hinzufügen und Entfernen eines `listener` in der Klasse `LinuxDeviceMonitor` gut?
  2. Wofür braucht man überhaupt einen `listener`?
  3. Was wird mit `from pyudev._util import eintr_retry_call` in der Methode `process_incoming()` importiert?
  4. Was bewirkt die Anweisung `partial(eintr_retry_call, self._monitor.poll, timeout=0)` in der Methode `process_incoming()`?
  5. Was hat es mit dem Vergleich `"remove" == action` in der Methode `_process_devices()` auf sich?
  6. Welchen Zweck erfüllt `int(open(removable_path).read())` in der lokalen Funktion `removable` der Methode `_is_usb_mass_storage_device`?
  7. Welchen Zweck erfüllt `int(open(size_path).read())` in der lokalen Funktion `has_size` der Methode `_is_usb_mass_storage_device`?
Code (mit meinen Kommentaren):

Code: Alles auswählen

from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal


class MainWindow(QMainWindow):

    def __init__(self, parent=None):
        super().__init__(parent)
        loadUi("mainwindow.ui", self)

    def update_path(self, name):
        self.label_usb_mass_storage_url.setText(str(name))


class LinuxDeviceMonitor(QObject):
    devices_changed = pyqtSignal(list)

    def __init__(self):
        import pyudev
        super().__init__()
        self._context = pyudev.Context()  # Für pyudev notwendiges Context-Objekt erstellen

        self._monitor = pyudev.Monitor.from_netlink(self._context)  # Mit Kernel daemon verbinden
        self._monitor.start()  # Verbindung aufbauen

        self._devices = set()
        self._listeners = []

        # Iterator (genannt Enumerator) mit allen udev-Geräten und Keyword Argument "add" an Methode "self._process_devices()" übergeben
        self._process_devices(self._context.list_devices(), action="add")

    def add_listener(self, listener):
        self._listeners.append(listener)

    def remove_listener(self, listener):
        self._listeners.remove(listener)

    def fileno(self):
        return self._monitor.fileno()  # Dateideskriptor zurückgeben

    @property
    def device_names(self):
        import pyudev
        # Pfade zu Geräten in self._devices zurückgeben
        # !!! DeviceNotFoundAtPathError entsteht hier !!!
        return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]

    def process_incoming(self):
        from pyudev._util import eintr_retry_call  # ?
        read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)  # ?
        self._process_devices(iter(read_device, None))  # Iterator mit Sentinel "None" an Methode "self._process_devices()" übergeben
        self.devices_changed.emit(self.device_names)  # Signal mit Namen der Geräte emitieren

    def _process_devices(self, devices, action=None):
        old = set(self._devices)

        for device in devices:
            action = device.action if action is None else action

            if action in ("add", "change") and self._is_usb_mass_storage_device(device):
                self._devices.add(device.sys_path)  # Einhängepunkt einzelner USB-Geräte zu Liste "self._devices" hinzufügen
            elif "remove" == action and device.sys_path in self._devices:  # Warum '"remove" == action'?
                self._devices.remove(device.sys_path)

        if self.device_names != old:
            devices = self._devices

            for listener in self._listeners:
                listener(devices)

    def _is_usb_mass_storage_device(self, device):
        def device_path(device, name):
            return "{}/{}".format(device.sys_path, name)  # Zu Gerätenamen passenden Pfad zurückgeben

        def removable(device):
            removable_path = device_path(device, "removable")
            try:
                return os.path.exists(removable_path) and int(open(removable_path).read())  # Warum 'int(open(removable_path).read())'?
            except ValueError:
                return False

        def has_size(device):
            size_path = device_path(device, "size")
            try:
                return os.path.exists(size_path) and int(open(size_path).read())  # Warum 'int(open(size_path).read())'?
            except ValueError:
                return False

        def has_usb(device):
            return device.get("ID_BUS") == "usb"

        def has_no_disc(device):
            return device.get("ID_CDROM") is None

        return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)


def main():
    app = QApplication(sys.argv)

    main_window = MainWindow()
    main_window.show()

    linux_device_monitor = LinuxDeviceMonitor()

    notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
    notifier.activated.connect(linux_device_monitor.process_incoming)

    linux_device_monitor.devices_changed.connect(main_window.update_path)

    sys.exit(app.exec_())


if __name__ == '__main__':
    main()
Gruß
Atalanttore
Antworten