diff --git a/src/calibre/devices/winusb.py b/src/calibre/devices/winusb.py index 6e97542fab..06e4118b90 100644 --- a/src/calibre/devices/winusb.py +++ b/src/calibre/devices/winusb.py @@ -6,7 +6,6 @@ from __future__ import (unicode_literals, division, absolute_import, print_function) import os, string, _winreg as winreg, re, time, sys -from contextlib import contextmanager from ctypes import ( Structure, POINTER, c_ubyte, windll, byref, c_void_p, WINFUNCTYPE, WinError, get_last_error, sizeof, c_wchar, create_string_buffer, cast, @@ -292,11 +291,42 @@ CM_Get_Device_Interface_List = cwrap('CM_Get_Device_Interface_ListW', CONFIGRET, # }}} # Utility functions {{{ -@contextmanager -def get_device_set(guid=byref(GUID_DEVINTERFACE_VOLUME), enumerator=None, flags=DIGCF_PRESENT | DIGCF_DEVICEINTERFACE): - dev_list = SetupDiGetClassDevs(guid, enumerator, None, flags) - yield dev_list - SetupDiDestroyDeviceInfoList(dev_list) +class DeviceSet(object): + + def __init__(self, guid=GUID_DEVINTERFACE_VOLUME, enumerator=None, flags=DIGCF_PRESENT | DIGCF_DEVICEINTERFACE): + self.guid_ref, self.enumerator, self.flags = (None if guid is None else byref(guid)), enumerator, flags + self.dev_list = SetupDiGetClassDevs(self.guid_ref, self.enumerator, None, self.flags) + + def __del__(self): + SetupDiDestroyDeviceInfoList(self.dev_list) + del self.dev_list + + def interfaces(self, ignore_errors=False): + interface_data = SP_DEVICE_INTERFACE_DATA() + interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA) + buf = None + i = -1 + while True: + i += 1 + if not SetupDiEnumDeviceInterfaces(self.dev_list, None, self.guid_ref, i, byref(interface_data)): + break + try: + buf, devinfo, devpath = get_device_interface_detail_data(self.dev_list, byref(interface_data), buf) + except WindowsError: + if ignore_errors: + continue + raise + yield devinfo, devpath + + def devices(self): + devinfo = SP_DEVINFO_DATA() + devinfo.cbSize = sizeof(SP_DEVINFO_DATA) + i = -1 + while True: + i += 1 + if not SetupDiEnumDeviceInfo(self.dev_list, i, byref(devinfo)): + break + yield self.dev_list, devinfo def iterchildren(parent_devinst): @@ -450,41 +480,31 @@ def get_removable_drives(debug=False): # {{{ if not drive_map: raise NoRemovableDrives('No removable drives found!') - buf = None - pbuf = create_unicode_buffer(512) - ans = {} - with get_device_set() as dev_list: - interface_data = SP_DEVICE_INTERFACE_DATA() - interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA) - i = -1 + ans, buf = {}, None + for devinfo, devpath in DeviceSet().interfaces(): + candidates = [] + # Get the devpaths for all parents of this device. This is not + # actually necessary on Vista+, so we simply ignore any windows API + # failures. + parent = DEVINST(devinfo.DevInst) while True: - i += 1 - if not SetupDiEnumDeviceInterfaces(dev_list, None, byref(GUID_DEVINTERFACE_VOLUME), i, byref(interface_data)): + try: + CM_Get_Parent(byref(parent), parent, 0) + except WindowsError: break - buf, devinfo, devpath = get_device_interface_detail_data(dev_list, byref(interface_data), buf) - candidates = [] - # Get the devpaths for all parents of this device. This is not - # actually necessary on Vista+, so we simply ignore any windows API - # failures. - parent = DEVINST(devinfo.DevInst) - while True: - try: - CM_Get_Parent(byref(parent), parent, 0) - except WindowsError: - break - try: - devid, pbuf = get_device_id(parent, buf=pbuf) - except WindowsError: - break - candidates.append(devid) - candidates.append(devpath) + try: + devid, buf = get_device_id(parent, buf=buf) + except WindowsError: + break + candidates.append(devid) + candidates.append(devpath) - drive_letter = drive_letter_from_volume_devpath(devpath, drive_map) - if drive_letter: - ans[drive_letter] = candidates - if debug: - prints('Found volume with device path:', devpath, ' Drive letter:', drive_letter, 'Is removable:', drive_letter in ans) - return ans + drive_letter = drive_letter_from_volume_devpath(devpath, drive_map) + if drive_letter: + ans[drive_letter] = candidates + if debug: + prints('Found volume with device path:', devpath, ' Drive letter:', drive_letter, 'Is removable:', drive_letter in ans) + return ans # }}} def get_drive_letters_for_device(vendor_id, product_id, bcd=None, debug=False): # {{{ @@ -497,28 +517,19 @@ def get_drive_letters_for_device(vendor_id, product_id, bcd=None, debug=False): ans = [] # First search for a device matching the specified USB ids - with get_device_set(enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES) as dev_list: - devinfo = SP_DEVINFO_DATA() - devinfo.cbSize = sizeof(SP_DEVINFO_DATA) - i = -1 - found_at = None - while True: - i += 1 - if not SetupDiEnumDeviceInfo(dev_list, i, byref(devinfo)): + for dev_list, devinfo in DeviceSet(enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices(): + rbuf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=rbuf) + if devid: + m = devid_pat().search(devid[0]) + if m is None: + continue + try: + vid, pid, rev = map(lambda x:int(x, 16), m.group(1, 2, 3)) + except Exception: + continue + if vid == vendor_id and pid == product_id and (bcd is None or (bcd and rev in bcd)): break - rbuf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=rbuf) - if devid: - m = devid_pat().search(devid[0]) - if m is None: - continue - try: - vid, pid, rev = map(lambda x:int(x, 16), m.group(1, 2, 3)) - except Exception: - continue - if vid == vendor_id and pid == product_id and (bcd is None or (bcd and rev in bcd)): - found_at = i - 1 - break - if found_at is None: + else: if debug: prints('Could not find device matching vid=0x%x pid=0x%x' % (vendor_id, product_id)) return ans @@ -577,61 +588,37 @@ def get_storage_number_map(drive_types=(DRIVE_REMOVABLE, DRIVE_FIXED), debug=Fal def find_drive(devinst, storage_number_map, debug=False): - buf = None - interface_data = SP_DEVICE_INTERFACE_DATA() - interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA) - with get_device_set(guid=byref(GUID_DEVINTERFACE_DISK)) as dev_list: - i = -1 - while True: - i += 1 - if not SetupDiEnumDeviceInterfaces(dev_list, None, byref(GUID_DEVINTERFACE_DISK), i, byref(interface_data)): - break - buf, devinfo, devpath = get_device_interface_detail_data(dev_list, byref(interface_data), buf=buf) - if devinfo.DevInst == devinst: - storage_number = get_storage_number(devpath) - drive_letter = storage_number_map.get(storage_number) - if drive_letter: - return drive_letter + for devinfo, devpath in DeviceSet(GUID_DEVINTERFACE_DISK).interfaces(): + if devinfo.DevInst == devinst: + storage_number = get_storage_number(devpath) + drive_letter = storage_number_map.get(storage_number) + if drive_letter: + return drive_letter # }}} def get_usb_devices(): # {{{ - buf = None - ans = [] - with get_device_set(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES) as dev_list: - devinfo = SP_DEVINFO_DATA() - devinfo.cbSize = sizeof(SP_DEVINFO_DATA) - i = -1 - while True: - i += 1 - if not SetupDiEnumDeviceInfo(dev_list, i, byref(devinfo)): - break - buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf) - if devid: - ans.append(devid[0].lower()) + ans, buf = [], None + for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices(): + buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf) + if devid: + ans.append(devid[0].lower()) return ans # }}} def is_usb_device_connected(vendor_id, product_id): # {{{ buf = None - with get_device_set(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES) as dev_list: - devinfo = SP_DEVINFO_DATA() - devinfo.cbSize = sizeof(SP_DEVINFO_DATA) - i = -1 - while True: - i += 1 - if not SetupDiEnumDeviceInfo(dev_list, i, byref(devinfo)): - break - buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf) - if devid: - m = devid_pat().search(devid[0]) - if m is not None: - try: - vid, pid = map(lambda x: int(x, 16), m.group(1, 2)) - except Exception: - continue - if vid == vendor_id and pid == product_id: - return True + for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices(): + buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf) + if devid: + m = devid_pat().search(devid[0]) + if m is not None: + try: + vid, pid = map(lambda x: int(x, 16), m.group(1, 2)) + except Exception: + continue + if vid == vendor_id and pid == product_id: + return True return False # }}} @@ -663,7 +650,6 @@ def eject_drive(drive_letter): # {{{ def devinst_from_device_number(drive_letter, device_number): drive_root = drive_letter + ':' + os.sep buf = create_unicode_buffer(512) - bbuf = None sdn = STORAGE_DEVICE_NUMBER() drive_type = GetDriveType(drive_root) QueryDosDevice(drive_letter + ':', buf, len(buf)) @@ -676,25 +662,14 @@ def devinst_from_device_number(drive_letter, device_number): guid = GUID_DEVINTERFACE_CDROM else: raise ValueError('Unknown drive_type: %d' % drive_type) - with get_device_set(guid=guid) as dev_list: - interface_data = SP_DEVICE_INTERFACE_DATA() - interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA) - i = -1 - while True: - i += 1 - if not SetupDiEnumDeviceInterfaces(dev_list, None, byref(guid), i, byref(interface_data)): - break - try: - bbuf, devinfo, devpath = get_device_interface_detail_data(dev_list, byref(interface_data), bbuf) - except WindowsError: - continue - handle = CreateFile(devpath, 0, FILE_SHARE_READ | FILE_SHARE_WRITE, None, OPEN_EXISTING, 0, None) - try: - DeviceIoControl(handle, IOCTL_STORAGE_GET_DEVICE_NUMBER, None, 0, byref(sdn), sizeof(STORAGE_DEVICE_NUMBER), None, None) - finally: - CloseHandle(handle) - if sdn.DeviceNumber == device_number: - return devinfo.DevInst + for devinfo, devpath in DeviceSet(guid=guid).interfaces(ignore_errors=True): + handle = CreateFile(devpath, 0, FILE_SHARE_READ | FILE_SHARE_WRITE, None, OPEN_EXISTING, 0, None) + try: + DeviceIoControl(handle, IOCTL_STORAGE_GET_DEVICE_NUMBER, None, 0, byref(sdn), sizeof(STORAGE_DEVICE_NUMBER), None, None) + finally: + CloseHandle(handle) + if sdn.DeviceNumber == device_number: + return devinfo.DevInst # }}}