From d8e8e90c33253bf5b82b91012791d3049512d767 Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 18 Jan 2016 11:47:03 +0530 Subject: [PATCH] Make develop work with any device known to calibre --- src/calibre/devices/interface.py | 52 ++++++++++++------------- src/calibre/devices/winusb.py | 67 +++++++++++++------------------- 2 files changed, 52 insertions(+), 67 deletions(-) diff --git a/src/calibre/devices/interface.py b/src/calibre/devices/interface.py index c81d7fade6..94df856c76 100644 --- a/src/calibre/devices/interface.py +++ b/src/calibre/devices/interface.py @@ -190,7 +190,7 @@ class DevicePlugin(Plugin): return True return False - def is_usb_connected(self, devices_on_system, debug=False, + def is_usb_connected_generic(self, devices_on_system, debug=False, only_presence=False): ''' Return True, device_info if a device handled by this plugin is currently connected. @@ -198,12 +198,9 @@ class DevicePlugin(Plugin): :param devices_on_system: List of devices currently connected ''' - if iswindows: - return self.is_usb_connected_windows(devices_on_system, - debug=debug, only_presence=only_presence) - vendors_on_system = set([x[0] for x in devices_on_system]) - vendors = self.VENDOR_ID if hasattr(self.VENDOR_ID, '__len__') else [self.VENDOR_ID] + vendors_on_system = {x[0] for x in devices_on_system} + vendors = set(self.VENDOR_ID) if hasattr(self.VENDOR_ID, '__len__') else {self.VENDOR_ID} if hasattr(self.VENDOR_ID, 'keys'): products = [] for ven in self.VENDOR_ID: @@ -211,29 +208,30 @@ class DevicePlugin(Plugin): else: products = self.PRODUCT_ID if hasattr(self.PRODUCT_ID, '__len__') else [self.PRODUCT_ID] - for vid in vendors: - if vid in vendors_on_system: - for dev in devices_on_system: - cvid, pid, bcd = dev[:3] - if cvid == vid: - if pid in products: - if hasattr(self.VENDOR_ID, 'keys'): - try: - cbcd = self.VENDOR_ID[vid][pid] - except KeyError: - # Vendor vid does not have product pid, pid - # exists for some other vendor in this - # device - continue - else: - cbcd = self.BCD - if self.test_bcd(bcd, cbcd): - if debug: - self.print_usb_device_info(dev) - if self.can_handle(dev, debug=debug): - return True, dev + for vid in vendors_on_system.intersection(vendors): + for dev in devices_on_system: + cvid, pid, bcd = dev[:3] + if cvid == vid: + if pid in products: + if hasattr(self.VENDOR_ID, 'keys'): + try: + cbcd = self.VENDOR_ID[vid][pid] + except KeyError: + # Vendor vid does not have product pid, pid + # exists for some other vendor in this + # device + continue + else: + cbcd = self.BCD + if self.test_bcd(bcd, cbcd): + if debug: + self.print_usb_device_info(dev) + if self.can_handle(dev, debug=debug): + return True, dev return False, None + is_usb_connected = is_usb_connected_windows if iswindows else is_usb_connected_generic + def detect_managed_devices(self, devices_on_system, force_refresh=False): ''' Called only if MANAGES_DEVICE_PRESENCE is True. diff --git a/src/calibre/devices/winusb.py b/src/calibre/devices/winusb.py index b343d61ec7..cfd33ee274 100644 --- a/src/calibre/devices/winusb.py +++ b/src/calibre/devices/winusb.py @@ -567,12 +567,11 @@ def get_removable_drives(debug=False): # {{{ return ans # }}} -def get_drive_letters_for_device(vendor_id, product_id, bcd=None, storage_number_map=None, devinst=None, debug=False): # {{{ +def get_drive_letters_for_device(usbdev, storage_number_map=None, debug=False): # {{{ ''' - Get the drive letters for a connected device with the specified USB ids. bcd - can be either None, in which case it is not tested, or it must be a list or - set like object containing bcds. The drive letters are sorted by storage number, - which (I think) corresponds to the order they are exported by the firmware. + Get the drive letters for a connected device (usbdev must be a USBDevice + instance). The drive letters are sorted by storage number, which (I think) + corresponds to the order they are exported by the firmware. ''' ans = {'pnp_id_map': {}, 'drive_letters':[]} sort_map = {} @@ -584,19 +583,7 @@ def get_drive_letters_for_device(vendor_id, product_id, bcd=None, storage_number if not sn_map: return ans - if devinst is None: - # First search for a device matching the specified USB ids - for usbdev in iterusbdevices(): - if vendor_id == usbdev.vendor_id and product_id == usbdev.product_id and ( - bcd is None or (bcd and usbdev.bcd in bcd)): - devinst = usbdev.devinst - break - else: - if debug: - prints('Could not find device matching vid=0x%x pid=0x%x' % (vendor_id, product_id)) - return ans - - descendants = frozenset(iterdescendants(devinst)) + descendants = frozenset(iterdescendants(usbdev.devinst)) for devinfo, devpath in DeviceSet(GUID_DEVINTERFACE_DISK).interfaces(): if devinfo.DevInst in descendants: if debug: @@ -650,18 +637,9 @@ def get_usb_devices(): # {{{ # }}} def is_usb_device_connected(vendor_id, product_id): # {{{ - buf = None - for dev_list, devinfo in DeviceSet(guid=None, enumerator='USB', flags=DIGCF_PRESENT | DIGCF_ALLCLASSES).devices(): - buf, devid = get_device_registry_property(dev_list, byref(devinfo), buf=buf) - if devid: - m = devid_pat().search(devid[0]) - if m is not None: - try: - vid, pid = map(lambda x: int(x, 16), m.group(1, 2)) - except Exception: - continue - if vid == vendor_id and pid == product_id: - return True + for usbdev in iterusbdevices(): + if usbdev.vendor_id == vendor_id and usbdev.product_id == product_id: + return True return False # }}} @@ -699,25 +677,34 @@ def eject_drive(drive_letter): # {{{ # }}} -def develop(vendor_id=0x1949, product_id=0x4, bcd=None, do_eject=False): # {{{ - pprint(get_usb_devices()) +def develop(do_eject=False): # {{{ + from calibre.customize.ui import device_plugins + usb_devices = scan_usb_devices() + pprint(usb_devices) print() - print('Is device connected:', is_usb_device_connected(vendor_id, product_id)) print('\nAll removable drives:') pprint(get_all_removable_drives(allow_fixed=False)) print('\nRemovable drives:') rd = get_removable_drives(debug=True) pprint(rd) - print('\nDrive letters for vid=0x%x, pid=0x%x:' % (vendor_id, product_id)) - pprint(get_drive_letters_for_device(vendor_id, product_id, bcd=bcd, debug=True)) + print() + devplugins = list(sorted(device_plugins(), cmp=lambda + x,y:cmp(x.__class__.__name__, y.__class__.__name__))) + for dev in devplugins: + dev.startup() + for dev in devplugins: + if dev.MANAGES_DEVICE_PRESENCE: + continue + connected, usbdev = dev.is_usb_connected_generic(usb_devices, debug=True) + if connected: + print('Potentially connected device: %s at %s' % (dev.gui_name, usbdev)) + print('Drives for this device:') + pprint(get_drive_letters_for_device(usbdev, debug=True)) + print('Is device connected:', is_usb_device_connected(*usbdev[:2])) if do_eject: for drive in rd: eject_drive(drive) if __name__ == '__main__': - if len(sys.argv) > 1: - vendor_id, product_id, bcd = map(lambda x:int(x, 16), sys.argv[-3:]) - else: - vendor_id, product_id, bcd = 0x1949, 0x4, 0x100 - develop(vendor_id, product_id, (bcd,)) + develop() # }}}