Zugriff auf Inhalt eines USB-Sticks mittels USB Device ID

Wenn du dir nicht sicher bist, in welchem der anderen Foren du die Frage stellen sollst, dann bist du hier im Forum für allgemeine Fragen sicher richtig.
Benutzeravatar
sparrow
User
Beiträge: 4195
Registriert: Freitag 17. April 2009, 10:28

Atalanttore hat geschrieben: Sonntag 10. März 2019, 21:12Mit Device ID meinte ich die Vendor ID und Product ID von USB-Geräten. Eigentlich wollte ich damit den Einhängepunkt eines USB-Sticks herausfinden, aber das funktioniert offensichtlich nicht so einfach und auch nicht plattformunabhängig.
Trotz __deets__ Anmerkungen: du hast gesehen, dass mein Code das für Windows und Linux tut?
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

@sparrow: Ja, wenn man beim Aufruf von `get_path_for_usbid()` die Vendor ID und Product ID des USB-Sticks übergibt, dann erhält man den Pfad zum USB-Stick. Unter Ubuntu hat es bei mir funktioniert. Unter Windows habe ich es noch nicht getestet.

Der Code von __deets__ dagegen liefert zur Laufzeit die Namen aller gerade eingesteckten USB-Sticks unter Linux zurück. Mit `lsblk -o MOUNTPOINT <Name des USB-Sticks>` konnte ich anschließend den Pfad zum USB-Stick ausgeben lassen.

Eine Kombination der Vorteile von beiden Programmen wäre ideal. Also ohne Polling eine ständig aktuelle Liste mit den Pfaden zu allen gerade eingesteckten USB-Sticks unter Linux bzw. Windows.

Gruß
Atalanttore
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Warum du str() um das set aufrufst ist mir unklar. Die Kopplung des device monitors an das Fenster ist ebenfalls extrem unschön. Du arbeitest doch die ganze Zeit mit Qt. Das Prinzip von Dingen, die Signale aussenden, und Dingen, die sei dann verarbeiten ohne von ersteren intime Kenntnisse zu haben wird da doch andauern vorgelebt. Warum probierst du das nicht mal so?

Und ich vermute mal, das das abrupfen des Sticks Events auslöst, die der bestehende Code nicht korrekt verarbeitet, und dadurch bleibt een device Eintrag bestehen, auch wenn das device nicht mehr existiert. Du kannst also entweder die richtigen Events das device entfernen lassen, oder den Fehler als das dazu gehörige Ereignis benuzten.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

@__deets__: Die Methode `setText()` erwartet einen String. Deshalb die Umwandlung mittels `str()`. Mit einer Liste kommt es zu folgendem Fehler:

Code: Alles auswählen

TypeError: setText(self, str): argument 1 has unexpected type 'list'
Was meinst du mit "intimen Kenntnissen"?

Gruß
Atalanttore
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Das ist nicht, was ich meinte. Du programmierst falsch, wenn du ein eine solche Kopplung von "dieses Objekt hier hat Daten" und "dieses Objekt will diese Daten aber auf genau diese art und weise haben" herstellst. Wenn der Linux-Device-Monitor 10 unterschiedliche Quellen in deinem Programm bedient, schreibst du dann 10 mal speziellen Code in dem Monitor? Das wäre keine gute Idee.

Stattdessen macht man ein Signal, dass die Devices als set verschickt. Und jedes, das sich dafür interessiert, muss das dann so umwandeln, wie es das benötigt.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

Ist ein Signal eine Art globaler Variable, wo a) alle damit verbundenen Slots bei einer Änderung benachrichtigt werden und b) man an jeder Stelle im Code darauf zugreifen kann?

Gruß
Atalanttore
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Nein. Warum global? Du benutzt doch andauernd Signale und Slots in Qt. Sind die global? Hast du also nur ein clicked für alle buttons die in deinem Programm sind?
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

