Linux device detection: Drop libusb1 in favor of parsing sysfs directly

This commit is contained in:
Kovid Goyal 2009-12-12 17:41:53 -07:00
parent 8919eacd7a
commit 6a9a18ecd1
6 changed files with 57 additions and 250 deletions

View File

@ -136,11 +136,6 @@ if isosx:
['calibre/devices/usbobserver/usbobserver.c'],
ldflags=['-framework', 'IOKit'])
)
if islinux:
extensions.append(Extension('libusb',
['calibre/devices/libusb.c'],
ldflags=['-lusb-1.0']))
if isunix:
cc = os.environ.get('CC', 'gcc')

View File

@ -57,7 +57,6 @@ if plugins is None:
for plugin in ['pictureflow', 'lzx', 'msdes', 'podofo', 'cPalmdoc',
'fontconfig', 'pdfreflow', 'progress_indicator'] + \
(['winutil'] if iswindows else []) + \
(['libusb'] if islinux else []) + \
(['usbobserver'] if isosx else []):
try:
p, err = __import__(plugin), ''

View File

@ -8,8 +8,7 @@ a backend that implement the Device interface for the SONY PRS500 Reader.
import os
from calibre.customize import Plugin
from calibre.constants import iswindows, islinux
from calibre.devices.libusb1 import info
from calibre.constants import iswindows
class DevicePlugin(Plugin):
"""
@ -109,15 +108,15 @@ class DevicePlugin(Plugin):
for vid in vendors:
if vid in vendors_on_system:
for cvid, pid, bcd in devices_on_system:
for dev in devices_on_system:
cvid, pid, bcd = dev[:3]
if cvid == vid:
if pid in products:
if hasattr(cls.VENDOR_ID, 'keys'):
cbcd = cls.VENDOR_ID[vid][pid]
else:
cbcd = cls.BCD
if cls.test_bcd(bcd, cbcd) and cls.can_handle((vid,
pid, bcd),
if cls.test_bcd(bcd, cbcd) and cls.can_handle(dev,
debug=debug):
return True
return False
@ -151,14 +150,12 @@ class DevicePlugin(Plugin):
:param device_info: On windows a device ID string. On Unix a tuple of
``(vendor_id, product_id, bcd)``.
'''
if islinux:
try:
if debug:
dev = info(*device_info)
print '\t', repr(dev)
except:
import traceback
traceback.print_exc()
try:
if debug:
print '\t', repr(device_info)
except:
import traceback
traceback.print_exc()
return True
def open(self):

View File

@ -1,199 +0,0 @@
/*
:mod:`libusb` -- Pythonic interface to libusb
=====================================================
.. module:: fontconfig
:platform: Linux
:synopsis: Pythonic interface to the libusb library
.. moduleauthor:: Kovid Goyal <kovid@kovidgoyal.net> Copyright 2009
*/
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include <libusb-1.0/libusb.h>
libusb_context *ctxt = NULL;
void cleanup() {
if (ctxt != NULL) {
libusb_exit(ctxt);
}
}
PyObject*
py_libusb_scan(PyObject *self, PyObject *args) {
libusb_device **list = NULL;
struct libusb_device_descriptor dev;
ssize_t ret = 0, i = 0;
PyObject *ans, *pydev, *t;
if (ctxt == NULL) return PyErr_NoMemory();
ret = libusb_get_device_list(ctxt, &list);
if (ret == LIBUSB_ERROR_NO_MEM) return PyErr_NoMemory();
ans = PyTuple_New(ret);
if (ans == NULL) return PyErr_NoMemory();
for (i = 0; i < ret; i++) {
if (libusb_get_device_descriptor(list[i], &dev) != 0) {
PyTuple_SET_ITEM(ans, i, Py_None);
continue;
}
pydev = PyTuple_New(3);
if (pydev == NULL) return PyErr_NoMemory();
t = PyInt_FromLong(dev.idVendor);
if (t == NULL) return PyErr_NoMemory();
PyTuple_SET_ITEM(pydev, 0, t);
t = PyInt_FromLong(dev.idProduct);
if (t == NULL) return PyErr_NoMemory();
PyTuple_SET_ITEM(pydev, 1, t);
t = PyInt_FromLong(dev.bcdDevice);
if (t == NULL) return PyErr_NoMemory();
PyTuple_SET_ITEM(pydev, 2, t);
PyTuple_SET_ITEM(ans, i, pydev);
}
libusb_free_device_list(list, 1);
return ans;
}
PyObject*
py_libusb_info(PyObject *self, PyObject *args) {
uint16_t idVendor, idProduct, bcdDevice;
ssize_t ret = 0, i = 0; int err = 0, n;
libusb_device **list = NULL;
libusb_device_handle *handle = NULL;
struct libusb_device_descriptor dev;
PyObject *ans, *t, *t1 = NULL, *t2 = NULL, *t3 = NULL;
unsigned char data[1000];
if (ctxt == NULL) return PyErr_NoMemory();
if (!PyArg_ParseTuple(args, "OOO", &t1, &t2, &t3))
return NULL;
idVendor = (uint16_t)PyInt_AsSsize_t(t1);
idProduct = (uint16_t)PyInt_AsSsize_t(t2);
bcdDevice = (uint16_t)PyInt_AsSsize_t(t3);
ret = libusb_get_device_list(ctxt, &list);
if (ret == LIBUSB_ERROR_NO_MEM) return PyErr_NoMemory();
ans = PyDict_New();
if (ans == NULL) return PyErr_NoMemory();
for (i = 0; i < ret; i++) {
if (libusb_get_device_descriptor(list[i], &dev) != 0) continue;
if (idVendor == dev.idVendor && idProduct == dev.idProduct && bcdDevice == dev.bcdDevice) {
err = libusb_open(list[i], &handle);
if (!err) {
if (dev.iManufacturer) {
n = libusb_get_string_descriptor_ascii(handle, dev.iManufacturer, data, 1000);
if (n == LIBUSB_ERROR_TIMEOUT) {
libusb_close(handle);
err = libusb_open(list[i], &handle);
if (err) break;
n = libusb_get_string_descriptor_ascii(handle, dev.iManufacturer, data, 1000);
}
if (n > 0) {
t = PyBytes_FromStringAndSize((const char*)data, n);
if (t == NULL) return PyErr_NoMemory();
//Py_INCREF(t);
if (PyDict_SetItemString(ans, "manufacturer", t) != 0) return PyErr_NoMemory();
}
}
if (dev.iProduct) {
n = libusb_get_string_descriptor_ascii(handle, dev.iProduct, data, 1000);
if (n == LIBUSB_ERROR_TIMEOUT) {
libusb_close(handle);
err = libusb_open(list[i], &handle);
if (err) break;
n = libusb_get_string_descriptor_ascii(handle, dev.iManufacturer, data, 1000);
}
if (n > 0) {
t = PyBytes_FromStringAndSize((const char*)data, n);
if (t == NULL) return PyErr_NoMemory();
//Py_INCREF(t);
if (PyDict_SetItemString(ans, "product", t) != 0) return PyErr_NoMemory();
}
}
if (dev.iSerialNumber) {
n = libusb_get_string_descriptor_ascii(handle, dev.iSerialNumber, data, 1000);
if (n == LIBUSB_ERROR_TIMEOUT) {
libusb_close(handle);
err = libusb_open(list[i], &handle);
if (err) break;
n = libusb_get_string_descriptor_ascii(handle, dev.iManufacturer, data, 1000);
}
if (n > 0) {
t = PyBytes_FromStringAndSize((const char*)data, n);
if (t == NULL) return PyErr_NoMemory();
//Py_INCREF(t);
if (PyDict_SetItemString(ans, "serial", t) != 0) return PyErr_NoMemory();
}
}
libusb_close(handle);
}
break;
}
}
libusb_free_device_list(list, 1);
if (err != 0) {
switch (err) {
case LIBUSB_ERROR_NO_MEM:
return PyErr_NoMemory();
case LIBUSB_ERROR_ACCESS:
PyErr_SetString(PyExc_ValueError, "Dont have permission to access this device");
return NULL;
case LIBUSB_ERROR_NO_DEVICE:
PyErr_SetString(PyExc_ValueError, "Device disconnected");
return NULL;
default:
PyErr_SetString(PyExc_ValueError, "Failed to open device");
return NULL;
}
}
return ans;
}
static
PyMethodDef libusb_methods[] = {
{"scan", py_libusb_scan, METH_VARARGS,
"scan()\n\n"
"Return USB devices currently connected to system as a tuple of "
"3-tuples. Each 3-tuple has (idVendor, idProduct, bcdDevice)."
},
{"info", py_libusb_info, METH_VARARGS,
"info(idVendor, idProduct, bcdDevice)\n\n"
"Return extra information about the specified device. "
},
{NULL, NULL, 0, NULL}
};
PyMODINIT_FUNC
initlibusb(void) {
PyObject *m;
m = Py_InitModule3(
"libusb", libusb_methods,
"Interface with USB devices on system."
);
if (m == NULL) return;
if (libusb_init(&ctxt) != 0) ctxt = NULL;
Py_AtExit(cleanup);
}

View File

@ -1,27 +0,0 @@
#!/usr/bin/env python
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import with_statement
__license__ = 'GPL v3'
__copyright__ = '2009, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
from calibre.constants import plugins
libusb, libusb_err = plugins.get('libusb', (None, 'Wrong platform'))
def scan():
if libusb_err:
raise RuntimeError('Failed to load libusb1: '+libusb_err)
return set([x for x in libusb.scan() if x is not None])
def info(vendor, product, bcd):
if libusb_err:
raise RuntimeError('Failed to load libusb1: '+libusb_err)
a = libusb.info(vendor, product, bcd)
ans = {}
for k, v in a.items():
ans[k] = v.decode('ascii', 'replace')
return ans

View File

@ -5,10 +5,9 @@ Device scanner that fetches list of devices on system ina platform dependent
manner.
'''
import sys
import sys, os
from calibre import iswindows, isosx, plugins, islinux
from calibre.devices import libusb1
osx_scanner = win_scanner = linux_scanner = None
@ -23,16 +22,59 @@ elif isosx:
except:
raise RuntimeError('Failed to load the usbobserver plugin: %s'%plugins['usbobserver'][1])
class LinuxScanner(object):
linux_scanner = libusb1.scan
SYSFS_PATH = os.environ.get('SYSFS_PATH', '/sys')
def __init__(self):
self.base = os.path.join(self.SYSFS_PATH, 'subsystem', 'usb', 'devices')
if not os.path.exists(self.base):
self.base = os.path.join(self.SYSFS_PATH, 'bus', 'usb', 'devices')
self.ok = os.path.exists(self.base)
def __call__(self):
ans = set([])
for x in os.listdir(self.base):
base = os.path.join(self.base, x)
ven = os.path.join(base, 'idVendor')
prod = os.path.join(base, 'idProduct')
bcd = os.path.join(base, 'bcdDevice')
man = os.path.join(base, 'manufacturer')
prod_string = os.path.join(base, 'product')
dev = []
try:
dev.append(int('0x'+open(ven).read().strip(), 16))
except:
continue
try:
dev.append(int('0x'+open(prod).read().strip(), 16))
except:
continue
try:
dev.append(int('0x'+open(bcd).read().strip(), 16))
except:
continue
try:
dev.append(open(man).read().strip())
except:
dev.append('')
try:
dev.append(open(prod_string).read().strip())
except:
dev.append('')
ans.add(tuple(dev))
return ans
linux_scanner = LinuxScanner()
class DeviceScanner(object):
def __init__(self, *args):
if isosx and osx_scanner is None:
raise RuntimeError('The Python extension usbobserver must be available on OS X.')
if islinux and libusb1.libusb_err:
raise RuntimeError('DeviceScanner requires libusb1 to work.')
if islinux and not linux_scanner.ok:
raise RuntimeError('DeviceScanner requires the /sys filesystem to work.')
self.scanner = win_scanner if iswindows else osx_scanner if isosx else linux_scanner
self.devices = []