Switch to non WMI based method of detecting devices on windows

This commit is contained in:
Kovid Goyal 2010-01-12 14:23:04 -07:00
parent 8a5d3cfaba
commit 786e50e2cf
6 changed files with 88 additions and 121 deletions

View File

@ -5,7 +5,7 @@ __copyright__ = '2008, Kovid Goyal <kovid at kovidgoyal.net>'
Device drivers.
'''
import sys, os, time, pprint
import sys, time, pprint
from functools import partial
from StringIO import StringIO
@ -29,7 +29,7 @@ def strftime(epoch, zone=time.gmtime):
def debug(ioreg_to_tmp=False, buf=None):
from calibre.customize.ui import device_plugins
from calibre.devices.scanner import DeviceScanner
from calibre.devices.scanner import DeviceScanner, win_pnp_drives
from calibre.constants import iswindows, isosx, __version__
from calibre import prints
oldo, olde = sys.stdout, sys.stderr
@ -37,19 +37,11 @@ def debug(ioreg_to_tmp=False, buf=None):
if buf is None:
buf = StringIO()
sys.stdout = sys.stderr = buf
if iswindows:
import pythoncom
pythoncom.CoInitialize()
try:
out = partial(prints, file=buf)
out('Version:', __version__)
wmi = Wmi =None
if iswindows:
wmi = __import__('wmi', globals(), locals(), [], -1)
Wmi = wmi.WMI(find_classes=False)
s = DeviceScanner()
s.wmi = Wmi
s.scan()
devices = (s.devices)
if not iswindows:
@ -60,21 +52,9 @@ def debug(ioreg_to_tmp=False, buf=None):
out('USB devices on system:')
out(pprint.pformat(devices))
if iswindows:
drives = []
drives = win_pnp_drives(debug=True)
out('Drives detected:')
out('\t', '(ID, Partitions, Drive letter)')
for drive in Wmi.Win32_DiskDrive():
if drive.Partitions == 0:
continue
try:
partition = drive.associators("Win32_DiskDriveToDiskPartition")[0]
logical_disk = partition.associators('Win32_LogicalDiskToPartition')[0]
prefix = logical_disk.DeviceID+os.sep
drives.append((str(drive.PNPDeviceID), drive.Index, prefix))
except IndexError:
drives.append((str(drive.PNPDeviceID), 'No mount points found'))
for drive in drives:
out('\t', drive)
out(pprint.pformat(drives))
ioreg = None
if isosx:
@ -84,13 +64,10 @@ def debug(ioreg_to_tmp=False, buf=None):
ioreg = 'Output from mount:\n\n'+mount+'\n\n'+ioreg
connected_devices = []
for dev in device_plugins():
owmi = getattr(dev, 'wmi', None)
dev.wmi = Wmi
out('Looking for', dev.__class__.__name__)
connected, det = s.is_device_connected(dev, debug=True)
if connected:
connected_devices.append((dev, det))
dev.wmi = owmi
errors = {}
success = False
@ -102,8 +79,6 @@ def debug(ioreg_to_tmp=False, buf=None):
out(' ')
for dev, det in connected_devices:
out('Trying to open', dev.name, '...', end=' ')
owmi = getattr(dev, 'wmi', None)
dev.wmi = Wmi
try:
dev.reset(detected_device=det)
dev.open()
@ -113,8 +88,6 @@ def debug(ioreg_to_tmp=False, buf=None):
errors[dev] = traceback.format_exc()
out('failed')
continue
finally:
dev.wmi = owmi
success = True
if hasattr(dev, '_main_prefix'):
out('Main memory:', repr(dev._main_prefix))
@ -142,7 +115,4 @@ def debug(ioreg_to_tmp=False, buf=None):
finally:
sys.stdout = oldo
sys.stderr = olde
if iswindows:
import pythoncom
pythoncom.CoUninitialize()

View File

@ -60,7 +60,7 @@ class DevicePlugin(Plugin):
import traceback
traceback.print_exc()
def is_usb_connected_windows(self, devices_on_system, pnp_id_iterator, debug=False):
def is_usb_connected_windows(self, devices_on_system, debug=False):
def id_iterator():
if hasattr(self.VENDOR_ID, 'keys'):
@ -85,7 +85,7 @@ class DevicePlugin(Plugin):
self.test_bcd_windows(device_id, bcd):
if debug:
self.print_usb_device_info(device_id)
if self.can_handle_windows(device_id, pnp_id_iterator, debug=debug):
if self.can_handle_windows(device_id, debug=debug):
return True
return False
@ -97,7 +97,7 @@ class DevicePlugin(Plugin):
return True
return False
def is_usb_connected(self, devices_on_system, pnp_id_iterator, debug=False):
def is_usb_connected(self, devices_on_system, debug=False):
'''
Return True, device_info if a device handled by this plugin is currently connected.
@ -105,7 +105,7 @@ class DevicePlugin(Plugin):
'''
if iswindows:
return self.is_usb_connected_windows(devices_on_system,
pnp_id_iterator, debug=debug), None
debug=debug), None
vendors_on_system = set([x[0] for x in devices_on_system])
vendors = self.VENDOR_ID if hasattr(self.VENDOR_ID, '__len__') else [self.VENDOR_ID]
@ -147,7 +147,7 @@ class DevicePlugin(Plugin):
"""
raise NotImplementedError()
def can_handle_windows(self, device_id, pnp_id_iterator, debug=False):
def can_handle_windows(self, device_id, debug=False):
'''
Optional method to perform further checks on a device to see if this driver
is capable of handling it. If it is not it should return False. This method

