Add functions to usbobserver to query OS X for BSD device names and mount points.

This commit is contained in:
Kovid Goyal 2009-12-26 19:03:00 -07:00
parent fe52e0f0ab
commit 46e7a9094a

View File

@ -24,31 +24,74 @@
#include <stdio.h> #include <stdio.h>
#include <CoreFoundation/CFNumber.h>
#include <IOKit/usb/IOUSBLib.h> #include <IOKit/usb/IOUSBLib.h>
#include <IOKit/IOCFPlugIn.h> #include <IOKit/IOCFPlugIn.h>
#include <IOKit/IOKitLib.h> #include <IOKit/IOKitLib.h>
#include <mach/mach.h> #include <IOKit/storage/IOMedia.h>
#include <IOKit/IOBSD.h>
#include <IOKit/usb/USBSpec.h>
CFStringRef USB_PROPS[3] = { CFSTR("USB Vendor Name"), CFSTR("USB Product Name"), CFSTR("USB Serial Number") }; #include <mach/mach.h>
#include <sys/param.h>
#include <paths.h>
#include <sys/ucred.h>
#include <sys/mount.h>
#ifndef kUSBVendorString
#define kUSBVendorString "USB Vendor Name"
#endif
#ifndef kUSBProductString
#define kUSBProductString "USB Product Name"
#endif
#ifndef kUSBSerialNumberString
#define kUSBSerialNumberString "USB Serial Number"
#endif
#define NUKE(x) Py_XDECREF(x); x = NULL;
static PyObject* static PyObject*
get_iokit_string_property(io_service_t dev, int prop) { usbobserver_get_iokit_string_property(io_service_t dev, CFStringRef prop) {
CFTypeRef PropRef; CFTypeRef PropRef;
char buf[500]; char buf[500];
PropRef = IORegistryEntryCreateCFProperty(dev, USB_PROPS[prop], kCFAllocatorDefault, 0); PropRef = IORegistryEntryCreateCFProperty(dev, prop, kCFAllocatorDefault, 0);
if (PropRef) { if (PropRef) {
if(!CFStringGetCString(PropRef, buf, 500, kCFStringEncodingUTF8)) buf[0] = '\0'; if(!CFStringGetCString(PropRef, buf, 500, kCFStringEncodingUTF8)) buf[0] = '\0';
CFRelease(PropRef);
} else buf[0] = '\0'; } else buf[0] = '\0';
return PyUnicode_DecodeUTF8(buf, strlen(buf), "replace"); return PyUnicode_DecodeUTF8(buf, strlen(buf), "replace");
} }
static PyObject*
usbobserver_get_iokit_number_property(io_service_t dev, CFStringRef prop) {
CFTypeRef PropRef;
long val = 0;
PropRef = IORegistryEntryCreateCFProperty(dev, prop, kCFAllocatorDefault, 0);
if (PropRef) {
CFNumberGetValue((CFNumberRef)PropRef, kCFNumberLongType, &val);
CFRelease(PropRef);
}
return PyLong_FromLong(val);
}
static PyObject * static PyObject *
usbobserver_get_usb_devices(PyObject *self, PyObject *args) { usbobserver_get_usb_devices(PyObject *self, PyObject *args) {
CFMutableDictionaryRef matchingDict; CFMutableDictionaryRef matchingDict;
kern_return_t kr; kern_return_t kr;
PyObject *devices, *device;
io_service_t usbDevice;
PyObject *vendor, *product, *bcd;
PyObject *manufacturer, *productn, *serial;
//Set up matching dictionary for class IOUSBDevice and its subclasses //Set up matching dictionary for class IOUSBDevice and its subclasses
matchingDict = IOServiceMatching(kIOUSBDeviceClassName); matchingDict = IOServiceMatching(kIOUSBDeviceClassName);
@ -58,78 +101,206 @@ usbobserver_get_usb_devices(PyObject *self, PyObject *args) {
} }
io_iterator_t iter; io_iterator_t iter;
IOServiceGetMatchingServices(kIOMasterPortDefault, matchingDict, &iter); kr = IOServiceGetMatchingServices(kIOMasterPortDefault, matchingDict, &iter);
io_service_t usbDevice; if (KERN_SUCCESS != kr) {
IOCFPlugInInterface **plugInInterface = NULL; printf("IOServiceGetMatchingServices returned 0x%08x\n", kr);
SInt32 score; PyErr_SetString(PyExc_RuntimeError, "Could not run IO Matching");
IOUSBDeviceInterface182 **dev = NULL; return NULL;
UInt16 vendor, product, bcd; }
PyObject *manufacturer, *productn, *serial;
PyObject *devices, *device;
devices = PyList_New(0); devices = PyList_New(0);
if (devices == NULL) { if (devices == NULL) {
PyErr_NoMemory(); PyErr_NoMemory();
return NULL; return NULL;
} }
while ((usbDevice = IOIteratorNext(iter))) { while ((usbDevice = IOIteratorNext(iter))) {
plugInInterface = NULL; dev = NULL;
//Create an intermediate plugin
kr = IOCreatePlugInInterfaceForService(usbDevice, kIOUSBDeviceUserClientTypeID, kIOCFPlugInInterfaceID, &plugInInterface, &score);
if ((kIOReturnSuccess != kr) || !plugInInterface) {
printf("Unable to create a plug-in (%08x)\n", kr); continue;
}
//Now create the device interface
HRESULT result = (*plugInInterface)->QueryInterface(plugInInterface, CFUUIDGetUUIDBytes(kIOUSBDeviceInterfaceID), (LPVOID)&dev);
if (result || !dev) { vendor = usbobserver_get_iokit_number_property(usbDevice, CFSTR(kUSBVendorID));
printf("Couldn't create a device interface (%08x)\n", (int) result); product = usbobserver_get_iokit_number_property(usbDevice, CFSTR(kUSBProductID));
continue; bcd = usbobserver_get_iokit_number_property(usbDevice, CFSTR(kUSBDeviceReleaseNumber));
} manufacturer = usbobserver_get_iokit_string_property(usbDevice, CFSTR(kUSBVendorString));
productn = usbobserver_get_iokit_string_property(usbDevice, CFSTR(kUSBProductString));
serial = usbobserver_get_iokit_string_property(usbDevice, CFSTR(kUSBSerialNumberString));
if (usbDevice) IOObjectRelease(usbDevice);
kr = (*dev)->GetDeviceVendor(dev, &vendor); if (vendor != NULL && product != NULL && bcd != NULL) {
kr = (*dev)->GetDeviceProduct(dev, &product);
kr = (*dev)->GetDeviceReleaseNumber(dev, &bcd);
manufacturer = get_iokit_string_property(usbDevice, 0); if (manufacturer == NULL) { manufacturer = Py_None; Py_INCREF(Py_None); }
if (manufacturer == NULL) manufacturer = Py_None; if (productn == NULL) { productn = Py_None; Py_INCREF(Py_None); }
productn = get_iokit_string_property(usbDevice, 1); if (serial == NULL) { serial = Py_None; Py_INCREF(Py_None); }
if (productn == NULL) productn = Py_None;
serial = get_iokit_string_property(usbDevice, 2);
if (serial == NULL) serial = Py_None;
device = Py_BuildValue("(iiiNNN)", vendor, product, bcd, manufacturer, productn, serial); device = Py_BuildValue("(OOOOOO)", vendor, product, bcd, manufacturer, productn, serial);
if (device == NULL) { if (device != NULL) {
IOObjectRelease(usbDevice); PyList_Append(devices, device);
(*plugInInterface)->Release(plugInInterface);
(*dev)->Release(dev);
Py_DECREF(devices);
return NULL;
}
if (PyList_Append(devices, device) == -1) {
IOObjectRelease(usbDevice);
(*plugInInterface)->Release(plugInInterface);
(*dev)->Release(dev);
Py_DECREF(devices);
Py_DECREF(device);
return NULL;
}
IOObjectRelease(usbDevice);
(*plugInInterface)->Release(plugInInterface);
(*dev)->Release(dev);
Py_DECREF(device); Py_DECREF(device);
} }
}
NUKE(vendor); NUKE(product); NUKE(bcd); NUKE(manufacturer);
NUKE(productn); NUKE(serial);
}
if (iter) IOObjectRelease(iter);
return devices; return devices;
} }
static PyObject*
usbobserver_get_bsd_path(io_object_t dev) {
char cpath[ MAXPATHLEN ];
CFTypeRef PropRef;
size_t dev_path_length;
cpath[0] = '\0';
PropRef = IORegistryEntryCreateCFProperty(dev, CFSTR(kIOBSDNameKey), kCFAllocatorDefault, 0);
if (!PropRef) return NULL;
strcpy(cpath, _PATH_DEV);
dev_path_length = strlen(cpath);
if (!CFStringGetCString(PropRef,
cpath + dev_path_length,
MAXPATHLEN - dev_path_length,
kCFStringEncodingUTF8)) return NULL;
return PyUnicode_DecodeUTF8(cpath, strlen(cpath), "replace");
}
static PyObject*
usbobserver_find_prop(io_registry_entry_t e, CFStringRef key, int is_string )
{
char buf[500]; long val = 0;
PyObject *ans;
IOOptionBits bits = kIORegistryIterateRecursively | kIORegistryIterateParents;
CFTypeRef PropRef = IORegistryEntrySearchCFProperty( e, kIOServicePlane, key, NULL, bits );
if (!PropRef) return NULL;
buf[0] = '\0';
if(is_string) {
CFStringGetCString(PropRef, buf, 500, kCFStringEncodingUTF8);
ans = PyUnicode_DecodeUTF8(buf, strlen(buf), "replace");
} else {
CFNumberGetValue((CFNumberRef)PropRef, kCFNumberLongType, &val);
ans = PyLong_FromLong(val);
}
CFRelease(PropRef);
return ans;
}
static PyObject*
usbobserver_get_usb_drives(PyObject *self, PyObject *args) {
CFMutableDictionaryRef matchingDict;
kern_return_t kr = KERN_FAILURE;
io_iterator_t iter;
io_object_t next;
PyObject *ans = NULL, *bsd_path = NULL, *t = NULL, *vid, *pid, *bcd, *manufacturer, *product, *serial;
//Set up matching dictionary for class IOMedia and its subclasses
matchingDict = IOServiceMatching(kIOMediaClass);
if (!matchingDict) {
PyErr_SetString(PyExc_RuntimeError, "Couldn't create a Media matching dictionary");
return NULL;
}
// Only want writable and ejectable leaf nodes
CFDictionarySetValue(matchingDict, CFSTR(kIOMediaWritableKey), kCFBooleanTrue);
CFDictionarySetValue(matchingDict, CFSTR(kIOMediaLeafKey), kCFBooleanTrue);
CFDictionarySetValue(matchingDict, CFSTR(kIOMediaEjectableKey), kCFBooleanTrue);
ans = PyList_New(0);
if (ans == NULL) return PyErr_NoMemory();
kr = IOServiceGetMatchingServices(kIOMasterPortDefault, matchingDict, &iter);
if (KERN_SUCCESS != kr) {
printf("IOServiceGetMatchingServices returned 0x%08x\n", kr);
PyErr_SetString(PyExc_RuntimeError, "Could not run IO Matching");
return NULL;
}
while ((next = IOIteratorNext(iter))) {
bsd_path = usbobserver_get_bsd_path(next);
vid = usbobserver_find_prop(next, CFSTR(kUSBVendorID), 0);
pid = usbobserver_find_prop(next, CFSTR(kUSBProductID), 0);
bcd = usbobserver_find_prop(next, CFSTR(kUSBDeviceReleaseNumber), 0);
manufacturer = usbobserver_find_prop(next, CFSTR(kUSBVendorString), 1);
product = usbobserver_find_prop(next, CFSTR(kUSBProductString), 1);
serial = usbobserver_find_prop(next, CFSTR(kUSBSerialNumberString), 1);
IOObjectRelease(next);
if (bsd_path != NULL && vid != NULL && pid != NULL && bcd != NULL) {
if (manufacturer == NULL) { manufacturer = Py_None; Py_INCREF(Py_None); }
if (product == NULL) { product = Py_None; Py_INCREF(Py_None); }
if (serial == NULL) { serial = Py_None; Py_INCREF(Py_None); }
t = Py_BuildValue("(OOOOOOO)", bsd_path, vid, pid, bcd, manufacturer, product, serial);
if (t != NULL) {
PyList_Append(ans, t);
Py_DECREF(t); t = NULL;
}
}
NUKE(bsd_path); NUKE(vid); NUKE(pid); NUKE(bcd);
NUKE(manufacturer); NUKE(product); NUKE(serial);
}
if (iter) IOObjectRelease(iter);
return ans;
}
static PyObject*
usbobserver_get_mounted_filesystems(PyObject *self, PyObject *args) {
struct statfs *buf, t;
int num, i;
PyObject *ans, *key, *val;
num = getfsstat(NULL, 0, MNT_NOWAIT);
if (num == -1) {
PyErr_SetString(PyExc_RuntimeError, "Initial call to getfsstat failed");
return NULL;
}
ans = PyDict_New();
if (ans == NULL) return PyErr_NoMemory();
buf = (struct statfs*)calloc(num, sizeof(struct statfs));
if (buf == NULL) return PyErr_NoMemory();
num = getfsstat(buf, num*sizeof(struct statfs), MNT_WAIT);
if (num == -1) {
PyErr_SetString(PyExc_RuntimeError, "Call to getfsstat failed");
return NULL;
}
for (i = 0 ; i < num; i++) {
t = buf[i];
key = PyBytes_FromString(t.f_mntfromname);
val = PyBytes_FromString(t.f_mntonname);
if (key != NULL && val != NULL) {
PyDict_SetItem(ans, key, val);
}
NUKE(key); NUKE(val);
}
free(buf);
return ans;
}
static PyMethodDef usbobserver_methods[] = { static PyMethodDef usbobserver_methods[] = {
{"get_usb_devices", usbobserver_get_usb_devices, METH_VARARGS, {"get_usb_devices", usbobserver_get_usb_devices, METH_VARARGS,
"Get list of connected USB devices. Returns a list of tuples. Each tuple is of the form (vendor_id, product_id)." "Get list of connected USB devices. Returns a list of tuples. Each tuple is of the form (vendor_id, product_id, bcd, manufacturer, product, serial number)."
}, },
{"get_usb_drives", usbobserver_get_usb_drives, METH_VARARGS,
"Get list of mounted drives. Returns a list of tuples, each of the form (name, bsd_path)."
},
{"get_mounted_filesystems", usbobserver_get_mounted_filesystems, METH_VARARGS,
"Get mapping of mounted filesystems. Mapping is from BSD name to mount point."
},
{NULL, NULL, 0, NULL} {NULL, NULL, 0, NULL}
}; };