Re: Zugriff auf Inhalt eines USB-Sticks mittels USB Device ID
Verfasst: Dienstag 5. März 2019, 23:10
Kann man unter Linux über DBus auch den Einhängepunkt eines USB-Speichers ermitteln?
Gruß
Atalanttore
Gruß
Atalanttore
Seit 2002 Diskussionen rund um die Programmiersprache Python
https://www.python-forum.de/
Code: Alles auswählen
from functools import partial
import os
class LinuxDeviceMonitor:
def __init__(self):
import pyudev
self._context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(self._context)
self._monitor.start()
self._devices = set()
self._listeners = []
self.rescan()
def rescan(self):
self._process_devices(
self._context.list_devices(),
action="add"
)
def add_listener(self, listener):
self._listeners.append(listener)
def remove_listener(self, listener):
self._listeners.remove(listener)
def fileno(self):
return self._monitor.fileno()
@property
def devices(self):
import pyudev
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self, *_args):
from pyudev._util import eintr_retry_call
read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
self._process_devices(iter(read_device, None))
def _process_devices(self, devices, action=None):
old = set(self._devices)
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path)
elif "remove" == action and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
if self._devices != old:
devices = self.devices
for listener in self._listeners:
listener(devices)
def _is_usb_mass_storage_device(self, device):
def dev_path(device, name):
return "{}/{}".format(device.sys_path, name)
def removable(device):
removable_path = dev_path(device, "removable")
try:
return os.path.exists(removable_path) and int(open(removable_path).read())
except ValueError:
return False
def has_size(device):
size_path = dev_path(device, "size")
try:
return os.path.exists(size_path) and int(open(size_path).read())
except ValueError:
return False
return device.subsystem == "block" and removable(device) and has_size(device)
if __name__ == "__main__":
linux_device_monitor = LinuxDeviceMonitor()
print(linux_device_monitor.devices)
Code: Alles auswählen
['/dev/sdb', '/dev/sr0']
Code: Alles auswählen
udevadm info -q all /dev/sr0
P: /devices/pci0000:00/0000:00:17.0/ata4/host3/target3:0:0/3:0:0:0/block/sr0
N: sr0
L: -100
S: cdrom
S: cdrw
S: disk/by-id/ata-HL-DT-ST_DVDRAM_GT80N_M44E61M3110
S: disk/by-id/wwn-0x5001480000000000
S: disk/by-path/pci-0000:00:17.0-ata-4
S: dvd
S: dvdrw
E: DEVLINKS=/dev/disk/by-id/wwn-0x5001480000000000 /dev/disk/by-path/pci-0000:00:17.0-ata-4 /dev/disk/by-id/ata-HL-DT-ST_DVDRAM_GT80N_M44E61M3110 /dev/cdrom /dev/cdrw /dev/dvdrw /dev/dvd
E: DEVNAME=/dev/sr0
E: DEVPATH=/devices/pci0000:00/0000:00:17.0/ata4/host3/target3:0:0/3:0:0:0/block/sr0
E: DEVTYPE=disk
E: ID_ATA=1
E: ID_ATA_FEATURE_SET_PM=1
E: ID_ATA_FEATURE_SET_PM_ENABLED=1
E: ID_ATA_SATA=1
E: ID_ATA_SATA_SIGNAL_RATE_GEN1=1
E: ID_BUS=ata
E: ID_CDROM=1
E: ID_CDROM_CD=1
E: ID_CDROM_CD_R=1
E: ID_CDROM_CD_RW=1
E: ID_CDROM_DVD=1
E: ID_CDROM_DVD_PLUS_R=1
E: ID_CDROM_DVD_PLUS_RW=1
E: ID_CDROM_DVD_PLUS_R_DL=1
E: ID_CDROM_DVD_R=1
E: ID_CDROM_DVD_RAM=1
E: ID_CDROM_DVD_RW=1
E: ID_CDROM_MRW=1
E: ID_CDROM_MRW_W=1
E: ID_FOR_SEAT=block-pci-0000_00_17_0-ata-4
E: ID_MODEL=HL-DT-ST_DVDRAM_GT80N
E: ID_MODEL_ENC=HL-DT-ST\x20DVDRAM\x20GT80N\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20\x20
E: ID_PATH=pci-0000:00:17.0-ata-4
E: ID_PATH_TAG=pci-0000_00_17_0-ata-4
E: ID_REVISION=LN85
E: ID_SERIAL=HL-DT-ST_DVDRAM_GT80N_M44E61M3110
E: ID_SERIAL_SHORT=M44E61M3110
E: ID_TYPE=cd
E: ID_WWN=0x5001480000000000
E: ID_WWN_WITH_EXTENSION=0x5001480000000000
E: MAJOR=11
E: MINOR=0
E: SUBSYSTEM=block
E: SYSTEMD_MOUNT_DEVICE_BOUND=1
E: TAGS=:uaccess:seat:systemd:
E: USEC_INITIALIZED=3050832
udevadm info -q all /dev/sdb
P: /devices/pci0000:00/0000:00:14.0/usb1/1-6/1-6:1.0/host4/target4:0:0/4:0:0:0/block/sdb
N: sdb
S: disk/by-id/usb-090c_1000_10061017000299-0:0
S: disk/by-path/pci-0000:00:14.0-usb-0:6:1.0-scsi-0:0:0:0
E: DEVLINKS=/dev/disk/by-id/usb-090c_1000_10061017000299-0:0 /dev/disk/by-path/pci-0000:00:14.0-usb-0:6:1.0-scsi-0:0:0:0
E: DEVNAME=/dev/sdb
E: DEVPATH=/devices/pci0000:00/0000:00:14.0/usb1/1-6/1-6:1.0/host4/target4:0:0/4:0:0:0/block/sdb
E: DEVTYPE=disk
E: ID_BUS=usb
E: ID_INSTANCE=0:0
E: ID_MODEL=1000
E: ID_MODEL_ENC=1000
E: ID_MODEL_ID=1000
E: ID_PART_TABLE_TYPE=dos
E: ID_PART_TABLE_UUID=01a8fd35
E: ID_PATH=pci-0000:00:14.0-usb-0:6:1.0-scsi-0:0:0:0
E: ID_PATH_TAG=pci-0000_00_14_0-usb-0_6_1_0-scsi-0_0_0_0
E: ID_REVISION=1000
E: ID_SERIAL=090c_1000_10061017000299-0:0
E: ID_SERIAL_SHORT=10061017000299
E: ID_TYPE=disk
E: ID_USB_DRIVER=usb-storage
E: ID_USB_INTERFACES=:080650:
E: ID_USB_INTERFACE_NUM=00
E: ID_VENDOR=090c
E: ID_VENDOR_ENC=090c
E: ID_VENDOR_ID=090c
E: MAJOR=8
E: MINOR=16
E: SUBSYSTEM=block
E: TAGS=:systemd:
E: USEC_INITIALIZED=7887489066
Code: Alles auswählen
import subprocess
import sys
if sys.platform == "win32":
import winreg
def linux_find_mountpoint(device):
out, err = subprocess.Popen(("mount",), stdout=subprocess.PIPE).communicate()
searchstring = "/dev/{0}".format(device)
for line in out.decode("UTF-8").splitlines():
if line.startswith(searchstring):
return line.split()[2]
def linux_read_device_from_dmesg(vendor_id, product_id):
vendor_search = "IDVENDOR={0}".format(vendor_id)
product_search = "IDPRODUCT={0}".format(product_id)
out, err = subprocess.Popen(("dmesg",), stdout=subprocess.PIPE).communicate()
usb_index = None
scsi_host = None
device = None
for line in out.decode("UTF-8").splitlines():
line = line.upper()
if not usb_index \
and "USB" in line and vendor_search in line and product_search in line:
usb_index = line[line.find("USB")+4:line.find(":")]
if usb_index and not scsi_host \
and "SCSI HOST" in line and usb_index in line:
scsi_host = line[line.find("SCSI HOST")+9:line.find(":")]
if usb_index and scsi_host and not device \
and "SD {0}:0:0:0:".format(scsi_host) in line \
and "ATTACHED SCSI REMOVABLE DISK" in line:
device = line[line.rfind("[")+1:line.rfind("]")].lower()
if usb_index and scsi_host and device \
and "USB {0}".format(usb_index) in line \
and "USB DISCONNECT" in line:
usb_index = None
scsi_host = None
device = None
return device
def win_find_device_id(vendor_id, product_id):
subkey = r"Software\Microsoft\Windows\CurrentVersion\Explorer\MountPoints2\CPC\Volume"
volumeroot = winreg.OpenKey(winreg.HKEY_CURRENT_USER, subkey)
volumenames = []
try:
for i in range(sys.maxsize):
volumenames.append(winreg.EnumKey(volumeroot, i))
except OSError:
pass
searchstring = bytearray("VID_{0}&PID_{1}".format(vendor_id, product_id),
"UTF-8")
for volumename in volumenames:
volumekey = winreg.OpenKey(volumeroot, volumename)
data_value = winreg.QueryValueEx(volumekey, "Data")[0]
cleared_value = data_value.replace(b"\0", b"")
if searchstring in cleared_value:
device_id = cleared_value[cleared_value.find(b"{"):
cleared_value.find(b"}")]
return device_id
def win_find_mountpoint_for_device(device_id):
subkey = r"SYSTEM\MountedDevices"
deviceroot = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, subkey)
try:
for i in range(sys.maxsize):
name, value, type_ = winreg.EnumValue(deviceroot, i)
cleared_value = value.replace(b"\0", b"")
if device_id in cleared_value and "DosDevices" in name:
return name.split("\\")[-1]
except OSError:
pass
def get_path_for_usbid(vendor_id, product_id):
vendor_id, product_id = vendor_id.upper(), product_id.upper()
if sys.platform == "win32":
device_id = win_find_device_id(vendor_id, product_id)
if device_id:
mountpoint = win_find_mountpoint_for_device(device_id)
return mountpoint
elif sys.platform == "linux":
device = linux_read_device_from_dmesg(vendor_id, product_id)
if device:
return linux_find_mountpoint(device)
else:
error_text = "This does not work on {0}".format(sys.platform)
raise NotImplementedError(error_text)
if __name__ == "__main__":
print(get_path_for_usbid("abcd", "1234"))
Code: Alles auswählen
Traceback (most recent call last):
File "/home/ata/source/usb-detect/main.py", line 104, in <lambda>
notifier.activated.connect(lambda: main_window.update_path(str(linux_device_monitor.devices)))
File "/home/ata/source/usb-detect/main.py", line 50, in devices
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
File "/home/ata/source/usb-detect/main.py", line 50, in <listcomp>
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
File "/usr/lib/python3/dist-packages/pyudev/device/_device.py", line 83, in from_path
return cls.from_sys_path(context, path)
File "/usr/lib/python3/dist-packages/pyudev/device/_device.py", line 108, in from_sys_path
raise DeviceNotFoundAtPathError(sys_path)
pyudev.device._errors.DeviceNotFoundAtPathError: No device at '/sys/devices/pci0000:00/0000:00:14.0/usb1/1-6/1-6:1.0/host4/target4:0:0/4:0:0:0/block/sdb'
Code: Alles auswählen
from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
loadUi("mainwindow.ui", self)
def update_path(self, path):
self.label_usb_mass_storage_url.setText(path)
class LinuxDeviceMonitor:
def __init__(self):
import pyudev
self._context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(self._context)
self._monitor.start()
self._devices = set()
self._listeners = []
self.rescan()
def rescan(self):
self._process_devices(
self._context.list_devices(),
action="add"
)
def add_listener(self, listener):
self._listeners.append(listener)
def remove_listener(self, listener):
self._listeners.remove(listener)
def fileno(self):
return self._monitor.fileno()
@property
def devices(self):
import pyudev
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self, *_args):
from pyudev._util import eintr_retry_call
read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
self._process_devices(iter(read_device, None))
def _process_devices(self, devices, action=None):
old = set(self._devices)
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path)
elif "remove" == action and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
if self._devices != old:
devices = self.devices
for listener in self._listeners:
listener(devices)
def _is_usb_mass_storage_device(self, device):
def dev_path(device, name):
return "{}/{}".format(device.sys_path, name)
def removable(device):
removable_path = dev_path(device, "removable")
try:
return os.path.exists(removable_path) and int(open(removable_path).read())
except ValueError:
return False
def has_size(device):
size_path = dev_path(device, "size")
try:
return os.path.exists(size_path) and int(open(size_path).read())
except ValueError:
return False
def has_usb(device):
return device.get("ID_BUS") == "usb"
def has_no_disc(device):
return device.get("ID_CDROM") is None
return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
linux_device_monitor = LinuxDeviceMonitor()
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read, linux_device_monitor.process_incoming())
notifier.activated.connect(lambda: main_window.update_path(str(linux_device_monitor.devices)))
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Code: Alles auswählen
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>MainWindow</class>
<widget class="QMainWindow" name="MainWindow">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>500</width>
<height>250</height>
</rect>
</property>
<property name="windowTitle">
<string>MainWindow</string>
</property>
<widget class="QWidget" name="centralwidget">
<layout class="QGridLayout" name="gridLayout">
<item row="0" column="0">
<widget class="QLabel" name="label_path_to_mass_storage">
<property name="text">
<string>USB-Massenspeicher:</string>
</property>
</widget>
</item>
<item row="0" column="1">
<widget class="QLabel" name="label_usb_mass_storage_url">
<property name="text">
<string/>
</property>
</widget>
</item>
</layout>
</widget>
<widget class="QMenuBar" name="menubar">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>500</width>
<height>28</height>
</rect>
</property>
</widget>
<widget class="QStatusBar" name="statusbar"/>
</widget>
<resources/>
<connections/>
</ui>
Mit Device ID meinte ich die Vendor ID und Product ID von USB-Geräten. Eigentlich wollte ich damit den Einhängepunkt eines USB-Sticks herausfinden, aber das funktioniert offensichtlich nicht so einfach und auch nicht plattformunabhängig.sparrow hat geschrieben: Sonntag 10. März 2019, 17:00 Wann genau willst du eigentlich die USB Device ID auswerten, die du im Threadtitel geschriebebn hast?
Und was meinst du mit der Device ID?
__deets__ hat geschrieben: Sonntag 10. März 2019, 17:47 Du rufst process_incoming AUF, statt es nur als callback zu übergeben.
Ohne Beispielcode musste ich mir den Code mithilfe der Doku selbst erstellen. Dabei kommt es bei mir regelmäßig zu Kollateralschäden, aber mit deinen Verbesserungsvorschlägen funktioniert es jetzt.__deets__ hat geschrieben: Sonntag 10. März 2019, 18:32 Ok, nochmal genauer hingeschaut. Das ist ja noch etwas kruder... die process_incoming-Methode muss aufgerufen werden, wenn etwas auf dem Socket passiert. Was du stattdessen machst ist, deren Ruckgabewert (der zufällig None ist) als Parent an den notifier zu übergeben. WTF?
Stattdessen musst du den im activate Signal die process_incoming aufrufen lassen. Und die kann dann ggf zu Änderungen der GUI auffordern.
Code: Alles auswählen
Traceback (most recent call last):
File "/home/ata/source//usb-detect/main.py", line 58, in process_incoming
self.main_window.update_path(str(self.devices))
File "/home/ata/source//usb-detect/main.py", line 52, in devices
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
File "/home/ata/source//usb-detect/main.py", line 52, in <listcomp>
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
File "/usr/lib/python3/dist-packages/pyudev/device/_device.py", line 83, in from_path
return cls.from_sys_path(context, path)
File "/usr/lib/python3/dist-packages/pyudev/device/_device.py", line 108, in from_sys_path
raise DeviceNotFoundAtPathError(sys_path)
pyudev.device._errors.DeviceNotFoundAtPathError: No device at '/sys/devices/pci0000:00/0000:00:14.0/usb1/1-6/1-6:1.0/host4/target4:0:0/4:0:0:0/block/sdb'
Code: Alles auswählen
from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
loadUi("mainwindow.ui", self)
def update_path(self, path):
self.label_usb_mass_storage_url.setText(path)
class LinuxDeviceMonitor:
def __init__(self, main_window):
import pyudev
self._context = pyudev.Context()
self._monitor = pyudev.Monitor.from_netlink(self._context)
self._monitor.start()
self._devices = set()
self._listeners = []
self.rescan()
self.main_window = main_window
def rescan(self):
self._process_devices(
self._context.list_devices(),
action="add"
)
def add_listener(self, listener):
self._listeners.append(listener)
def remove_listener(self, listener):
self._listeners.remove(listener)
def fileno(self):
return self._monitor.fileno()
@property
def devices(self):
import pyudev
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self, *_args):
from pyudev._util import eintr_retry_call
read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0)
self._process_devices(iter(read_device, None))
self.main_window.update_path(str(self.devices))
def _process_devices(self, devices, action=None):
old = set(self._devices)
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path)
elif "remove" == action and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
if self._devices != old:
devices = self.devices
for listener in self._listeners:
listener(devices)
def _is_usb_mass_storage_device(self, device):
def dev_path(device, name):
return "{}/{}".format(device.sys_path, name)
def removable(device):
removable_path = dev_path(device, "removable")
try:
return os.path.exists(removable_path) and int(open(removable_path).read())
except ValueError:
return False
def has_size(device):
size_path = dev_path(device, "size")
try:
return os.path.exists(size_path) and int(open(size_path).read())
except ValueError:
return False
def has_usb(device):
return device.get("ID_BUS") == "usb"
def has_no_disc(device):
return device.get("ID_CDROM") is None
return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
linux_device_monitor = LinuxDeviceMonitor(main_window)
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
sys.exit(app.exec_())
if __name__ == '__main__':
main()