@Atalanttore: bei deiner GUI-Anwendung gibt es einen Main-Thread der ewig läuft, der empfängt Systemnachrichten (Signale) und kann dann wiederrum andere Threads benachrichtigen. Es kann nicht jeder Thread einfach so auf ein Signal zugreifen.
When we say computer, we mean the electronic computer.
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

@sls: das Konzept der Signale hat mit dem System erstmal wenig zu tun. Und was hat das mit Threads zu tun?

Oh. Moment. Meinst du Signale wie im signals Modul? Das ist hier nicht gemeint. Sondern signal/slots in Qt. Die Attlantore ja auch munter benutzt.
Benutzeravatar
sls
User
Beiträge: 480
Registriert: Mittwoch 13. Mai 2015, 23:52
Wohnort: Country country = new Zealand();

@__deets__: richtig, sorry, hätte mir erst deinen letzten Post auf Seite 3 durchlesen sollen. Leider falscher Kontext.
When we say computer, we mean the electronic computer.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

Ich habe nun im `LinuxDeviceMonitor` das Signal `devices_changed` erstellt.

Ausführen lässt sich der Code damit immer noch, aber beim Anstecken eines USB-Sticks erscheint nun folgende Fehlermeldung:

Code: Alles auswählen

Traceback (most recent call last):
  File "/home/ata/source/usb-detect/main.py", line 61, in process_incoming
    self.devices_changed.emit(self.devices)
TypeError: LinuxDeviceMonitor cannot be converted to PyQt5.QtCore.QObject in this context
Ich wollte eigentlich nichts konvertieren, sondern nur ein Signal erstellen.


main.py:

Code: Alles auswählen

from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal


class MainWindow(QMainWindow):

    def __init__(self, parent=None):
        super().__init__(parent)
        loadUi("mainwindow.ui", self)

    def update_path(self, path):
        self.label_usb_mass_storage_url.setText(path)


class LinuxDeviceMonitor:
    devices_changed = pyqtSignal()

    def __init__(self, main_window):
        import pyudev
        self._context = pyudev.Context()

        self._monitor = pyudev.Monitor.from_netlink(self._context)
        self._monitor.start()

        self._devices = set()
        self._listeners = []
        self.rescan()

        self.main_window = main_window

    def rescan(self):
        self._process_devices(
            self._context.list_devices(),
            action="add"
        )

    def add_listener(self, listener):
        self._listeners.append(listener)

    def remove_listener(self, listener):
        self._listeners.remove(listener)

    def fileno(self):
        return self._monitor.fileno()

    @property
    def devices(self):
        import pyudev
        return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]

    def process_incoming(self, *_args):
        from pyudev._util import eintr_retry_call
        read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
        self._process_devices(iter(read_device, None))

        self.devices_changed.emit(self.devices)

        self.main_window.update_path(str(self.devices))

    def _process_devices(self, devices, action=None):
        old = set(self._devices)
        for device in devices:
            action = device.action if action is None else action
            if action in ("add", "change") and self._is_usb_mass_storage_device(device):
                self._devices.add(device.sys_path)
            elif "remove" == action and device.sys_path in self._devices:
                self._devices.remove(device.sys_path)
        if self._devices != old:
            devices = self.devices
            for listener in self._listeners:
                listener(devices)

    def _is_usb_mass_storage_device(self, device):
        def dev_path(device, name):
            return "{}/{}".format(device.sys_path, name)

        def removable(device):
            removable_path = dev_path(device, "removable")
            try:
                return os.path.exists(removable_path) and int(open(removable_path).read())
            except ValueError:
                return False

        def has_size(device):
            size_path = dev_path(device, "size")
            try:
                return os.path.exists(size_path) and int(open(size_path).read())
            except ValueError:
                return False

        def has_usb(device):
            return device.get("ID_BUS") == "usb"

        def has_no_disc(device):
            return device.get("ID_CDROM") is None

        return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)


