Create a libusb based device scanner and use it on OS X. This is needed to enable MTP support.

This commit is contained in:
Kovid Goyal 2012-08-25 23:28:32 +05:30
parent 47fa81f0bd
commit cf416ed13c
5 changed files with 222 additions and 26 deletions

View File

@ -188,10 +188,15 @@ if iswindows:
if isosx:
extensions.append(Extension('usbobserver',
['calibre/devices/usbobserver/usbobserver.c'],
ldflags=['-framework', 'IOKit'])
ldflags=['-framework', 'CoreServices', '-framework', 'IOKit'])
)
if islinux:
if islinux or isosx:
extensions.append(Extension('libusb',
['calibre/devices/libusb/libusb.c'],
libraries=['usb-1.0']
))
extensions.append(Extension('libmtp',
[
'calibre/devices/mtp/unix/devices.c',

View File

@ -438,12 +438,15 @@ class Py2App(object):
@flush
def add_misc_libraries(self):
for x in ('usb', 'unrar', 'readline.6.1', 'wmflite-0.2.7', 'chm.0',
'sqlite3.0'):
for x in ('usb-1.0.0', 'mtp.9', 'unrar', 'readline.6.1',
'wmflite-0.2.7', 'chm.0', 'sqlite3.0'):
info('\nAdding', x)
x = 'lib%s.dylib'%x
shutil.copy2(join(SW, 'lib', x), self.frameworks_dir)
self.set_id(join(self.frameworks_dir, x), self.FID+'/'+x)
dest = join(self.frameworks_dir, x)
self.set_id(dest, self.FID+'/'+x)
if 'mtp' in x:
self.fix_dependencies_in_lib(dest)
@flush
def add_site_packages(self):

View File

@ -94,7 +94,8 @@ class Plugins(collections.Mapping):
plugins.extend(['winutil', 'wpd'])
if isosx:
plugins.append('usbobserver')
if islinux:
if islinux or isosx:
plugins.append('libusb')
plugins.append('libmtp')
self.plugins = frozenset(plugins)

View File

@ -0,0 +1,144 @@
/*
* libusb.c
* Copyright (C) 2012 Kovid Goyal <kovid at kovidgoyal.net>
*
* Distributed under terms of the GPL3 license.
*/
#define UNICODE
#include <Python.h>
#include <libusb-1.0/libusb.h>
static PyObject *Error = NULL;
static PyObject *cache = NULL;
static PyObject* format_err(int err) {
PyErr_SetString(Error, libusb_error_name(err));
return NULL;
}
static PyObject* read_string_property(libusb_device_handle *dev, uint8_t idx) {
unsigned char buf[301];
int err;
PyObject *ans = NULL;
Py_BEGIN_ALLOW_THREADS;
err = libusb_get_string_descriptor_ascii(dev, idx, buf, 300);
Py_END_ALLOW_THREADS;
if (err > 0) {
ans = PyUnicode_FromStringAndSize((char *)buf, err);
}
return ans;
}
static PyObject* read_string_data(libusb_device *dev, uint8_t manufacturer, uint8_t product, uint8_t serial) {
libusb_device_handle *handle;
int err;
PyObject *ans = NULL, *p;
ans = PyDict_New();
if (ans == NULL) return PyErr_NoMemory();
err = libusb_open(dev, &handle);
if (err == 0) {
p = read_string_property(handle, manufacturer);
if (p != NULL) { PyDict_SetItemString(ans, "manufacturer", p); Py_DECREF(p); }
p = read_string_property(handle, product);
if (p != NULL) { PyDict_SetItemString(ans, "product", p); Py_DECREF(p); };
p = read_string_property(handle, serial);
if (p != NULL) { PyDict_SetItemString(ans, "serial", p); Py_DECREF(p); };
libusb_close(handle);
}
return ans;
}
static PyObject* get_devices(PyObject *self, PyObject *args) {
PyObject *ans = NULL, *d = NULL, *t = NULL, *rec = NULL;
int err, i = 0;
libusb_device **devs = NULL, *dev = NULL;
ssize_t count;
ans = PyList_New(0);
if (ans == NULL) return PyErr_NoMemory();
Py_BEGIN_ALLOW_THREADS;
count = libusb_get_device_list(NULL, &devs);
Py_END_ALLOW_THREADS;
if (count < 0) { Py_DECREF(ans); return format_err((int)count); }
while ( (dev = devs[i++]) != NULL ) {
struct libusb_device_descriptor desc;
err = libusb_get_device_descriptor(dev, &desc);
if (err != 0) { format_err(err); break; }
if (desc.bDeviceClass == LIBUSB_CLASS_HUB) continue;
d = Py_BuildValue("(HHHHH)", libusb_get_bus_number(dev),
libusb_get_device_address(dev), desc.idVendor, desc.idProduct,
desc.bcdDevice);
if (d == NULL) break;
t = PyDict_GetItem(cache, d);
if (t == NULL) {
t = read_string_data(dev, desc.iManufacturer, desc.iProduct, desc.iSerialNumber);
if (t == NULL) { Py_DECREF(d); break; }
PyDict_SetItem(cache, d, t);
Py_DECREF(t);
}
rec = Py_BuildValue("(NO)", d, t);
if (rec == NULL) { Py_DECREF(d); break; }
PyList_Append(ans, rec);
Py_DECREF(rec);
}
if (dev != NULL) {
// An error occurred
Py_DECREF(ans); ans = NULL;
}
if (devs != NULL) libusb_free_device_list(devs, 1);
return ans;
}
static PyMethodDef libusb_methods[] = {
{"get_devices", get_devices, METH_VARARGS,
"get_devices()\n\nGet the list of USB devices on the system."
},
{NULL, NULL, 0, NULL}
};
PyMODINIT_FUNC
initlibusb(void) {
PyObject *m;
// We deliberately use the default context. This is the context used by
// libmtp and we want to ensure that the busnum/devnum numbers are the same
// here and for libmtp.
if(libusb_init(NULL) != 0) return;
Error = PyErr_NewException("libusb.Error", NULL, NULL);
if (Error == NULL) return;
cache = PyDict_New();
if (cache == NULL) return;
m = Py_InitModule3("libusb", libusb_methods, "Interface to libusb.");
if (m == NULL) return;
PyModule_AddObject(m, "Error", Error);
PyModule_AddObject(m, "cache", cache);
}

View File

@ -13,18 +13,13 @@ from calibre import prints, as_unicode
from calibre.constants import (iswindows, isosx, plugins, islinux, isfreebsd,
isnetbsd)
osx_scanner = win_scanner = linux_scanner = None
osx_scanner = win_scanner = linux_scanner = freebsd_scanner = netbsd_scanner = None
if iswindows:
try:
win_scanner = plugins['winutil'][0].get_usb_devices
except:
raise RuntimeError('Failed to load the winutil plugin: %s'%plugins['winutil'][1])
elif isosx:
try:
osx_scanner = plugins['usbobserver'][0].get_usb_devices
except:
raise RuntimeError('Failed to load the usbobserver plugin: %s'%plugins['usbobserver'][1])
class Drive(str):
@ -118,6 +113,54 @@ class USBDevice(_USBDevice):
_USBDevice.__init__(self, *args, **kwargs)
self.busnum = self.devnum = -1
def __repr__(self):
return (u'USBDevice(busnum=%s, devnum=%s, '
'vendor_id=0x%04x, product_id=0x%04x, bcd=0x%04x, '
'manufacturer=%s, product=%s, serial=%s)')%(
self.busnum, self.devnum, self.vendor_id, self.product_id,
self.bcd, self.manufacturer, self.product, self.serial)
__str__ = __repr__
__unicode__ = __repr__
class LibUSBScanner(object):
def __call__(self):
if not hasattr(self, 'libusb'):
self.libusb, self.libusb_err = plugins['libusb']
if self.libusb is None:
raise ValueError(
'DeviceScanner needs libusb to work. Error: %s'%
self.libusb_err)
ans = set()
seen = set()
for fingerprint, ids in self.libusb.get_devices():
seen.add(fingerprint)
man = ids.get('manufacturer', None)
prod = ids.get('product', None)
serial = ids.get('serial', None)
dev = fingerprint[2:] + (man, prod, serial)
dev = USBDevice(*dev)
dev.busnum, dev.devnum = fingerprint[:2]
ans.add(dev)
extra = set(self.libusb.cache.iterkeys()) - seen
for x in extra:
self.libusb.cache.pop(x, None)
return ans
def check_for_mem_leak(self):
import gc
from calibre.utils.mem import memory
memory()
for num in (1, 10, 100):
start = memory()
for i in xrange(num):
self()
for i in xrange(3): gc.collect()
print 'Mem consumption increased by:', memory() - start, 'MB',
print 'after', num, 'repeats'
class LinuxScanner(object):
SYSFS_PATH = os.environ.get('SYSFS_PATH', '/sys')
@ -165,17 +208,17 @@ class LinuxScanner(object):
except:
continue
try:
dev.append(read(man))
dev.append(read(man).decode('utf-8'))
except:
dev.append(b'')
dev.append(u'')
try:
dev.append(read(prod_string))
dev.append(read(prod_string).decode('utf-8'))
except:
dev.append(b'')
dev.append(u'')
try:
dev.append(read(serial))
dev.append(read(serial).decode('utf-8'))
except:
dev.append(b'')
dev.append(u'')
dev = USBDevice(*dev)
try:
@ -244,18 +287,16 @@ class FreeBSDScanner(object):
linux_scanner = None
if islinux:
linux_scanner = LinuxScanner()
freebsd_scanner = None
libusb_scanner = LibUSBScanner()
if isosx:
osx_scanner = libusb_scanner
if isfreebsd:
freebsd_scanner = FreeBSDScanner()
netbsd_scanner = None
''' NetBSD support currently not written yet '''
if isnetbsd:
netbsd_scanner = None
@ -263,9 +304,11 @@ if isnetbsd:
class DeviceScanner(object):
def __init__(self, *args):
if isosx and osx_scanner is None:
raise RuntimeError('The Python extension usbobserver must be available on OS X.')
self.scanner = win_scanner if iswindows else osx_scanner if isosx else freebsd_scanner if isfreebsd else netbsd_scanner if isnetbsd else linux_scanner
self.scanner = (win_scanner if iswindows else osx_scanner if isosx else
freebsd_scanner if isfreebsd else netbsd_scanner if isnetbsd
else linux_scanner if islinux else libusb_scanner)
if self.scanner is None:
self.scanner = libusb_scanner
self.devices = []
def scan(self):