Start work on migrating to new device detection algorithm on windows

This commit is contained in:
Kovid Goyal 2016-01-18 13:44:46 +05:30
parent deb72df077
commit 4cc81568e6
5 changed files with 67 additions and 282 deletions

View File

@ -5,7 +5,7 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
Device drivers.
'''
import sys, time, pprint, operator
import sys, time, pprint
from functools import partial
from StringIO import StringIO
@ -66,7 +66,7 @@ def debug(ioreg_to_tmp=False, buf=None, plugins=None,
import textwrap
from calibre.customize.ui import device_plugins, disabled_device_plugins
from calibre.debug import print_basic_debug_info
from calibre.devices.scanner import DeviceScanner, win_pnp_drives
from calibre.devices.scanner import DeviceScanner
from calibre.constants import iswindows, isosx
from calibre import prints
oldo, olde = sys.stdout, sys.stderr
@ -101,12 +101,6 @@ def debug(ioreg_to_tmp=False, buf=None, plugins=None,
d[i] = hex(d[i])
out('USB devices on system:')
out(pprint.pformat(devices))
if iswindows:
drives = win_pnp_drives(debug=True)
out('Drives detected:')
for drive in sorted(drives.keys(),
key=operator.attrgetter('order')):
prints(u'\t(%d)'%drive.order, drive, '~', drives[drive])
ioreg = None
if isosx:
@ -125,7 +119,8 @@ def debug(ioreg_to_tmp=False, buf=None, plugins=None,
out('\nNo disabled plugins')
found_dev = False
for dev in devplugins:
if not dev.MANAGES_DEVICE_PRESENCE: continue
if not dev.MANAGES_DEVICE_PRESENCE:
continue
out('Looking for devices of type:', dev.__class__.__name__)
if dev.debug_managed_device_detection(s.devices, buf):
found_dev = True
@ -135,7 +130,8 @@ def debug(ioreg_to_tmp=False, buf=None, plugins=None,
if not found_dev:
out('Looking for devices...')
for dev in devplugins:
if dev.MANAGES_DEVICE_PRESENCE: continue
if dev.MANAGES_DEVICE_PRESENCE:
continue
connected, det = s.is_device_connected(dev, debug=True)
if connected:
out('\t\tDetected possible device', dev.__class__.__name__)
@ -152,6 +148,7 @@ def debug(ioreg_to_tmp=False, buf=None, plugins=None,
out(' ')
for dev, det in connected_devices:
out('Trying to open', dev.name, '...', end=' ')
dev.do_device_debug = True
try:
dev.reset(detected_device=det)
dev.open(det, None)
@ -161,6 +158,7 @@ def debug(ioreg_to_tmp=False, buf=None, plugins=None,
errors[dev] = traceback.format_exc()
out('failed')
continue
dev.do_device_debug = False
success = True
if hasattr(dev, '_main_prefix'):
out('Main memory:', repr(dev._main_prefix))

View File

@ -3,8 +3,9 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
import os
from collections import namedtuple
from calibre.customize import Plugin
from calibre import prints
from calibre.constants import iswindows
from calibre.customize import Plugin
class DevicePlugin(Plugin):
"""
@ -130,58 +131,6 @@ class DevicePlugin(Plugin):
return cls.name
# Device detection {{{
def test_bcd_windows(self, device_id, bcd):
if bcd is None or len(bcd) == 0:
return True
for c in bcd:
rev = 'rev_%4.4x'%c
# Bug in winutil.get_usb_devices sometimes converts a to :
if rev in device_id or rev.replace('a', ':') in device_id:
return True
return False
def print_usb_device_info(self, info):
try:
print '\t', repr(info)
except:
import traceback
traceback.print_exc()
def is_usb_connected_windows(self, devices_on_system, debug=False,
only_presence=False):
def id_iterator():
if hasattr(self.VENDOR_ID, 'keys'):
for vid in self.VENDOR_ID:
vend = self.VENDOR_ID[vid]
for pid in vend:
bcd = vend[pid]
yield vid, pid, bcd
else:
vendors = self.VENDOR_ID if hasattr(self.VENDOR_ID, '__len__') else [self.VENDOR_ID]
products = self.PRODUCT_ID if hasattr(self.PRODUCT_ID, '__len__') else [self.PRODUCT_ID]
for vid in vendors:
for pid in products:
yield vid, pid, self.BCD
for vendor_id, product_id, bcd in id_iterator():
vid, pid = 'vid_%4.4x'%vendor_id, 'pid_%4.4x'%product_id
vidd, pidd = 'vid_%i'%vendor_id, 'pid_%i'%product_id
for device_id in devices_on_system:
if (vid in device_id or vidd in device_id) and \
(pid in device_id or pidd in device_id) and \
self.test_bcd_windows(device_id, bcd):
if debug:
self.print_usb_device_info(device_id)
if only_presence or self.can_handle_windows(device_id, debug=debug):
try:
bcd = int(device_id.rpartition(
'rev_')[-1].replace(':', 'a'), 16)
except:
bcd = None
return True, (vendor_id, product_id, bcd, None, None, None)
return False, None
def test_bcd(self, bcdDevice, bcd):
if bcd is None or len(bcd) == 0:
return True
@ -190,15 +139,13 @@ class DevicePlugin(Plugin):
return True
return False
def is_usb_connected_generic(self, devices_on_system, debug=False,
only_presence=False):
def is_usb_connected(self, devices_on_system, debug=False, only_presence=False):
'''
Return True, device_info if a device handled by this plugin is currently connected.
:param devices_on_system: List of devices currently connected
'''
vendors_on_system = {x[0] for x in devices_on_system}
vendors = set(self.VENDOR_ID) if hasattr(self.VENDOR_ID, '__len__') else {self.VENDOR_ID}
if hasattr(self.VENDOR_ID, 'keys'):
@ -208,6 +155,7 @@ class DevicePlugin(Plugin):
else:
products = self.PRODUCT_ID if hasattr(self.PRODUCT_ID, '__len__') else [self.PRODUCT_ID]
ch = self.can_handle_windows if iswindows else self.can_handle
for vid in vendors_on_system.intersection(vendors):
for dev in devices_on_system:
cvid, pid, bcd = dev[:3]
@ -225,13 +173,11 @@ class DevicePlugin(Plugin):
cbcd = self.BCD
if self.test_bcd(bcd, cbcd):
if debug:
self.print_usb_device_info(dev)
if self.can_handle(dev, debug=debug):
prints(dev)
if ch(dev, debug=debug):
return True, dev
return False, None
is_usb_connected = is_usb_connected_windows if iswindows else is_usb_connected_generic
def detect_managed_devices(self, devices_on_system, force_refresh=False):
'''
Called only if MANAGES_DEVICE_PRESENCE is True.

