mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-09 03:04:10 -04:00
Make develop work with any device known to calibre
This commit is contained in:
parent
55ddf4e7bf
commit
d8e8e90c33
@ -190,7 +190,7 @@ class DevicePlugin(Plugin):
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_usb_connected(self, devices_on_system, debug=False,
|
||||
def is_usb_connected_generic(self, devices_on_system, debug=False,
|
||||
only_presence=False):
|
||||
'''
|
||||
Return True, device_info if a device handled by this plugin is currently connected.
|
||||
@ -198,12 +198,9 @@ class DevicePlugin(Plugin):
|
||||
:param devices_on_system: List of devices currently connected
|
||||
|
||||
'''
|
||||
if iswindows:
|
||||
return self.is_usb_connected_windows(devices_on_system,
|
||||
debug=debug, only_presence=only_presence)
|
||||
|
||||
vendors_on_system = set([x[0] for x in devices_on_system])
|
||||
vendors = self.VENDOR_ID if hasattr(self.VENDOR_ID, '__len__') else [self.VENDOR_ID]
|
||||
vendors_on_system = {x[0] for x in devices_on_system}
|
||||
vendors = set(self.VENDOR_ID) if hasattr(self.VENDOR_ID, '__len__') else {self.VENDOR_ID}
|
||||
if hasattr(self.VENDOR_ID, 'keys'):
|
||||
products = []
|
||||
for ven in self.VENDOR_ID:
|
||||
@ -211,29 +208,30 @@ class DevicePlugin(Plugin):
|
||||
else:
|
||||
products = self.PRODUCT_ID if hasattr(self.PRODUCT_ID, '__len__') else [self.PRODUCT_ID]
|
||||
|
||||
for vid in vendors:
|
||||
if vid in vendors_on_system:
|
||||
for dev in devices_on_system:
|
||||
cvid, pid, bcd = dev[:3]
|
||||
if cvid == vid:
|
||||
if pid in products:
|
||||
if hasattr(self.VENDOR_ID, 'keys'):
|
||||
try:
|
||||
cbcd = self.VENDOR_ID[vid][pid]
|
||||
except KeyError:
|
||||
# Vendor vid does not have product pid, pid
|
||||
# exists for some other vendor in this
|
||||
# device
|
||||
continue
|
||||
else:
|
||||
cbcd = self.BCD
|
||||
if self.test_bcd(bcd, cbcd):
|
||||
if debug:
|
||||
self.print_usb_device_info(dev)
|
||||
if self.can_handle(dev, debug=debug):
|
||||
return True, dev
|
||||
for vid in vendors_on_system.intersection(vendors):
|
||||
for dev in devices_on_system:
|
||||
cvid, pid, bcd = dev[:3]
|
||||
if cvid == vid:
|
||||
if pid in products:
|
||||
if hasattr(self.VENDOR_ID, 'keys'):
|
||||
try:
|
||||
cbcd = self.VENDOR_ID[vid][pid]
|
||||
except KeyError:
|
||||
# Vendor vid does not have product pid, pid
|
||||
# exists for some other vendor in this
|
||||
# device
|
||||
continue
|
||||
else:
|
||||
cbcd = self.BCD
|
||||
if self.test_bcd(bcd, cbcd):
|
||||
if debug:
|
||||
self.print_usb_device_info(dev)
|
||||
if self.can_handle(dev, debug=debug):
|
||||
return True, dev
|
||||
return False, None
|
||||
|
||||
is_usb_connected = is_usb_connected_windows if iswindows else is_usb_connected_generic
|
||||
|
||||
def detect_managed_devices(self, devices_on_system, force_refresh=False):
|
||||
'''
|
||||
Called only if MANAGES_DEVICE_PRESENCE is True.
|
||||
|
@ -567,12 +567,11 @@ def get_removable_drives(debug=False): # {{{
|
||||
return ans
|
||||
# }}}
|
||||
|
||||
def get_drive_letters_for_device(vendor_id, product_id, bcd=None, storage_number_map=None, devinst=None, debug=False): # {{{
|
||||
def get_drive_letters_for_device(usbdev, storage_number_map=None, debug=False): # {{{
|
||||
'''
|
||||
Get the drive letters for a connected device with the specified USB ids. bcd
|
||||
can be either None, in which case it is not tested, or it must be a list or
|
||||
set like object containing bcds. The drive letters are sorted by storage number,
|
||||
which (I think) corresponds to the order they are exported by the firmware.
|
||||
Get the drive letters for a connected device (usbdev must be a USBDevice
|
||||
instance). The drive letters are sorted by storage number, which (I think)
|
||||
corresponds to the order they are exported by the firmware.
|
||||
'''
|
||||
ans = {'pnp_id_map': {}, 'drive_letters':[]}
|
||||
sort_map = {}
|
||||
@ -584,19 +583,7 @@ def get_drive_letters_for_device(vendor_id, product_id, bcd=None, storage_number
|
||||
if not sn_map:
|
||||
return ans
|
||||
|
||||
if devinst is None:
|
||||
# First search for a device matching the specified USB ids
|
||||
for usbdev in iterusbdevices():
|
||||
if vendor_id == usbdev.vendor_id and product_id == usbdev.product_id and (
|
||||
bcd is None or (bcd and usbdev.bcd in bcd)):
|
||||
devinst = usbdev.devinst
|
||||
break
|
||||
else:
|
||||
if debug:
|
||||
prints('Could not find device matching vid=0x%x pid=0x%x' % (vendor_id, product_id))
|
||||
return ans
|
||||
|
||||
descendants = frozenset(iterdescendants(devinst))
|
||||
descendants = frozenset(iterdescendants(usbdev.devinst))
|
||||
for devinfo, devpath in DeviceSet(GUID_DEVINTERFACE_DISK).interfaces():
|
||||
if devinfo.DevInst in descendants:
|
||||
if debug:
|
||||
@ -650,18 +637,9 @@ def get_usb_devices(): # {{{
|
||||
# }}}
|
||||
|
||||
def is_usb_device_connected(vendor_id, product_id): # {{{
|
||||
buf = None
|
||||
for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices():
|
||||
buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf)
|
||||
if devid:
|
||||
m = devid_pat().search(devid[0])
|
||||
if m is not None:
|
||||
try:
|
||||
vid, pid = map(lambda x: int(x, 16), m.group(1, 2))
|
||||
except Exception:
|
||||
continue
|
||||
if vid == vendor_id and pid == product_id:
|
||||
return True
|
||||
for usbdev in iterusbdevices():
|
||||
if usbdev.vendor_id == vendor_id and usbdev.product_id == product_id:
|
||||
return True
|
||||
return False
|
||||
# }}}
|
||||
|
||||
@ -699,25 +677,34 @@ def eject_drive(drive_letter): # {{{
|
||||
|
||||
# }}}
|
||||
|
||||
def develop(vendor_id=0x1949, product_id=0x4, bcd=None, do_eject=False): # {{{
|
||||
pprint(get_usb_devices())
|
||||
def develop(do_eject=False): # {{{
|
||||
from calibre.customize.ui import device_plugins
|
||||
usb_devices = scan_usb_devices()
|
||||
pprint(usb_devices)
|
||||
print()
|
||||
print('Is device connected:', is_usb_device_connected(vendor_id, product_id))
|
||||
print('\nAll removable drives:')
|
||||
pprint(get_all_removable_drives(allow_fixed=False))
|
||||
print('\nRemovable drives:')
|
||||
rd = get_removable_drives(debug=True)
|
||||
pprint(rd)
|
||||
print('\nDrive letters for vid=0x%x, pid=0x%x:' % (vendor_id, product_id))
|
||||
pprint(get_drive_letters_for_device(vendor_id, product_id, bcd=bcd, debug=True))
|
||||
print()
|
||||
devplugins = list(sorted(device_plugins(), cmp=lambda
|
||||
x,y:cmp(x.__class__.__name__, y.__class__.__name__)))
|
||||
for dev in devplugins:
|
||||
dev.startup()
|
||||
for dev in devplugins:
|
||||
if dev.MANAGES_DEVICE_PRESENCE:
|
||||
continue
|
||||
connected, usbdev = dev.is_usb_connected_generic(usb_devices, debug=True)
|
||||
if connected:
|
||||
print('Potentially connected device: %s at %s' % (dev.gui_name, usbdev))
|
||||
print('Drives for this device:')
|
||||
pprint(get_drive_letters_for_device(usbdev, debug=True))
|
||||
print('Is device connected:', is_usb_device_connected(*usbdev[:2]))
|
||||
if do_eject:
|
||||
for drive in rd:
|
||||
eject_drive(drive)
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) > 1:
|
||||
vendor_id, product_id, bcd = map(lambda x:int(x, 16), sys.argv[-3:])
|
||||
else:
|
||||
vendor_id, product_id, bcd = 0x1949, 0x4, 0x100
|
||||
develop(vendor_id, product_id, (bcd,))
|
||||
develop()
|
||||
# }}}
|
||||
|
Loading…
x
Reference in New Issue
Block a user