Re: Zugriff auf Inhalt eines USB-Sticks mittels USB Device ID
Verfasst: Samstag 16. März 2019, 17:32
Wo ist der connect-Teil von `connect_and_emit_trigger`?
Seit 2002 Diskussionen rund um die Programmiersprache Python
https://www.python-forum.de/
Code: Alles auswählen
from PyQt5.QtCore import QObject, pyqtSignal
class Window:
def handle_trigger(self, text):
# Show that the slot has been called.
text_as_string = ' '.join(text)
print(text_as_string)
class Device(QObject):
# Define a new signal called 'trigger' that has no arguments.
trigger = pyqtSignal([list])
def __init__(self):
super().__init__()
def connect_and_emit_trigger(self):
text = ["trigger", "signal", "received"]
# Emit the signal.
self.trigger.emit(text)
if __name__ == "__main__":
device = Device()
window = Window()
# Connect the trigger signal to a slot.
device.trigger.connect(window.handle_trigger)
device.connect_and_emit_trigger()
Code: Alles auswählen
from PyQt5.QtCore import QObject, pyqtSignal
class Receiver:
def slot(self, local_variable):
print(local_variable)
class Sender(QObject): # muss von QObject abgeleitet werden
signal = pyqtSignal([str]) # Datentyp des mit dem Signal übergebenen Arguments angeben
def __init__(self):
super().__init__()
def emit_signal(self):
self.signal.emit("Hello World")
def main():
receiver = Receiver()
sender = Sender()
sender.signal.connect(receiver.slot)
sender.emit_signal()
if __name__ == "__main__":
main()
Danke. Der Fortschritt war eine ganz schwere Geburt.__deets__ hat geschrieben: Sonntag 17. März 2019, 01:01 Schöner Fortschritt. Du kannst dir AFAIK beim Signal die Liste um die Typen sparen, du hast ja nur ein Argument. Dann sollte eigentlich list bzw str direkt funktionieren.
Code: Alles auswählen
from functools import partial
import os
import sys
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
loadUi("mainwindow.ui", self)
def update_path(self, name):
self.label_usb_mass_storage_url.setText(str(name))
class LinuxDeviceMonitor(QObject):
devices_changed = pyqtSignal(list)
def __init__(self):
import pyudev
super().__init__()
self._context = pyudev.Context() # Für pyudev notwendiges Context-Objekt erstellen
self._monitor = pyudev.Monitor.from_netlink(self._context) # Mit Kernel daemon verbinden
self._monitor.start() # Verbindung aufbauen
self._devices = set()
self._listeners = []
# Iterator (genannt Enumerator) mit allen udev-Geräten und Keyword Argument "add" an Methode "self._process_devices()" übergeben
self._process_devices(self._context.list_devices(), action="add")
def add_listener(self, listener):
self._listeners.append(listener)
def remove_listener(self, listener):
self._listeners.remove(listener)
def fileno(self):
return self._monitor.fileno() # Dateideskriptor zurückgeben
@property
def device_names(self):
import pyudev
# Pfade zu Geräten in self._devices zurückgeben
# !!! DeviceNotFoundAtPathError entsteht hier !!!
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self):
from pyudev._util import eintr_retry_call # ?
read_device = partial(eintr_retry_call, self._monitor.poll, timeout=0) # ?
self._process_devices(iter(read_device, None)) # Iterator mit Sentinel "None" an Methode "self._process_devices()" übergeben
self.devices_changed.emit(self.device_names) # Signal mit Namen der Geräte emitieren
def _process_devices(self, devices, action=None):
old = set(self._devices)
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path) # Einhängepunkt einzelner USB-Geräte zu Liste "self._devices" hinzufügen
elif "remove" == action and device.sys_path in self._devices: # Warum '"remove" == action'?
self._devices.remove(device.sys_path)
if self.device_names != old:
devices = self._devices
for listener in self._listeners:
listener(devices)
def _is_usb_mass_storage_device(self, device):
def device_path(device, name):
return "{}/{}".format(device.sys_path, name) # Zu Gerätenamen passenden Pfad zurückgeben
def removable(device):
removable_path = device_path(device, "removable")
try:
return os.path.exists(removable_path) and int(open(removable_path).read()) # Warum 'int(open(removable_path).read())'?
except ValueError:
return False
def has_size(device):
size_path = device_path(device, "size")
try:
return os.path.exists(size_path) and int(open(size_path).read()) # Warum 'int(open(size_path).read())'?
except ValueError:
return False
def has_usb(device):
return device.get("ID_BUS") == "usb"
def has_no_disc(device):
return device.get("ID_CDROM") is None
return removable(device) and has_size(device) and has_usb(device) and has_no_disc(device)
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
linux_device_monitor = LinuxDeviceMonitor()
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
linux_device_monitor.devices_changed.connect(main_window.update_path)
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Code: Alles auswählen
@classmethod
def _read_device_flag(device, name):
path = os.path.join(device.sys_path, name)
try:
with open(path) as data:
return bool(int(data.read()))
except (IOError, ValueError):
return False
def _is_usb_mass_storage_device(self, device):
is_removable = self._read_device_flag(device, "removable")
has_size = self._read_device_flag(device, "size")
has_usb = device.get("ID_BUS") == "usb"
has_no_disc = device.get("ID_CDROM") is None
return is_removable and has_size and has_usb and has_no_disc
Soviel ich verstanden habe ist `partial()` eine Methode zum Erstellen einer Funktion, die eine andere Funktion mit vorgegebenen Argumenten aufruft.__deets__ hat geschrieben: Freitag 22. März 2019, 23:49 - 4: lies dich in partial ein. Dann erklärt sich das hoffentlich.
"actions" sind die Aktionen, die beim Anstecken und Abstecken eines USB-Geräts auftreten. Was mich daran verwunderte war der umgekehrte Vergleich `"remove" == action`, also "Wert" == Variable und nicht Variable == "Wert". Anscheinend macht die Reihenfolge keinen Unterschied oder etwa doch?__deets__ hat geschrieben: Freitag 22. März 2019, 23:49 - 5: schau dir die actions an, was können die sein? Was denkst du kann so passieren wenn man eben ein USB gerät einsteckt, oder aussteckt?
Code: Alles auswählen
from functools import partial
import os
import sys
import pyudev
from PyQt5.QtWidgets import QApplication, QMainWindow
from PyQt5.uic import loadUi
from PyQt5.QtCore import QSocketNotifier
from PyQt5.QtCore import QObject, pyqtSignal
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
loadUi("mainwindow.ui", self)
def update_path(self, name):
self.label_usb_mass_storage_url.setText(str(name))
class LinuxDeviceMonitor(QObject):
devices_changed = pyqtSignal(list)
def __init__(self):
super().__init__()
self._context = pyudev.Context() # Für pyudev notwendiges Context-Objekt erstellen
self._monitor = pyudev.Monitor.from_netlink(self._context) # Mit Kernel daemon verbinden
self._monitor.start() # Verbindung aufbauen
self._devices = set()
# Iterator (genannt Enumerator) mit allen udev-Geräten und Keyword Argument "add" an Methode "self._process_devices()" übergeben
self._process_devices(self._context.list_devices(), action="add")
def fileno(self):
return self._monitor.fileno() # Dateideskriptor zurückgeben
@property
def device_names(self):
# Pfade zu Geräten in self._devices zurückgeben
# !!! DeviceNotFoundAtPathError entsteht hier !!!
return [pyudev.Devices.from_path(self._context, device).get("DEVNAME") for device in self._devices]
def process_incoming(self):
# Notwendig, um die Daten auf dem Socket/Dateideskriptor abzuholen und Ereignisse zu generieren
read_device = partial(pyudev._util.eintr_retry_call, self._monitor.poll, timeout=0)
# Iterator mit Sentinel "None" an Methode "self._process_devices()" übergeben
self._process_devices(iter(read_device, None))
# Signal mit Namen der Geräte emitieren
self.devices_changed.emit(self.device_names)
def _process_devices(self, devices, action=None):
for device in devices:
action = device.action if action is None else action
if action in ("add", "change") and self._is_usb_mass_storage_device(device):
self._devices.add(device.sys_path) # Einhängepunkt einzelner USB-Geräte zu Liste "self._devices" hinzufügen
elif action == "remove" and device.sys_path in self._devices:
self._devices.remove(device.sys_path)
@classmethod
def _read_device_flag(self, device, name):
path = os.path.join(device.sys_path, name)
try:
with open(path) as data:
return bool(int(data.read()))
except (IOError, ValueError):
return False
def _is_usb_mass_storage_device(self, device):
is_removable = self._read_device_flag(device, "removable")
has_size = self._read_device_flag(device, "size")
has_usb = device.get("ID_BUS") == "usb"
has_no_disc = device.get("ID_CDROM") is None
return is_removable and has_size and has_usb and has_no_disc
def main():
app = QApplication(sys.argv)
main_window = MainWindow()
main_window.show()
linux_device_monitor = LinuxDeviceMonitor()
notifier = QSocketNotifier(linux_device_monitor.fileno(), QSocketNotifier.Read)
notifier.activated.connect(linux_device_monitor.process_incoming)
linux_device_monitor.devices_changed.connect(main_window.update_path)
sys.exit(app.exec_())
if __name__ == '__main__':
main()
Code: Alles auswählen
import win32serviceutil
import win32service
import win32event
import servicemanager
import win32gui
import win32gui_struct
struct = win32gui_struct.struct
pywintypes = win32gui_struct.pywintypes
import win32con
GUID_DEVINTERFACE_USB_DEVICE = "{A5DCBF10-6530-11D2-901F-00C04FB951ED}"
DBT_DEVICEARRIVAL = 0x8000
DBT_DEVICEREMOVECOMPLETE = 0x8004
import ctypes
#
# Cut-down clone of UnpackDEV_BROADCAST from win32gui_struct, to be
# used for monkey-patching said module with correct handling
# of the "name" param of DBT_DEVTYPE_DEVICEINTERFACE
#
def _UnpackDEV_BROADCAST(lparam):
if lparam == 0: return None
hdr_format = "iii"
hdr_size = struct.calcsize(hdr_format)
hdr_buf = win32gui.PyGetMemory(lparam, hdr_size)
size, devtype, reserved = struct.unpack("iii", hdr_buf)
# Due to x64 alignment issues, we need to use the full format string over
# the entire buffer. ie, on x64:
# calcsize('iiiP') != calcsize('iii')+calcsize('P')
buf = win32gui.PyGetMemory(lparam, size)
extra = {}
if devtype == win32con.DBT_DEVTYP_DEVICEINTERFACE:
fmt = hdr_format + "16s"
_, _, _, guid_bytes = struct.unpack(fmt, buf[:struct.calcsize(fmt)])
extra['classguid'] = pywintypes.IID(guid_bytes, True)
extra['name'] = ctypes.wstring_at(lparam + struct.calcsize(fmt))
else:
raise NotImplementedError("unknown device type %d" % (devtype,))
return win32gui_struct.DEV_BROADCAST_INFO(devtype, **extra)
win32gui_struct.UnpackDEV_BROADCAST = _UnpackDEV_BROADCAST
class DeviceEventService(win32serviceutil.ServiceFramework):
_svc_name_ = "DevEventHandler"
_svc_display_name_ = "Device Event Handler"
_svc_description_ = "Handle device notification events"
def __init__(self, args):
win32serviceutil.ServiceFramework.__init__(self, args)
self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
#
# Specify that we're interested in device interface
# events for USB devices
#
filter = win32gui_struct.PackDEV_BROADCAST_DEVICEINTERFACE(
GUID_DEVINTERFACE_USB_DEVICE
)
self.hDevNotify = win32gui.RegisterDeviceNotification(
self.ssh, # copy of the service status handle
filter,
win32con.DEVICE_NOTIFY_SERVICE_HANDLE
)
#
# Add to the list of controls already handled by the underlying
# ServiceFramework class. We're only interested in device events
#
def GetAcceptedControls(self):
rc = win32serviceutil.ServiceFramework.GetAcceptedControls(self)
rc |= win32service.SERVICE_CONTROL_DEVICEEVENT
return rc
#
# Handle non-standard service events (including our device broadcasts)
# by logging to the Application event log
#
def SvcOtherEx(self, control, event_type, data):
if control == win32service.SERVICE_CONTROL_DEVICEEVENT:
info = win32gui_struct.UnpackDEV_BROADCAST(data)
#
# This is the key bit here where you'll presumably
# do something other than log the event. Perhaps pulse
# a named event or write to a secure pipe etc. etc.
#
if event_type == DBT_DEVICEARRIVAL:
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
0xF000,
("Device %s arrived" % info.name, '')
)
elif event_type == DBT_DEVICEREMOVECOMPLETE:
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
0xF000,
("Device %s removed" % info.name, '')
)
#
# Standard stuff for stopping and running service; nothing
# specific to device notifications
#
def SvcStop(self):
self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
win32event.SetEvent(self.hWaitStop)
def SvcDoRun(self):
win32event.WaitForSingleObject(self.hWaitStop, win32event.INFINITE)
servicemanager.LogMsg(
servicemanager.EVENTLOG_INFORMATION_TYPE,
servicemanager.PYS_SERVICE_STOPPED,
(self._svc_name_, '')
)
if __name__ == '__main__':
win32serviceutil.HandleCommandLine(DeviceEventService)
Code: Alles auswählen
Starting service DevEventHandler
Error starting service: Zugriff verweigert
Code: Alles auswählen
runas /user:administrator "python main.py start"
runas /user:administrator "python main.py"
Code: Alles auswählen
runas /user:administrator "python main.py start"
Code: Alles auswählen
Starting service DevEventHandler
Error starting service: Der angegebene Dienst ist kein installierter Dienst.