View File

@ -5,9 +5,9 @@ Device scanner that fetches list of devices on system ina platform dependent
manner.
'''
import sys, os, re
from threading import Lock
import sys, os, time
from collections import namedtuple
from threading import Lock
from calibre import prints, as_unicode
from calibre.constants import (iswindows, isosx, plugins, islinux, isfreebsd,
@ -15,94 +15,27 @@ from calibre.constants import (iswindows, isosx, plugins, islinux, isfreebsd,
osx_scanner = linux_scanner = freebsd_scanner = netbsd_scanner = None
class Drive(str):
def __new__(self, val, order=0):
typ = str.__new__(self, val)
typ.order = order
return typ
def drivecmp(a, b):
ans = cmp(getattr(a, 'order', 0), getattr(b, 'order', 0))
if ans == 0:
ans = cmp(a, b)
return ans
class WinPNPScanner(object):
def __init__(self):
if iswindows:
self.lock = Lock()
drive_ok_lock = Lock()
@property
def scanner(self):
if iswindows:
from calibre.devices.winusb import get_removable_drives
return get_removable_drives
return None
def drive_is_ok(self, letter, debug=False):
def drive_is_ok(letter, max_tries=10, debug=False):
import win32api, win32file
with self.lock:
with drive_ok_lock:
oldError = win32api.SetErrorMode(1) # SEM_FAILCRITICALERRORS = 1
try:
ans = True
for i in xrange(max_tries):
try:
win32file.GetDiskFreeSpaceEx(letter+':\\')
return True
except Exception as e:
if debug:
if i >= max_tries - 1 and debug:
prints('Unable to get free space for drive:', letter)
prints(as_unicode(e))
ans = False
return ans
time.sleep(0.2)
return False
finally:
win32api.SetErrorMode(oldError)
def drive_order(self, pnp_id):
order = 0
match = re.search(r'REV_.*?&(\d+)#', pnp_id)
if match is None:
# Windows XP
# On the Nook Color this is the last digit
#
# USBSTOR\DISK&VEN_B&N&PROD_EBOOK_DISK&REV_0100\7&13EAFDB8&0&2004760017462009&1
# USBSTOR\DISK&VEN_B&N&PROD_EBOOK_DISK&REV_0100\7&13EAFDB8&0&2004760017462009&0
#
match = re.search(r'REV_.*&(\d+)', pnp_id)
if match is not None:
order = int(match.group(1))
return order
def __call__(self, debug=False):
# import traceback
# traceback.print_stack()
if self.scanner is None:
return {}
try:
drives = self.scanner(debug)
except:
drives = {}
if debug:
import traceback
traceback.print_exc()
remove = set()
for letter in drives:
if not self.drive_is_ok(letter, debug=debug):
remove.add(letter)
for letter in remove:
drives.pop(letter)
ans = {}
for key, val in drives.items():
val = [x.upper() for x in val]
val = [x for x in val if 'USBSTOR' in x]
if val:
ans[Drive(key+':\\', order=self.drive_order(val[-1]))] = val[-1]
return ans
win_pnp_drives = WinPNPScanner()
_USBDevice = namedtuple('USBDevice',
'vendor_id product_id bcd manufacturer product serial')
@ -313,7 +246,7 @@ class DeviceScanner(object):
def __init__(self, *args):
if iswindows:
from calibre.devices.winusb import get_usb_devices as win_scanner
from calibre.devices.winusb import scan_usb_devices as win_scanner
self.scanner = (win_scanner if iswindows else osx_scanner if isosx else
freebsd_scanner if isfreebsd else netbsd_scanner if isnetbsd
else linux_scanner if islinux else libusb_scanner)
@ -326,8 +259,7 @@ class DeviceScanner(object):
self.devices = self.scanner()
def is_device_connected(self, device, debug=False, only_presence=False):
''' If only_presence is True don't perform any expensive checks (used
only in windows)'''
''' If only_presence is True don't perform any expensive checks '''
return device.is_usb_connected(self.devices, debug=debug,
only_presence=only_presence)
@ -357,25 +289,6 @@ def test_for_mem_leak():
diff_hists(h1, gc_histogram())
prints()
if not iswindows:
return
for reps in (1, 10, 100, 1000):
for i in xrange(3):
gc.collect()
h1 = gc_histogram()
startmem = memory()
for i in xrange(reps):
win_pnp_drives()
for i in xrange(3):
gc.collect()
usedmem = memory(startmem)
prints('Memory used in %d repetitions of pnp_scan(): %.5f KB'%(reps,
1024*usedmem))
prints('Differences in python object counts:')
diff_hists(h1, gc_histogram())
prints()
def main(args=sys.argv):
test_for_mem_leak()
return 0

