Change algorithm to get drives from device ids

No longer depends on the device having a UUID
This commit is contained in:
Kovid Goyal 2016-01-16 22:56:55 +05:30
parent 845b896803
commit 6e02c4a32d

View File

@ -5,7 +5,7 @@
from __future__ import (unicode_literals, division, absolute_import, from __future__ import (unicode_literals, division, absolute_import,
print_function) print_function)
import os, string, _winreg as winreg, re, time, errno import os, string, _winreg as winreg, re, time, sys
from contextlib import contextmanager from contextlib import contextmanager
from ctypes import ( from ctypes import (
Structure, POINTER, c_ubyte, windll, byref, c_void_p, WINFUNCTYPE, Structure, POINTER, c_ubyte, windll, byref, c_void_p, WINFUNCTYPE,
@ -13,8 +13,7 @@ from ctypes import (
wstring_at, addressof, create_unicode_buffer, string_at, c_uint64 as QWORD) wstring_at, addressof, create_unicode_buffer, string_at, c_uint64 as QWORD)
from ctypes.wintypes import DWORD, WORD, ULONG, LPCWSTR, HWND, BOOL, LPWSTR, UINT, BYTE, HANDLE from ctypes.wintypes import DWORD, WORD, ULONG, LPCWSTR, HWND, BOOL, LPWSTR, UINT, BYTE, HANDLE
from calibre.constants import is64bit is64bit = sys.maxsize > (1 << 32)
from calibre.utils.winreg.lib import Key, HKEY_LOCAL_MACHINE
# Data and function type definitions {{{ # Data and function type definitions {{{
@ -426,6 +425,17 @@ def get_device_interface_detail_data(dev_list, p_interface_data, buf=None):
class NoRemovableDrives(WindowsError): class NoRemovableDrives(WindowsError):
pass pass
def drive_letter_from_volume_devpath(devpath, drive_map):
pbuf = create_unicode_buffer(512)
if not devpath.endswith(os.sep):
devpath += os.sep
try:
GetVolumeNameForVolumeMountPoint(devpath, pbuf, len(pbuf))
except WindowsError:
pass
else:
return drive_map.get(pbuf.value)
def get_removable_drives(debug=False): def get_removable_drives(debug=False):
drive_map = get_all_removable_drives() drive_map = get_all_removable_drives()
if not drive_map: if not drive_map:
@ -460,14 +470,8 @@ def get_removable_drives(debug=False):
candidates.append(devid) candidates.append(devid)
candidates.append(devpath) candidates.append(devpath)
if not devpath.endswith(os.sep): drive_letter = drive_letter_from_volume_devpath(devpath, drive_map)
devpath += os.sep if drive_letter:
try:
GetVolumeNameForVolumeMountPoint(devpath, pbuf, len(pbuf))
except WindowsError:
continue
drive_letter = drive_map.get(pbuf.value)
if drive_letter is not None:
ans[drive_letter] = candidates ans[drive_letter] = candidates
return ans return ans
# }}} # }}}
@ -510,45 +514,44 @@ def get_drive_letters_for_device(vendor_id, product_id, debug=False):
break break
if found_at is None: if found_at is None:
return ans return ans
try:
with Key(open_at='SYSTEM\\CurrentControlSet\\Enum\\USB\\VID_%04X&PID_%04X' % (vid, pid), root=HKEY_LOCAL_MACHINE) as key: # Get the device ids for all descendants of the found device
available_uuids = {uuid.lower() for uuid in key.iterkeynames()} device_ids = set()
except WindowsError as err: for devinst in iterdescendants(devinfo.DevInst):
if err.errno != errno.ENOENT: devid, wbuf = get_device_id(devinst, buf=wbuf)
raise device_ids.add(devid.upper().replace(os.sep, '#'))
return ans if debug:
if not available_uuids: print('Device ids: %r' % device_ids)
if not device_ids:
return ans return ans
drive_map = get_all_removable_drives()
if not drive_map:
raise NoRemovableDrives('No removable drives found!')
# Now look for volumes whose device path contains one of the child device
# ids we found earlier
with get_device_set() as dev_list: with get_device_set() as dev_list:
interface_data = SP_DEVICE_INTERFACE_DATA() interface_data = SP_DEVICE_INTERFACE_DATA()
interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA) interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA)
i = -1 i = -1
drive_map = get_all_removable_drives()
vbuf = create_unicode_buffer(100)
while True: while True:
i += 1 i += 1
if not SetupDiEnumDeviceInterfaces(dev_list, None, byref(GUID_DEVINTERFACE_VOLUME), i, byref(interface_data)): if not SetupDiEnumDeviceInterfaces(dev_list, None, byref(GUID_DEVINTERFACE_VOLUME), i, byref(interface_data)):
break break
buf, devinfo, devpath = get_device_interface_detail_data(dev_list, byref(interface_data), buf) buf, devinfo, devpath = get_device_interface_detail_data(dev_list, byref(interface_data), buf)
devid, wbuf = get_device_id(devinfo.DevInst, buf=wbuf) devpath = devpath.upper()
if 'USBSTOR' not in devid: matched = False
continue for q in device_ids:
parts = devid.split('#') if q in devpath:
if len(parts) < 3: matched = True
continue break
uuid = parts[2].partition('&')[0].lower() if matched:
if uuid not in available_uuids: drive_letter = drive_letter_from_volume_devpath(devpath, drive_map)
continue if drive_letter:
if not devpath.endswith(os.sep):
devpath += os.sep
try:
GetVolumeNameForVolumeMountPoint(devpath, vbuf, len(vbuf))
except WindowsError:
continue
drive_letter = drive_map.get(vbuf.value)
if drive_letter is not None:
ans.append(drive_letter) ans.append(drive_letter)
return ans return ans
# }}} # }}}
def get_usb_devices(): # {{{ def get_usb_devices(): # {{{
@ -660,6 +663,6 @@ if __name__ == '__main__':
pprint(get_all_removable_drives()) pprint(get_all_removable_drives())
rd = get_removable_drives() rd = get_removable_drives()
pprint(rd) pprint(rd)
pprint(get_drive_letters_for_device(0x1949, 0x4)) pprint(get_drive_letters_for_device(0x1949, 0x4, debug=True))
for drive in rd: for drive in rd:
eject_drive(drive) eject_drive(drive)