def main():
    app = QApplication(sys.argv)
    main_window = MainWindow()
    main_window.show()
    linux_device_monitor = LinuxDeviceMonitor(main_window)

    notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
    notifier.activated.connect(linux_device_monitor.process_incoming)

    sys.exit(app.exec_())

if __name__ == '__main__':
    main()
Gruß
Atalanttore
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Du musst für Qts Mechanismus der Signale und slots von QObject ableiten. super().__init__() nicht vergessen.
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Und ich glaube du musst auch einen Typ angeben in deinem Signal. Und nochmal: die Konvertierung zu einem String ist da FALSCH. Schreib einen extra Slot in deinem Fenster (oder ein lambda) das das tut.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

@__deets__: Deine Vorschläge wollte ich umsetzen, aber es endete mit folgenden Fehlermeldungen:

Code: Alles auswählen

Traceback (most recent call last):
  File "/home/ata/source/usb-detect/main.py", line 130, in <module>
    main()
  File "/home/ata/source/usb-detect/main.py", line 122, in main
    linux_device_monitor = LinuxDeviceMonitor(main_window)
  File "/home/ata/source/usb-detect/main.py", line 43, in __init__
    self.main_window.update_devices.connect(MainWindow.update_devices_slot)
AttributeError: 'MainWindow' object has no attribute 'update_devices'

main.py:

Code: Alles auswählen

from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal, pyqtSlot


class Signals(QObject):
    update_devices = pyqtSignal(str)

    def __init__(self):
        super().__init__()


class MainWindow(QMainWindow):

    def __init__(self, parent=None):
        super().__init__(parent)
        loadUi("mainwindow.ui", self)

    @pyqtSlot()
    def update_devices_slot(self, devices):
        self.label_usb_mass_storage_url.setText(str(devices))


class LinuxDeviceMonitor:

    def __init__(self, main_window):
        import pyudev
        self._context = pyudev.Context()

        self._monitor = pyudev.Monitor.from_netlink(self._context)
        self._monitor.start()

        self._devices = set()
        self._listeners = []
        self.rescan()

        self.main_window = main_window

        self.main_window.update_devices.connect(MainWindow.update_devices_slot)

    def rescan(self):
        self._process_devices(
            self._context.list_devices(),
            action="add"
        )

    def add_listener(self, listener):
        self._listeners.append(listener)

    def remove_listener(self, listener):
        self._listeners.remove(listener)

    def fileno(self):
        return self._monitor.fileno()

    @property
    def devices(self):
        import pyudev
        return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]

    def process_incoming(self, *_args):
        from pyudev._util import eintr_retry_call
        read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
        self._process_devices(iter(read_device, None))

        #self.devices_changed.emit(self.devices)

        self.update_devices.emit(self.devices)

        #self.main_window.update_path(str(self.devices))

    def _process_devices(self, devices, action=None):
        old = set(self._devices)
        for device in devices:
            action = device.action if action is None else action
            if action in ("add", "change") and self._is_usb_mass_storage_device(device):
                self._devices.add(device.sys_path)
            elif "remove" == action and device.sys_path in self._devices:
                self._devices.remove(device.sys_path)
        if self._devices != old:
            devices = self.devices
            for listener in self._listeners:
                listener(devices)

    def _is_usb_mass_storage_device(self, device):
        def dev_path(device, name):
            return "{}/{}".format(device.sys_path, name)

        def removable(device):
            removable_path = dev_path(device, "removable")
            try:
                return os.path.exists(removable_path) and int(open(removable_path).read())
            except ValueError:
                return False

        def has_size(device):
            size_path = dev_path(device, "size")
            try:
                return os.path.exists(size_path) and int(open(size_path).read())
            except ValueError:
                return False

        def has_usb(device):
            return device.get("ID_BUS") == "usb"

        def has_no_disc(device):
            return device.get("ID_CDROM") is None

        return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)


def main():
    app = QApplication(sys.argv)
    signals = Signals()

    main_window = MainWindow()
    main_window.show()
    linux_device_monitor = LinuxDeviceMonitor(main_window)

    notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
    notifier.activated.connect(linux_device_monitor.process_incoming)

    sys.exit(app.exec_())

