Code to read USB string properties

serial number, manufacturer and product name
This commit is contained in:
Kovid Goyal 2016-01-18 21:48:19 +05:30
parent aa5fc9b434
commit 904ade3e7c

View File

@ -11,9 +11,11 @@ from operator import itemgetter
from ctypes import (
Structure, POINTER, c_ubyte, windll, byref, c_void_p, WINFUNCTYPE, c_uint,
WinError, get_last_error, sizeof, c_wchar, create_string_buffer, cast,
wstring_at, addressof, create_unicode_buffer, string_at, c_uint64 as QWORD)
memset, wstring_at, addressof, create_unicode_buffer, string_at, c_uint64 as QWORD
)
from ctypes.wintypes import DWORD, WORD, ULONG, LPCWSTR, HWND, BOOL, LPWSTR, UINT, BYTE, HANDLE, USHORT
from pprint import pprint, pformat
from future_builtins import map
from calibre import prints, as_unicode
@ -61,6 +63,23 @@ REG_QWORD = 11
IOCTL_STORAGE_MEDIA_REMOVAL = 0x2D4804
IOCTL_STORAGE_EJECT_MEDIA = 0x2D4808
IOCTL_STORAGE_GET_DEVICE_NUMBER = 0x2D1080
def CTL_CODE(DeviceType, Function, Method, Access):
return (DeviceType << 16) | (Access << 14) | (Function << 2) | Method
def USB_CTL(id):
# CTL_CODE(FILE_DEVICE_USB, (id), METHOD_BUFFERED, FILE_ANY_ACCESS)
return CTL_CODE(0x22, id, 0, 0)
IOCTL_USB_GET_ROOT_HUB_NAME = USB_CTL(258)
IOCTL_USB_GET_NODE_INFORMATION = USB_CTL(258)
IOCTL_USB_GET_NODE_CONNECTION_INFORMATION = USB_CTL(259)
IOCTL_USB_GET_NODE_CONNECTION_INFORMATION_EX = USB_CTL(274)
IOCTL_USB_GET_NODE_CONNECTION_DRIVERKEY_NAME = USB_CTL(264)
IOCTL_USB_GET_NODE_CONNECTION_NAME = USB_CTL(261)
IOCTL_USB_GET_DESCRIPTOR_FROM_NODE_CONNECTION = USB_CTL(260)
USB_CONFIGURATION_DESCRIPTOR_TYPE = 2
USB_STRING_DESCRIPTOR_TYPE = 3
USB_INTERFACE_DESCRIPTOR_TYPE = 4
USB_REQUEST_GET_DESCRIPTOR = 0x06
MAXIMUM_USB_STRING_LENGTH = 255
StorageDeviceNumber = namedtuple('StorageDeviceNumber', 'type number partition_number')
@ -124,6 +143,12 @@ class USB_DEVICE_DESCRIPTOR(Structure):
('bNumConfigurations', UCHAR),
)
def __repr__(self):
return 'USBDevice(class=0x%x sub_class=0x%x protocol=0x%x vendor_id=0x%x product_id=0x%x bcd=0x%x manufacturer=%d product=%d serial_number=%d)' % (
self.bDeviceClass, self.bDeviceSubClass, self.bDeviceProtocol,
self.idVendor, self.idProduct, self.bcdDevice, self.iManufacturer,
self.iProduct, self.iSerialNumber)
class USB_ENDPOINT_DESCRIPTOR(Structure):
_fields_ = (
('bLength', UCHAR),
@ -140,21 +165,45 @@ class USB_PIPE_INFO(Structure):
('ScheduleOffset', ULONG),
)
class USB_NODE_CONNECTION_INFORMATION(Structure):
class USB_NODE_CONNECTION_INFORMATION_EX(Structure):
_fields_ = (
('ConnectionIndex', ULONG),
('DeviceDescriptor', USB_DEVICE_DESCRIPTOR),
('CurrentConfigurationValue', UCHAR),
('LowSpeed', BOOL),
('Speed', UCHAR),
('DeviceIsHub', BOOL),
('DeviceAddress', USHORT),
('NumberOfOpenPipes', ULONG),
('ConnectionStatus', c_uint),
('PipeList', USB_PIPE_INFO*0),
('PipeList', USB_PIPE_INFO*ANYSIZE_ARRAY),
)
class USB_STRING_DESCRIPTOR(Structure):
_fields_ = (
('bLength', UCHAR),
('bType', UCHAR),
('String', UCHAR * ANYSIZE_ARRAY),
)
PUSB_DEVICE_DESCRIPTOR = POINTER(USB_DEVICE_DESCRIPTOR)
class USB_DESCRIPTOR_REQUEST(Structure):
class SetupPacket(Structure):
_fields_ = (
('bmRequest', UCHAR),
('bRequest', UCHAR),
('wValue', UCHAR*2),
('wIndex', USHORT),
('wLength', USHORT),
)
_fields_ = (
('ConnectionIndex', ULONG),
('SetupPacket', SetupPacket),
('Data', USB_STRING_DESCRIPTOR),
)
PUSB_DESCRIPTOR_REQUEST = POINTER(USB_DESCRIPTOR_REQUEST)
PSP_DEVICE_INTERFACE_DETAIL_DATA = POINTER(SP_DEVICE_INTERFACE_DETAIL_DATA)
PSP_DEVICE_INTERFACE_DATA = POINTER(SP_DEVICE_INTERFACE_DATA)
INVALID_HANDLE_VALUE = c_void_p(-1).value
@ -163,16 +212,20 @@ GENERIC_WRITE = 0x40000000L
FILE_SHARE_READ = 0x1
FILE_SHARE_WRITE = 0x2
OPEN_EXISTING = 0x3
GUID_DEVINTERFACE_VOLUME = GUID(0x53F5630D, 0xB6BF, 0x11D0, 0x94, 0xF2, 0x00, 0xA0, 0xC9, 0x1E, 0xFB, 0x8B)
GUID_DEVINTERFACE_DISK = GUID(0x53F56307, 0xB6BF, 0x11D0, 0x94, 0xF2, 0x00, 0xA0, 0xC9, 0x1E, 0xFB, 0x8B)
GUID_DEVINTERFACE_CDROM = GUID(0x53f56308, 0xb6bf, 0x11d0, 0x94, 0xf2, 0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
GUID_DEVINTERFACE_FLOPPY = GUID(0x53f56311, 0xb6bf, 0x11d0, 0x94, 0xf2, 0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
GUID_DEVINTERFACE_VOLUME = GUID(0x53F5630D, 0xB6BF, 0x11D0, 0x94, 0xF2, 0x00, 0xA0, 0xC9, 0x1E, 0xFB, 0x8B)
GUID_DEVINTERFACE_DISK = GUID(0x53F56307, 0xB6BF, 0x11D0, 0x94, 0xF2, 0x00, 0xA0, 0xC9, 0x1E, 0xFB, 0x8B)
GUID_DEVINTERFACE_CDROM = GUID(0x53f56308, 0xb6bf, 0x11d0, 0x94, 0xf2, 0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
GUID_DEVINTERFACE_FLOPPY = GUID(0x53f56311, 0xb6bf, 0x11d0, 0x94, 0xf2, 0x00, 0xa0, 0xc9, 0x1e, 0xfb, 0x8b)
GUID_DEVINTERFACE_USB_DEVICE = GUID(0xA5DCBF10, 0x6530, 0x11D2, 0x90, 0x1F, 0x00, 0xC0, 0x4F, 0xB9, 0x51, 0xED)
GUID_DEVINTERFACE_USB_HUB = GUID(0xf18a0e88, 0xc30c, 0x11d0, 0x88, 0x15, 0x00, 0xa0, 0xc9, 0x06, 0xbe, 0xd8)
DRIVE_UNKNOWN, DRIVE_NO_ROOT_DIR, DRIVE_REMOVABLE, DRIVE_FIXED, DRIVE_REMOTE, DRIVE_CDROM, DRIVE_RAMDISK = 0, 1, 2, 3, 4, 5, 6
DIGCF_PRESENT = 0x00000002
DIGCF_ALLCLASSES = 0x00000004
DIGCF_DEVICEINTERFACE = 0x00000010
ERROR_INSUFFICIENT_BUFFER = 0x7a
ERROR_MORE_DATA = 234
ERROR_INVALID_DATA = 0xd
ERROR_GEN_FAILURE = 31
HDEVINFO = HANDLE
SPDRP_DEVICEDESC = DWORD(0x00000000)
SPDRP_HARDWAREID = DWORD(0x00000001)
@ -369,7 +422,7 @@ class DeviceSet(object):
SetupDiDestroyDeviceInfoList(self.dev_list)
del self.dev_list
def interfaces(self, ignore_errors=False):
def interfaces(self, ignore_errors=False, yield_devlist=False):
interface_data = SP_DEVICE_INTERFACE_DATA()
interface_data.cbSize = sizeof(SP_DEVICE_INTERFACE_DATA)
buf = None
@ -384,7 +437,10 @@ class DeviceSet(object):
if ignore_errors:
continue
raise
yield devinfo, devpath
if yield_devlist:
yield self.dev_list, devinfo, devpath
else:
yield devinfo, devpath
def devices(self):
devinfo = SP_DEVINFO_DATA()
@ -434,6 +490,19 @@ def iterancestors(devinst):
raise
yield parent.value
def device_io_control(handle, which, inbuf, outbuf, initbuf):
bytes_returned = DWORD(0)
while True:
initbuf(inbuf)
try:
DeviceIoControl(handle, which, inbuf, len(inbuf), outbuf, len(outbuf), byref(bytes_returned), None)
except WindowsError as err:
if err.winerror not in (ERROR_INSUFFICIENT_BUFFER, ERROR_MORE_DATA):
raise
outbuf = create_string_buffer(2*len(outbuf))
else:
return outbuf, bytes_returned
def get_storage_number(devpath):
sdn = STORAGE_DEVICE_NUMBER()
handle = CreateFile(devpath, 0, FILE_SHARE_READ | FILE_SHARE_WRITE, None, OPEN_EXISTING, 0, None)
@ -739,6 +808,101 @@ def eject_drive(drive_letter): # {{{
# }}}
def get_usb_info(usbdev, debug=False): # {{{
ans = {}
try:
parent = next(iterancestors(usbdev.devinst))
except StopIteration:
return ans
for devinfo, parent_path in DeviceSet(guid=GUID_DEVINTERFACE_USB_HUB).interfaces():
if devinfo.DevInst == parent:
break
else:
return ans
for devlist, devinfo, devpath in DeviceSet(guid=GUID_DEVINTERFACE_USB_DEVICE).interfaces(yield_devlist=True):
if devinfo.DevInst == usbdev.devinst:
device_port = get_device_registry_property(devlist, byref(devinfo), SPDRP_ADDRESS)[1]
break
else:
return ans
handle = CreateFile(parent_path, GENERIC_READ | GENERIC_WRITE, FILE_SHARE_READ | FILE_SHARE_WRITE, None, OPEN_EXISTING, 0, None)
try:
buf, dd = get_device_descriptor(handle, device_port)
if dd.idVendor == usbdev.vendor_id and dd.idProduct == usbdev.product_id and dd.bcdDevice == usbdev.bcd:
# Dont need to read language since we only care about english names
# buf, langs = get_device_languages(handle, device_port)
# print(111, langs)
for index, name in ((dd.iManufacturer, 'manufacturer'), (dd.iProduct, 'product'), (dd.iSerialNumber, 'serial_number')):
if index:
try:
buf, ans[name] = get_device_string(handle, device_port, index, buf=buf)
except WindowsError as err:
if debug:
if err.winerror == ERROR_GEN_FAILURE:
prints('Failed to read %s from device, try rebooting the device' % name)
else:
prints('Failed to read %s from device, with error: %s' % (name, as_unicode(err)))
finally:
CloseHandle(handle)
return ans
def alloc_descriptor_buf(buf):
if buf is None:
buf = create_string_buffer(sizeof(USB_DESCRIPTOR_REQUEST) + 700)
else:
memset(buf, 0, len(buf))
return buf
def get_device_descriptor(hub_handle, device_port, buf=None):
buf = alloc_descriptor_buf(buf)
def initbuf(b):
cast(b, POINTER(USB_NODE_CONNECTION_INFORMATION_EX)).contents.ConnectionIndex = device_port
buf, bytes_returned = device_io_control(hub_handle, IOCTL_USB_GET_NODE_CONNECTION_INFORMATION_EX, buf, buf, initbuf)
return buf, USB_DEVICE_DESCRIPTOR.from_buffer_copy(cast(buf, POINTER(USB_NODE_CONNECTION_INFORMATION_EX)).contents.DeviceDescriptor)
def get_device_string(hub_handle, device_port, index, buf=None, lang=0x409):
buf = alloc_descriptor_buf(buf)
def initbuf(b):
p = cast(b, PUSB_DESCRIPTOR_REQUEST).contents
p.ConnectionIndex = device_port
sp = p.SetupPacket
sp.bmRequest, sp.bRequest = 0x80, USB_REQUEST_GET_DESCRIPTOR
sp.wValue[0], sp.wValue[1] = index, USB_STRING_DESCRIPTOR_TYPE
sp.wIndex = lang
sp.wLength = MAXIMUM_USB_STRING_LENGTH + 2
buf, bytes_returned = device_io_control(hub_handle, IOCTL_USB_GET_DESCRIPTOR_FROM_NODE_CONNECTION, buf, buf, initbuf)
data = cast(buf, PUSB_DESCRIPTOR_REQUEST).contents.Data
sz, dtype = data.bLength, data.bType
if dtype != 0x03:
raise WindowsError('Invalid datatype for string descriptor: 0x%x' % dtype)
return buf, wstring_at(addressof(data.String), sz // 2).rstrip('\0')
def get_device_languages(hub_handle, device_port, buf=None):
' Get the languages supported by the device for strings '
buf = alloc_descriptor_buf(buf)
def initbuf(b):
p = cast(b, PUSB_DESCRIPTOR_REQUEST).contents
p.ConnectionIndex = device_port
sp = p.SetupPacket
sp.bmRequest, sp.bRequest = 0x80, USB_REQUEST_GET_DESCRIPTOR
sp.wValue[1] = USB_STRING_DESCRIPTOR_TYPE
sp.wLength = MAXIMUM_USB_STRING_LENGTH + 2
buf, bytes_returned = device_io_control(hub_handle, IOCTL_USB_GET_DESCRIPTOR_FROM_NODE_CONNECTION, buf, buf, initbuf)
data = cast(buf, PUSB_DESCRIPTOR_REQUEST).contents.Data
sz, dtype = data.bLength, data.bType
if dtype != 0x03:
raise WindowsError('Invalid datatype for string descriptor: 0x%x' % dtype)
data = cast(data.String, POINTER(USHORT*(sz//2)))
return buf, filter(None, data.contents)
# }}}
def develop(do_eject=False): # {{{
from calibre.customize.ui import device_plugins
usb_devices = scan_usb_devices()
@ -759,10 +923,15 @@ def develop(do_eject=False): # {{{
continue
connected, usbdev = dev.is_usb_connected_generic(usb_devices, debug=True)
if connected:
print('\n')
print('Potentially connected device: %s at %s' % (dev.get_gui_name(), usbdev))
print()
print('Drives for this device:')
pprint(get_drive_letters_for_device(usbdev, debug=True))
print()
print('Is device connected:', is_usb_device_connected(*usbdev[:2]))
print()
print('Device USB data:', get_usb_info(usbdev, debug=True))
if do_eject:
for drive in rd:
eject_drive(drive)