This commit is contained in:
Kovid Goyal 2016-01-18 11:12:33 +05:30
parent 97cae90435
commit 55ddf4e7bf

View File

@ -511,6 +511,33 @@ def get_device_interface_detail_data(dev_list, p_interface_data, buf=None):
# }}} # }}}
# Enumerate USB devices {{{
USBDevice = namedtuple('USBDevice', 'vendor_id product_id bcd devid devinst')
def iterusbdevices():
buf = None
pat = devid_pat()
for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices():
buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf)
if devid:
devid = devid[0].lower()
m = pat.search(devid)
if m is None:
yield USBDevice(None, None, None, devid, devinfo.DevInst)
else:
try:
vid, pid, bcd = map(lambda x:int(x, 16), m.group(1, 2, 3))
except Exception:
yield USBDevice(None, None, None, devid, devinfo.DevInst)
else:
yield USBDevice(vid, pid, bcd, devid, devinfo.DevInst)
def scan_usb_devices():
return tuple(iterusbdevices())
# }}}
def get_removable_drives(debug=False): # {{{ def get_removable_drives(debug=False): # {{{
drive_map = get_all_removable_drives(allow_fixed=False) drive_map = get_all_removable_drives(allow_fixed=False)
if debug: if debug:
@ -540,41 +567,36 @@ def get_removable_drives(debug=False): # {{{
return ans return ans
# }}} # }}}
def get_drive_letters_for_device(vendor_id, product_id, bcd=None, storage_number_map=None, debug=False): # {{{ def get_drive_letters_for_device(vendor_id, product_id, bcd=None, storage_number_map=None, devinst=None, debug=False): # {{{
''' '''
Get the drive letters for a connected device with the specified USB ids. bcd Get the drive letters for a connected device with the specified USB ids. bcd
can be either None, in which case it is not tested, or it must be a list or can be either None, in which case it is not tested, or it must be a list or
set like object containing bcds. The drive letters are sorted by storage number, set like object containing bcds. The drive letters are sorted by storage number,
which (I think) corresponds to the order they are exported by the firmware. which (I think) corresponds to the order they are exported by the firmware.
''' '''
rbuf = None
ans = {'pnp_id_map': {}, 'drive_letters':[]} ans = {'pnp_id_map': {}, 'drive_letters':[]}
sort_map = {} sort_map = {}
sn_map = get_storage_number_map(debug=debug) if storage_number_map is None else storage_number_map
if debug:
prints('Storage number map:')
prints(pformat(sn_map))
if not sn_map:
return ans
if devinst is None:
# First search for a device matching the specified USB ids # First search for a device matching the specified USB ids
for dev_list, devinfo in DeviceSet(enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices(): for usbdev in iterusbdevices():
rbuf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=rbuf) if vendor_id == usbdev.vendor_id and product_id == usbdev.product_id and (
if devid: bcd is None or (bcd and usbdev.bcd in bcd)):
m = devid_pat().search(devid[0]) devinst = usbdev.devinst
if m is None:
continue
try:
vid, pid, rev = map(lambda x:int(x, 16), m.group(1, 2, 3))
except Exception:
continue
if vid == vendor_id and pid == product_id and (bcd is None or (bcd and rev in bcd)):
break break
else: else:
if debug: if debug:
prints('Could not find device matching vid=0x%x pid=0x%x' % (vendor_id, product_id)) prints('Could not find device matching vid=0x%x pid=0x%x' % (vendor_id, product_id))
return ans return ans
# Get the device ids for all descendants of the found device descendants = frozenset(iterdescendants(devinst))
sn_map = get_storage_number_map(debug=debug) if storage_number_map is None else storage_number_map
if debug:
prints('Storage number map:')
prints(pformat(sn_map))
descendants = frozenset(iterdescendants(devinfo.DevInst))
for devinfo, devpath in DeviceSet(GUID_DEVINTERFACE_DISK).interfaces(): for devinfo, devpath in DeviceSet(GUID_DEVINTERFACE_DISK).interfaces():
if devinfo.DevInst in descendants: if devinfo.DevInst in descendants:
if debug: if debug:
@ -624,12 +646,7 @@ def get_storage_number_map(drive_types=(DRIVE_REMOVABLE, DRIVE_FIXED), debug=Fal
# }}} # }}}
def get_usb_devices(): # {{{ def get_usb_devices(): # {{{
ans, buf = [], None return list(x.devid for x in iterusbdevices())
for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices():
buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf)
if devid:
ans.append(devid[0].lower())
return ans
# }}} # }}}
def is_usb_device_connected(vendor_id, product_id): # {{{ def is_usb_device_connected(vendor_id, product_id): # {{{