View File

@ -188,37 +188,6 @@ class Device(DeviceConfig, DevicePlugin):
def windows_filter_pnp_id(self, pnp_id):
return False
def windows_match_device(self, pnp_id, attr):
device_id = getattr(self, attr)
def test_vendor():
vendors = [self.VENDOR_NAME] if isinstance(self.VENDOR_NAME,
basestring) else self.VENDOR_NAME
for v in vendors:
if 'VEN_'+str(v).upper() in pnp_id:
return True
return False
if device_id is None or not test_vendor():
return False
if self.windows_filter_pnp_id(pnp_id):
return False
if hasattr(device_id, 'search'):
return device_id.search(pnp_id) is not None
if isinstance(device_id, basestring):
device_id = [device_id]
for x in device_id:
x = x.upper()
if 'PROD_' + x in pnp_id:
return True
return False
def windows_sort_drives(self, drives):
'''
Called to disambiguate main memory and storage card for devices that
@ -227,80 +196,43 @@ class Device(DeviceConfig, DevicePlugin):
'''
return drives
def can_handle_windows(self, device_id, debug=False):
from calibre.devices.scanner import win_pnp_drives
drives = win_pnp_drives()
for pnp_id in drives.values():
if self.windows_match_device(pnp_id, 'WINDOWS_MAIN_MEM'):
return True
if debug:
print '\tNo match found in:', pnp_id
return False
def open_windows(self):
from calibre.devices.scanner import win_pnp_drives, drivecmp
time.sleep(5)
drives = {}
seen = set()
prod_pat = re.compile(r'PROD_(.+?)&')
dup_prod_id = False
def check_for_dups(pnp_id):
from calibre.devices.scanner import drive_is_ok
from calibre.devices.winusb import get_drive_letters_for_device
usbdev = self.device_being_opened
debug = DEBUG or getattr(self, 'do_device_debug', False)
try:
match = prod_pat.search(pnp_id)
if match is not None:
prodid = match.group(1)
if prodid in seen:
return True
else:
seen.add(prodid)
except:
pass
return False
dlmap = get_drive_letters_for_device(usbdev, debug=debug)
except Exception:
dlmap = []
for drive, pnp_id in win_pnp_drives().items():
if self.windows_match_device(pnp_id, 'WINDOWS_CARD_A_MEM') and \
not drives.get('carda', False):
drives['carda'] = drive
dup_prod_id |= check_for_dups(pnp_id)
elif self.windows_match_device(pnp_id, 'WINDOWS_CARD_B_MEM') and \
not drives.get('cardb', False):
drives['cardb'] = drive
dup_prod_id |= check_for_dups(pnp_id)
elif self.windows_match_device(pnp_id, 'WINDOWS_MAIN_MEM') and \
not drives.get('main', False):
drives['main'] = drive
dup_prod_id |= check_for_dups(pnp_id)
if not dlmap['drive_letters']:
time.sleep(7)
dlmap = get_drive_letters_for_device(usbdev, debug=debug)
if 'main' in drives.keys() and 'carda' in drives.keys() and \
'cardb' in drives.keys():
break
filtered = set()
for dl in dlmap['drive_letters']:
pnp_id = dlmap['pnp_id_map'][dl]
if self.windows_filter_pnp_id(pnp_id):
filtered.add(dl)
if debug:
prints('Ignoring the drive %s because of a PNP filter on %s' % (dl, pnp_id))
dlmap['drive_letters'] = [dl for dl in dlmap['drive_letters'] if dl not in filtered]
filtered = set()
for dl in dlmap['drive_letters']:
if not drive_is_ok(dl, debug=debug):
filtered.add(dl)
if debug:
prints('Ignoring the drive %s because failed to get free space for it' % dl)
dlmap['drive_letters'] = [dl for dl in dlmap['drive_letters'] if dl not in filtered]
# This is typically needed when the device has the same
# WINDOWS_MAIN_MEM and WINDOWS_CARD_A_MEM in which case
# if the device is connected without a card, the above
# will incorrectly identify the main mem as carda
# See for example the driver for the Nook
if drives.get('carda', None) is not None and \
drives.get('main', None) is None:
drives['main'] = drives.pop('carda')
if not dlmap['drive_letters']:
raise DeviceError(_('Unable to detect any disk drives for the device: %s. Try rebooting') % self.get_gui_name())
if drives.get('main', None) is None:
raise DeviceError(
_('Unable to detect the %s disk drive. Try rebooting.') %
self.__class__.__name__)
# Sort drives by their PNP drive numbers if the CARD and MAIN
# MEM strings are identical
if dup_prod_id or \
self.WINDOWS_MAIN_MEM in (self.WINDOWS_CARD_A_MEM,
self.WINDOWS_CARD_B_MEM) or \
self.WINDOWS_CARD_A_MEM == self.WINDOWS_CARD_B_MEM:
letters = sorted(drives.values(), cmp=drivecmp)
drives = {}
for which, letter in zip(['main', 'carda', 'cardb'], letters):
drives[which] = letter
for drive_letter, which in zip(dlmap['drive_letters'], 'main carda cardb'.split()):
drives[which] = drive_letter + ':\\'
drives = self.windows_sort_drives(drives)
self._main_prefix = drives.get('main')
@ -880,10 +812,6 @@ class Device(DeviceConfig, DevicePlugin):
time.sleep(2)
self.open_freebsd()
if iswindows:
try:
self.open_windows()
except DeviceError:
time.sleep(7)
self.open_windows()
if isosx:
try:

View File

@ -970,7 +970,7 @@ def develop(do_eject=False): # {{{
for dev in devplugins:
if dev.MANAGES_DEVICE_PRESENCE:
continue
connected, usbdev = dev.is_usb_connected_generic(usb_devices, debug=True)
connected, usbdev = dev.is_usb_connected(usb_devices, debug=True)
if connected:
print('\n')
print('Potentially connected device: %s at %s' % (dev.get_gui_name(), usbdev))