diff --git a/src/calibre/devices/winusb.py b/src/calibre/devices/winusb.py index ed81863bcd..b343d61ec7 100644 --- a/src/calibre/devices/winusb.py +++ b/src/calibre/devices/winusb.py @@ -511,6 +511,33 @@ def get_device_interface_detail_data(dev_list, p_interface_data, buf=None): # }}} +# Enumerate USB devices {{{ + +USBDevice = namedtuple('USBDevice', 'vendor_id product_id bcd devid devinst') + +def iterusbdevices(): + buf = None + pat = devid_pat() + for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices(): + buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf) + if devid: + devid = devid[0].lower() + m = pat.search(devid) + if m is None: + yield USBDevice(None, None, None, devid, devinfo.DevInst) + else: + try: + vid, pid, bcd = map(lambda x:int(x, 16), m.group(1, 2, 3)) + except Exception: + yield USBDevice(None, None, None, devid, devinfo.DevInst) + else: + yield USBDevice(vid, pid, bcd, devid, devinfo.DevInst) + +def scan_usb_devices(): + return tuple(iterusbdevices()) + +# }}} + def get_removable_drives(debug=False): # {{{ drive_map = get_all_removable_drives(allow_fixed=False) if debug: @@ -540,41 +567,36 @@ def get_removable_drives(debug=False): # {{{ return ans # }}} -def get_drive_letters_for_device(vendor_id, product_id, bcd=None, storage_number_map=None, debug=False): # {{{ +def get_drive_letters_for_device(vendor_id, product_id, bcd=None, storage_number_map=None, devinst=None, debug=False): # {{{ ''' Get the drive letters for a connected device with the specified USB ids. bcd can be either None, in which case it is not tested, or it must be a list or set like object containing bcds. The drive letters are sorted by storage number, which (I think) corresponds to the order they are exported by the firmware. ''' - rbuf = None ans = {'pnp_id_map': {}, 'drive_letters':[]} sort_map = {} - # First search for a device matching the specified USB ids - for dev_list, devinfo in DeviceSet(enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices(): - rbuf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=rbuf) - if devid: - m = devid_pat().search(devid[0]) - if m is None: - continue - try: - vid, pid, rev = map(lambda x:int(x, 16), m.group(1, 2, 3)) - except Exception: - continue - if vid == vendor_id and pid == product_id and (bcd is None or (bcd and rev in bcd)): + sn_map = get_storage_number_map(debug=debug) if storage_number_map is None else storage_number_map + if debug: + prints('Storage number map:') + prints(pformat(sn_map)) + if not sn_map: + return ans + + if devinst is None: + # First search for a device matching the specified USB ids + for usbdev in iterusbdevices(): + if vendor_id == usbdev.vendor_id and product_id == usbdev.product_id and ( + bcd is None or (bcd and usbdev.bcd in bcd)): + devinst = usbdev.devinst break else: if debug: prints('Could not find device matching vid=0x%x pid=0x%x' % (vendor_id, product_id)) return ans - # Get the device ids for all descendants of the found device - sn_map = get_storage_number_map(debug=debug) if storage_number_map is None else storage_number_map - if debug: - prints('Storage number map:') - prints(pformat(sn_map)) - descendants = frozenset(iterdescendants(devinfo.DevInst)) + descendants = frozenset(iterdescendants(devinst)) for devinfo, devpath in DeviceSet(GUID_DEVINTERFACE_DISK).interfaces(): if devinfo.DevInst in descendants: if debug: @@ -624,12 +646,7 @@ def get_storage_number_map(drive_types=(DRIVE_REMOVABLE, DRIVE_FIXED), debug=Fal # }}} def get_usb_devices(): # {{{ - ans, buf = [], None - for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices(): - buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf) - if devid: - ans.append(devid[0].lower()) - return ans + return list(x.devid for x in iterusbdevices()) # }}} def is_usb_device_connected(vendor_id, product_id): # {{{