if __name__ == '__main__':
    main()
Gruß
Atalanttore
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Wie du von “LinuxDeviceMonitor muss von qobject ableiten” zu dem kommst was du da getan hast habe ich keine Ahnung. Und wie main_window die Instanz mit MainWindow der Klasse mit sich selbst irgendwas irgendwie connecten soll noch weniger.

Ich glaube du musst mal ein paar ganz einfache, grundlegende Beispiele der OO Programmierung in Python im allgemeinen und dann mit Qt und seinem signal slot Mechanismus im speziellen durcharbeiten. 10 Zeilen. Mehr nicht. Aber jede verstanden.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

@__deets__: Welche 10 Zeilen meinst du?
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Die sind selbst zu schreiben oder nachzuschlagen. Wie definiert man Signale, wie benutzt man sie. Etc. ohne gui ohne alles. Nur wenige Zeilen Code.
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

@__deets__: Ich habe einen Beispielcode in der Doku probiert, aber nicht mal dieser Code tut was er soll.

Code: Alles auswählen

from PyQt5.QtCore import QObject, pyqtSignal


class Foo(QObject):

    # Define a new signal called 'trigger' that has no arguments.
    trigger = pyqtSignal()

    def connect_and_emit_trigger(self):
        # Connect the trigger signal to a slot.
        self.trigger.connect(self.handle_trigger)

        # Emit the signal.
        self.trigger.emit()

    def handle_trigger(self):
        # Show that the slot has been called.

        print("trigger signal received")
Es wird gar kein Text auf der Konsole ausgegeben.

Warum ist mein ursprünglicher Code "FALSCH", obwohl er funktioniert?

Gruß
Atalanttore
__deets__
User
Beiträge: 14543
Registriert: Mittwoch 14. Oktober 2015, 14:29

Er ist falsch, weil er Entwurfsprinzipien verletzt, wie zB zu enge Kopplung der Objekte aneinander, str(collection) um eine String-Repraesentation einer Collection zu bekommen statt vernuenftig damit zu arbeiten, etc. Das ist also kein absolutes falsch. Genauso wie es nicht absolut falsch ist, ein Auto einfach mit 5 Leuten zu schieben. Es ist nur nicht so benutzt, wie gedacht. Wenn du das aber so lieber machst, bitte gerne. Nimm den Code, der funktioniert, und sei zufrieden.

Der Code da oben tut nichts, also tut er auch nichts. Ich zumindest sehe keine Erstellung eines Foo-Objektes. Du? Ich sehe auch keinen Aufruf der connect_and_emit_trigger-Methode. Du? Wie soll also etwas passieren?
Atalanttore
User
Beiträge: 407
Registriert: Freitag 6. August 2010, 17:03

Der Code wurde beschrieben mit

Code: Alles auswählen

The following code demonstrates the definition, connection and emit of a signal without arguments:
Unter "demonstrate" verstehe ich als Praktiker, dass auch etwas passiert.

Mit folgender Erweiterung scheint es dann zu funktionieren:

Code: Alles auswählen

foo = Foo()
foo.connect_and_emit_trigger()
Wenn ich den Beispielcode wie im ursprünglichen Programm in zwei Klassen aufteile, passiert leider nichts mehr:

Code: Alles auswählen

from PyQt5.QtCore import QObject, pyqtSignal


class Window:

    def handle_trigger(self):
        # Show that the slot has been called.

        print("trigger signal received")


class Device(QObject):

    # Define a new signal called 'trigger' that has no arguments.
    trigger = pyqtSignal()

    def connect_and_emit_trigger(self):
        # Emit the signal.
        self.trigger.emit()


device = Device()
window = Window()

device.connect_and_emit_trigger()

# Connect the trigger signal to a slot.
device.trigger.connect(window.handle_trigger)
Gruß
Atalanttore
Antworten