View File

@ -9,7 +9,7 @@ For usage information run the script.
import StringIO, sys, time, os
from optparse import OptionParser
from calibre import __version__, iswindows, __appname__
from calibre import __version__, __appname__
from calibre.devices.errors import PathError
from calibre.utils.terminfo import TerminalController
from calibre.devices.errors import ArgumentError, DeviceError, DeviceLocked
@ -198,14 +198,9 @@ def main():
args = args[1:]
dev = None
scanner = DeviceScanner()
if iswindows:
import wmi, pythoncom
pythoncom.CoInitialize()
scanner.wmi = wmi.WMI(find_classes=False)
scanner.scan()
connected_devices = []
for d in device_plugins():
d.wmi = scanner.wmi
ok, det = scanner.is_device_connected(d)
if ok:
dev = d

View File

@ -6,6 +6,7 @@ manner.
'''
import sys, os
from threading import RLock
from calibre import iswindows, isosx, plugins, islinux
@ -22,6 +23,54 @@ elif isosx:
except:
raise RuntimeError('Failed to load the usbobserver plugin: %s'%plugins['usbobserver'][1])
class WinPNPScanner(object):
def __init__(self):
self.scanner = None
if iswindows:
self.scanner = plugins['winutil'][0].get_removable_drives
self.lock = RLock()
def drive_is_ok(self, letter, debug=False):
import win32api, win32file
with self.lock:
oldError = win32api.SetErrorMode(1) #SEM_FAILCRITICALERRORS = 1
try:
ans = True
try:
win32file.GetDiskFreeSpaceEx(letter+':\\')
except:
ans = False
return ans
finally:
win32api.SetErrorMode(oldError)
def __call__(self, debug=False):
if self.scanner is None:
return {}
try:
drives = self.scanner(debug)
except:
drives = {}
if debug:
import traceback
traceback.print_exc()
remove = set([])
for letter in drives:
if not self.drive_is_ok(letter, debug=debug):
remove.add(letter)
for letter in remove:
drives.pop(letter)
ans = {}
for key, val in drives.items():
val = [x.upper() for x in val]
val = [x for x in val if 'USBSTOR' in x]
if val:
ans[key+':\\'] = val[-1]
return ans
win_pnp_drives = WinPNPScanner()
class LinuxScanner(object):
SYSFS_PATH = os.environ.get('SYSFS_PATH', '/sys')
@ -85,26 +134,13 @@ class DeviceScanner(object):
raise RuntimeError('DeviceScanner requires the /sys filesystem to work.')
self.scanner = win_scanner if iswindows else osx_scanner if isosx else linux_scanner
self.devices = []
self.wmi = None
self.pnp_ids = set([])
self.rescan_pnp_ids = True
def scan(self):
'''Fetch list of connected USB devices from operating system'''
self.devices = self.scanner()
if self.rescan_pnp_ids:
self.pnp_ids = set([])
def pnp_id_iterator(self):
if self.wmi is not None and not self.pnp_ids:
for drive in self.wmi.Win32_DiskDrive():
if drive.Partitions > 0:
self.pnp_ids.add(str(drive.PNPDeviceID))
for x in self.pnp_ids:
yield x
def is_device_connected(self, device, debug=False):
return device.is_usb_connected(self.devices, self.pnp_id_iterator, debug=debug)
return device.is_usb_connected(self.devices, debug=debug)
def main(args=sys.argv):

View File

@ -203,18 +203,6 @@ class Device(DeviceConfig, DevicePlugin):
return False
def windows_get_drive_prefix(self, drive):
prefix = None
try:
partition = drive.associators("Win32_DiskDriveToDiskPartition")[0]
logical_disk = partition.associators('Win32_LogicalDiskToPartition')[0]
prefix = logical_disk.DeviceID + os.sep
except IndexError:
pass
return prefix
def windows_sort_drives(self, drives):
'''
Called to disambiguate main memory and storage card for devices that
@ -223,8 +211,10 @@ class Device(DeviceConfig, DevicePlugin):
'''
return drives
def can_handle_windows(self, device_id, pnp_id_iterator, debug=False):
for pnp_id in pnp_id_iterator():
def can_handle_windows(self, device_id, debug=False):
from calibre.devices.scanner import win_pnp_drives
drives = win_pnp_drives()
for pnp_id in drives.values():
if self.windows_match_device(pnp_id, 'WINDOWS_MAIN_MEM'):
return True
if debug:
@ -232,29 +222,20 @@ class Device(DeviceConfig, DevicePlugin):
return False
def open_windows(self):
from calibre.devices.scanner import win_pnp_drives
def matches_q(drive, attr):
q = getattr(self, attr)
if q is None: return False
if isinstance(q, basestring):
q = [q]
pnp = str(drive.PNPDeviceID)
for x in q:
if x in pnp:
return True
return False
time.sleep(8)
time.sleep(5)
drives = {}
c = self.wmi
for drive in c.Win32_DiskDrive():
pnp_id = str(drive.PNPDeviceID)
if self.windows_match_device(pnp_id, 'WINDOWS_CARD_A_MEM') and not drives.get('carda', None):
drives['carda'] = self.windows_get_drive_prefix(drive)
elif self.windows_match_device(pnp_id, 'WINDOWS_CARD_B_MEM') and not drives.get('cardb', None):
drives['cardb'] = self.windows_get_drive_prefix(drive)
elif self.windows_match_device(pnp_id, 'WINDOWS_MAIN_MEM') and not drives.get('main', None):
drives['main'] = self.windows_get_drive_prefix(drive)
for drive, pnp_id in win_pnp_drives().items():
if self.windows_match_device(pnp_id, 'WINDOWS_CARD_A_MEM') and \
not drives.get('carda', False):
drives['carda'] = drive
elif self.windows_match_device(pnp_id, 'WINDOWS_CARD_B_MEM') and \
not drives.get('cardb', False):
drives['cardb'] = drive
elif self.windows_match_device(pnp_id, 'WINDOWS_MAIN_MEM') and \
not drives.get('main', False):
drives['main'] = drive
if 'main' in drives.keys() and 'carda' in drives.keys() and \
'cardb' in drives.keys():

View File

@ -13,7 +13,6 @@ from PyQt4.Qt import QMenu, QAction, QActionGroup, QIcon, SIGNAL, QPixmap, \
from calibre.customize.ui import available_input_formats, available_output_formats, \
device_plugins
from calibre.devices.interface import DevicePlugin
from calibre.constants import iswindows
from calibre.gui2.dialogs.choose_format import ChooseFormatDialog
from calibre.utils.ipc.job import BaseJob
from calibre.devices.scanner import DeviceScanner
@ -85,7 +84,6 @@ class DeviceManager(Thread):
self.job_manager = job_manager
self.current_job = None
self.scanner = DeviceScanner()
self.wmi = None
self.connected_device = None
self.ejected_devices = set([])
@ -133,7 +131,6 @@ class DeviceManager(Thread):
self.connected_device = None
def detect_device(self):
self.scanner.rescan_pnp_ids = not self.is_device_connected
self.scanner.scan()
if self.is_device_connected:
connected, detected_device = \
@ -170,30 +167,18 @@ class DeviceManager(Thread):
pass
def run(self):
if iswindows:
import pythoncom
pythoncom.CoInitialize()
wmi = __import__('wmi', globals(), locals(), [], -1)
self.wmi = wmi.WMI(find_classes=False)
self.scanner.wmi = self.wmi
for x in self.devices:
x.wmi = self.wmi
try:
while self.keep_going:
self.detect_device()
while True:
job = self.next()
if job is not None:
self.current_job = job
self.device.set_progress_reporter(job.report_progress)
self.current_job.run()
self.current_job = None
else:
break
time.sleep(self.sleep_time)
finally:
if iswindows:
pythoncom.CoUninitialize()
while self.keep_going:
self.detect_device()
while True:
job = self.next()
if job is not None:
self.current_job = job
self.device.set_progress_reporter(job.report_progress)
self.current_job.run()
self.current_job = None
else:
break
time.sleep(self.sleep_time)
def create_job(self, func, done, description, args=[], kwargs={}):