This commit is contained in:
Kovid Goyal 2016-01-18 08:10:12 +05:30
parent a239bf686a
commit 4f5146e2ea

View File

@ -6,7 +6,6 @@ from __future__ import (unicode_literals, division, absolute_import,
print_function)
import os, string, _winreg as winreg, re, time, sys
from contextlib import contextmanager
from ctypes import (
Structure, POINTER, c_ubyte, windll, byref, c_void_p, WINFUNCTYPE,
WinError, get_last_error, sizeof, c_wchar, create_string_buffer, cast,
@ -292,11 +291,42 @@ CM_Get_Device_Interface_List = cwrap('CM_Get_Device_Interface_ListW', CONFIGRET,
# }}}
# Utility functions {{{
@contextmanager
def get_device_set(guid=byref(GUID_DEVINTERFACE_VOLUME), enumerator=None, flags=DIGCF_PRESENT | DIGCF_DEVICEINTERFACE):
dev_list = SetupDiGetClassDevs(guid, enumerator, None, flags)
yield dev_list
SetupDiDestroyDeviceInfoList(dev_list)
class DeviceSet(object):
def __init__(self, guid=GUID_DEVINTERFACE_VOLUME, enumerator=None, flags=DIGCF_PRESENT | DIGCF_DEVICEINTERFACE):
self.guid_ref, self.enumerator, self.flags = (None if guid is None else byref(guid)), enumerator, flags
self.dev_list = SetupDiGetClassDevs(self.guid_ref, self.enumerator, None, self.flags)
def __del__(self):
SetupDiDestroyDeviceInfoList(self.dev_list)
del self.dev_list
def interfaces(self, ignore_errors=False):
interface_data = SP_DEVICE_INTERFACE_DATA()
interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA)
buf = None
i = -1
while True:
i += 1
if not SetupDiEnumDeviceInterfaces(self.dev_list, None, self.guid_ref, i, byref(interface_data)):
break
try:
buf, devinfo, devpath = get_device_interface_detail_data(self.dev_list, byref(interface_data), buf)
except WindowsError:
if ignore_errors:
continue
raise
yield devinfo, devpath
def devices(self):
devinfo = SP_DEVINFO_DATA()
devinfo.cbSize = sizeof(SP_DEVINFO_DATA)
i = -1
while True:
i += 1
if not SetupDiEnumDeviceInfo(self.dev_list, i, byref(devinfo)):
break
yield self.dev_list, devinfo
def iterchildren(parent_devinst):
@ -450,41 +480,31 @@ def get_removable_drives(debug=False): # {{{
if not drive_map:
raise NoRemovableDrives('No removable drives found!')
buf = None
pbuf = create_unicode_buffer(512)
ans = {}
with get_device_set() as dev_list:
interface_data = SP_DEVICE_INTERFACE_DATA()
interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA)
i = -1
ans, buf = {}, None
for devinfo, devpath in DeviceSet().interfaces():
candidates = []
# Get the devpaths for all parents of this device. This is not
# actually necessary on Vista+, so we simply ignore any windows API
# failures.
parent = DEVINST(devinfo.DevInst)
while True:
i += 1
if not SetupDiEnumDeviceInterfaces(dev_list, None, byref(GUID_DEVINTERFACE_VOLUME), i, byref(interface_data)):
try:
CM_Get_Parent(byref(parent), parent, 0)
except WindowsError:
break
buf, devinfo, devpath = get_device_interface_detail_data(dev_list, byref(interface_data), buf)
candidates = []
# Get the devpaths for all parents of this device. This is not
# actually necessary on Vista+, so we simply ignore any windows API
# failures.
parent = DEVINST(devinfo.DevInst)
while True:
try:
CM_Get_Parent(byref(parent), parent, 0)
except WindowsError:
break
try:
devid, pbuf = get_device_id(parent, buf=pbuf)
except WindowsError:
break
candidates.append(devid)
candidates.append(devpath)
try:
devid, buf = get_device_id(parent, buf=buf)
except WindowsError:
break
candidates.append(devid)
candidates.append(devpath)
drive_letter = drive_letter_from_volume_devpath(devpath, drive_map)
if drive_letter:
ans[drive_letter] = candidates
if debug:
prints('Found volume with device path:', devpath, ' Drive letter:', drive_letter, 'Is removable:', drive_letter in ans)
return ans
drive_letter = drive_letter_from_volume_devpath(devpath, drive_map)
if drive_letter:
ans[drive_letter] = candidates
if debug:
prints('Found volume with device path:', devpath, ' Drive letter:', drive_letter, 'Is removable:', drive_letter in ans)
return ans
# }}}
def get_drive_letters_for_device(vendor_id, product_id, bcd=None, debug=False): # {{{
@ -497,28 +517,19 @@ def get_drive_letters_for_device(vendor_id, product_id, bcd=None, debug=False):
ans = []
# First search for a device matching the specified USB ids
with get_device_set(enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES) as dev_list:
devinfo = SP_DEVINFO_DATA()
devinfo.cbSize = sizeof(SP_DEVINFO_DATA)
i = -1
found_at = None
while True:
i += 1
if not SetupDiEnumDeviceInfo(dev_list, i, byref(devinfo)):
for dev_list, devinfo in DeviceSet(enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices():
rbuf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=rbuf)
if devid:
m = devid_pat().search(devid[0])
if m is None:
continue
try:
vid, pid, rev = map(lambda x:int(x, 16), m.group(1, 2, 3))
except Exception:
continue
if vid == vendor_id and pid == product_id and (bcd is None or (bcd and rev in bcd)):
break
rbuf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=rbuf)
if devid:
m = devid_pat().search(devid[0])
if m is None:
continue
try:
vid, pid, rev = map(lambda x:int(x, 16), m.group(1, 2, 3))
except Exception:
continue
if vid == vendor_id and pid == product_id and (bcd is None or (bcd and rev in bcd)):
found_at = i - 1
break
if found_at is None:
else:
if debug:
prints('Could not find device matching vid=0x%x pid=0x%x' % (vendor_id, product_id))
return ans
@ -577,61 +588,37 @@ def get_storage_number_map(drive_types=(DRIVE_REMOVABLE, DRIVE_FIXED), debug=Fal
def find_drive(devinst, storage_number_map, debug=False):
buf = None
interface_data = SP_DEVICE_INTERFACE_DATA()
interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA)
with get_device_set(guid=byref(GUID_DEVINTERFACE_DISK)) as dev_list:
i = -1
while True:
i += 1
if not SetupDiEnumDeviceInterfaces(dev_list, None, byref(GUID_DEVINTERFACE_DISK), i, byref(interface_data)):
break
buf, devinfo, devpath = get_device_interface_detail_data(dev_list, byref(interface_data), buf=buf)
if devinfo.DevInst == devinst:
storage_number = get_storage_number(devpath)
drive_letter = storage_number_map.get(storage_number)
if drive_letter:
return drive_letter
for devinfo, devpath in DeviceSet(GUID_DEVINTERFACE_DISK).interfaces():
if devinfo.DevInst == devinst:
storage_number = get_storage_number(devpath)
drive_letter = storage_number_map.get(storage_number)
if drive_letter:
return drive_letter
# }}}
def get_usb_devices(): # {{{
buf = None
ans = []
with get_device_set(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES) as dev_list:
devinfo = SP_DEVINFO_DATA()
devinfo.cbSize = sizeof(SP_DEVINFO_DATA)
i = -1
while True:
i += 1
if not SetupDiEnumDeviceInfo(dev_list, i, byref(devinfo)):
break
buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf)
if devid:
ans.append(devid[0].lower())
ans, buf = [], None
for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices():
buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf)
if devid:
ans.append(devid[0].lower())
return ans
# }}}
def is_usb_device_connected(vendor_id, product_id): # {{{
buf = None
with get_device_set(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES) as dev_list:
devinfo = SP_DEVINFO_DATA()
devinfo.cbSize = sizeof(SP_DEVINFO_DATA)
i = -1
while True:
i += 1
if not SetupDiEnumDeviceInfo(dev_list, i, byref(devinfo)):
break
buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf)
if devid:
m = devid_pat().search(devid[0])
if m is not None:
try:
vid, pid = map(lambda x: int(x, 16), m.group(1, 2))
except Exception:
continue
if vid == vendor_id and pid == product_id:
return True
for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices():
buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf)
if devid:
m = devid_pat().search(devid[0])
if m is not None:
try:
vid, pid = map(lambda x: int(x, 16), m.group(1, 2))
except Exception:
continue
if vid == vendor_id and pid == product_id:
return True
return False
# }}}
@ -663,7 +650,6 @@ def eject_drive(drive_letter): # {{{
def devinst_from_device_number(drive_letter, device_number):
drive_root = drive_letter + ':' + os.sep
buf = create_unicode_buffer(512)
bbuf = None
sdn = STORAGE_DEVICE_NUMBER()
drive_type = GetDriveType(drive_root)
QueryDosDevice(drive_letter + ':', buf, len(buf))
@ -676,25 +662,14 @@ def devinst_from_device_number(drive_letter, device_number):
guid = GUID_DEVINTERFACE_CDROM
else:
raise ValueError('Unknown drive_type: %d' % drive_type)
with get_device_set(guid=guid) as dev_list:
interface_data = SP_DEVICE_INTERFACE_DATA()
interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA)
i = -1
while True:
i += 1
if not SetupDiEnumDeviceInterfaces(dev_list, None, byref(guid), i, byref(interface_data)):
break
try:
bbuf, devinfo, devpath = get_device_interface_detail_data(dev_list, byref(interface_data), bbuf)
except WindowsError:
continue
handle = CreateFile(devpath, 0, FILE_SHARE_READ | FILE_SHARE_WRITE, None, OPEN_EXISTING, 0, None)
try:
DeviceIoControl(handle, IOCTL_STORAGE_GET_DEVICE_NUMBER, None, 0, byref(sdn), sizeof(STORAGE_DEVICE_NUMBER), None, None)
finally:
CloseHandle(handle)
if sdn.DeviceNumber == device_number:
return devinfo.DevInst
for devinfo, devpath in DeviceSet(guid=guid).interfaces(ignore_errors=True):
handle = CreateFile(devpath, 0, FILE_SHARE_READ | FILE_SHARE_WRITE, None, OPEN_EXISTING, 0, None)
try:
DeviceIoControl(handle, IOCTL_STORAGE_GET_DEVICE_NUMBER, None, 0, byref(sdn), sizeof(STORAGE_DEVICE_NUMBER), None, None)
finally:
CloseHandle(handle)
if sdn.DeviceNumber == device_number:
return devinfo.DevInst
# }}}