This commit is contained in:
Kovid Goyal 2008-01-09 19:09:53 +00:00
parent f1a6c8c706
commit 2d0e0707a6
12 changed files with 259 additions and 56 deletions

View File

@ -234,7 +234,7 @@ setup(
'iconfile' : 'icons/library.icns',
'frameworks': ['libusb.dylib', 'libunrar.dylib'],
'includes' : ['sip', 'pkg_resources', 'PyQt4.QtSvg',
'mechanize', 'ClientForm'],
'mechanize', 'ClientForm', 'usbobserver'],
'packages' : ['PIL', 'Authorization', 'rtf2xml', 'lxml'],
'excludes' : ['pydoc'],
'plist' : { 'CFBundleGetInfoString' : '''libprs500, an E-book management application.'''

View File

@ -50,16 +50,6 @@ class Device(object):
'''Return the FDI description of this device for HAL on linux.'''
return ''
@classmethod
def is_connected(cls, helper=None):
'''
Return True iff the device is physically connected to the computer
@param helper: Platform dependent helper object. For e.g. on windows
an instantiated WMI interface.
'''
raise NotImplementedError()
def set_progress_reporter(self, report_progress):
'''
@param report_progress: Function that is called with a % progress

View File

@ -118,21 +118,6 @@ class KINDLE(Device):
return True
return False
@classmethod
def is_connected(cls, helper=None):
if iswindows:
for c in helper.USBControllerDevice():
if cls.is_device(c.Dependent.DeviceID):
return True
return False
else:
try:
return get_device_by_id(cls.VENDOR_ID, cls.PRODUCT_ID) != None
except USBError:
return False
return False
def open_osx(self):
mount = subprocess.Popen('mount', shell=True,
stdout=subprocess.PIPE).stdout.read()

View File

@ -357,3 +357,16 @@ def get_device_by_id(idVendor, idProduct):
if dev.device_descriptor.idVendor == idVendor and \
dev.device_descriptor.idProduct == idProduct:
return dev
def has_library():
return _libusb is not None
def get_devices():
buslist = busses()
ans = []
for bus in buslist:
devices = bus.device_list
for dev in devices:
device = (dev.device_descriptor.idVendor, dev.device_descriptor.idProduct)
ans.append(device)
return ans

View File

@ -26,6 +26,7 @@ from libprs500.devices.errors import PathError
from libprs500.devices.prs500.cli.terminfo import TerminalController
from libprs500.devices.errors import ArgumentError, DeviceError, DeviceLocked
from libprs500.devices import devices
from libprs500.devices.scanner import DeviceScanner
MINIMUM_COL_WIDTH = 12 #: Minimum width of columns in ls output
@ -207,13 +208,15 @@ def main():
command = args[0]
args = args[1:]
dev = None
helper = None
_wmi = None
if iswindows:
import wmi, pythoncom
pythoncom.CoInitialize()
helper = wmi.WMI()
_wmi = wmi.WMI()
scanner = DeviceScanner(_wmi)
scanner.scan()
for d in devices():
if d.is_connected(helper):
if scanner.is_device_connected(d):
dev = d(log_packets=options.log_packets)
if dev is None:

View File

@ -113,21 +113,6 @@ class PRS505(Device):
return True
return False
@classmethod
def is_connected(cls, helper=None):
if iswindows:
for c in helper.USBControllerDevice():
if cls.is_device(c.Dependent.DeviceID):
return True
return False
else:
try:
return get_device_by_id(cls.VENDOR_ID, cls.PRODUCT_ID) != None
except USBError:
return False
return False
def open_osx(self):
mount = subprocess.Popen('mount', shell=True,
stdout=subprocess.PIPE).stdout.read()

View File

@ -0,0 +1,84 @@
## Copyright (C) 2008 Kovid Goyal kovid@kovidgoyal.net
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
'''
Device scanner that fetches list of devices on system ina platform dependent
manner.
'''
import sys
from libprs500 import iswindows, isosx
from libprs500.devices import libusb
osx_scanner = None
try:
import usbobserver
osx_scanner = usbobserver.get_devices
except ImportError:
pass
linux_scanner = libusb.get_devices
class DeviceScanner(object):
def __init__(self, wmi=None):
self.wmi = wmi
if iswindows and wmi is None:
raise RuntimeError('You must pass a wmi instance to DeviceScanner on windows.')
if isosx and osx_scanner is None:
raise RuntimeError('The Python extension usbobserver must be available on OS X.')
if not (isosx or iswindows) and not libusb.has_library():
raise RuntimeError('DeviceScanner requires libusb to work.')
self.devices = []
def get_devices(self):
if iswindows:
devices = []
for c in self.wmi.USBControllerDevice():
devices.append(c.Dependent.DeviceID.upper())
return devices
if isosx:
return osx_scanner()
return linux_scanner()
def scan(self):
self.devices = self.get_devices()
def is_device_connected(self, device):
if iswindows:
for device_id in self.devices:
if 'VEN_'+device.VENDOR_NAME in device_id and \
'PROD_'+device.PRODUCT_NAME in device_id:
return True
vid, pid = hex(device.VENDOR_ID)[2:], hex(device.PRODUCT_ID)[2:]
if len(vid) < 4: vid = '0'+vid
if len(pid) < 4: pid = '0'+pid
if 'VID_'+vid in device_id and \
'PID_'+pid in device_id:
return True
return False
else:
for vendor, product in self.devices:
if device.VENDOR_ID == vendor and device.PRODUCT_ID == product:
return True
return False
def main(args=sys.argv):
return 0
if __name__ == '__main__':
sys.exit(main())

View File

@ -0,0 +1,12 @@
usbobserver.so : usbobserver.o
gcc -arch i386 -arch ppc -bundle usbobserver.o -o usbobserver.so -framework Python -framework IOKit -framework CoreFoundation
usbobserver.o : usbobserver.c
gcc -arch i386 -arch ppc -dynamic -I/Library/Frameworks/Python.framework/Versions/2.5/include/python2.5 -c usbobserver.c -o usbobserver.o
install : usbobserver.so
cp usbobserver.so /Library/Frameworks/Python.framework/Versions/2.5/lib/python2.5/site-packages/
clean :
rm -f *.o *.so

View File

@ -0,0 +1,7 @@
import gc, usbobserver
a = None
print len(gc.get_objects())
usbobserver.get_devices()
gc.collect()
print len(gc.get_objects())

View File

@ -0,0 +1,124 @@
/* Copyright (C) 2008 Kovid Goyal kovid@kovidgoyal.net
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along
* with this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Python extension to scan the system for USB devices on OS X machines.
* To use
* >>> import usbobserver
* >>> usbobserver.get_devices()
*/
#include <Python.h>
#include <stdio.h>
#include <IOKit/usb/IOUSBLib.h>
#include <IOKit/IOCFPlugIn.h>
static PyObject *
usbobserver_get_devices(PyObject *self, PyObject *args) {
mach_port_t masterPort;
CFMutableDictionaryRef matchingDict;
kern_return_t kr;
/* Create a master port for communication with IOKit */
kr = IOMasterPort(MACH_PORT_NULL, &masterPort);
if (kr || !masterPort) {
PyErr_SetString(PyExc_RuntimeError, "Couldn't create master IOKit port");
return NULL;
}
//Set up matching dictionary for class IOUSBDevice and its subclasses
matchingDict = IOServiceMatching(kIOUSBDeviceClassName);
if (!matchingDict) {
PyErr_SetString(PyExc_RuntimeError, "Couldn't create a USB matching dictionary");
mach_port_deallocate(mach_task_self(), masterPort);
return NULL;
}
io_iterator_t iter;
IOServiceGetMatchingServices(kIOMasterPortDefault, matchingDict, &iter);
io_service_t usbDevice;
IOCFPlugInInterface **plugInInterface = NULL;
SInt32 score;
IOUSBDeviceInterface182 **dev = NULL;
UInt16 vendor, product;
PyObject *devices, *device;
devices = PyList_New(0);
if (devices == NULL) {
PyErr_SetString(PyExc_RuntimeError, "Out of memory allocating list");
mach_port_deallocate(mach_task_self(), masterPort);
return NULL;
}
while (usbDevice = IOIteratorNext(iter)) {
plugInInterface = NULL; dev = NULL;
//Create an intermediate plugin
kr = IOCreatePlugInInterfaceForService(usbDevice, kIOUSBDeviceUserClientTypeID, kIOCFPlugInInterfaceID, &plugInInterface, &score);
if ((kIOReturnSuccess != kr) || !plugInInterface)
printf("Unable to create a plug-in (%08x)\n", kr);
//Now create the device interface
HRESULT result = (*plugInInterface)->QueryInterface(plugInInterface, CFUUIDGetUUIDBytes(kIOUSBDeviceInterfaceID), (LPVOID)&dev);
if (result || !dev) printf("Couldn't create a device interface (%08x)\n", (int) result);
kr = (*dev)->GetDeviceVendor(dev, &vendor);
kr = (*dev)->GetDeviceProduct(dev, &product);
device = Py_BuildValue("(ii)", vendor, product);
if (device == NULL) {
IOObjectRelease(usbDevice);
(*plugInInterface)->Release(plugInInterface);
(*dev)->Release(dev);
Py_DECREF(devices);
return NULL;
}
if (PyList_Append(devices, device) == -1) {
IOObjectRelease(usbDevice);
(*plugInInterface)->Release(plugInInterface);
(*dev)->Release(dev);
Py_DECREF(devices);
Py_DECREF(device);
return NULL;
}
IOObjectRelease(usbDevice);
(*plugInInterface)->Release(plugInInterface);
(*dev)->Release(dev);
Py_DECREF(device);
}
//Finished with master port
mach_port_deallocate(mach_task_self(), masterPort);
return Py_BuildValue("N", devices);
}
static PyMethodDef usbobserver_methods[] = {
{"get_devices", usbobserver_get_devices, METH_VARARGS,
"Get list of connected USB devices. Returns a list of tuples. Each tuple is of the form (vendor_id, product_id)."
},
{NULL, NULL, 0, NULL}
};
PyMODINIT_FUNC
initusbobserver(void) {
(void) Py_InitModule("usbobserver", usbobserver_methods);
}

View File

@ -12,12 +12,13 @@
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.Warning
import os, sys, traceback
import os, traceback
from PyQt4.QtCore import QThread, SIGNAL, QObject
from libprs500 import iswindows
from libprs500.devices import devices
from libprs500.devices.scanner import DeviceScanner
class DeviceDetector(QThread):
'''
@ -35,17 +36,16 @@ class DeviceDetector(QThread):
QThread.__init__(self)
def run(self):
helper = None
_wmi = None
if iswindows:
import wmi, pythoncom
pythoncom.CoInitialize()
helper = wmi.WMI()
_wmi = wmi.WMI()
scanner = DeviceScanner(_wmi)
while True:
scanner.scan()
for device in self.devices:
try:
connected = device[0].is_connected(helper=helper)
except:
connected = False
connected = scanner.is_device_connected(device[0])
if connected and not device[1]:
try:
dev = device[0]()