Add libmtp wrapper methods to get files and metadata by name

This commit is contained in:
Kovid Goyal 2025-01-19 09:47:21 +05:30
parent 51d94af28d
commit 030a05e7f8
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
3 changed files with 212 additions and 55 deletions

View File

@ -693,6 +693,8 @@ class MTP_DEVICE(BASE):
def main():
import io
from pprint import pprint
io
dev = MTP_DEVICE(None)
dev.startup()
try:
@ -706,17 +708,17 @@ def main():
dev.set_progress_reporter(prints)
dev.open(cd, None)
dev.filesystem_cache.dump()
print('Prefix for main mem:', dev.prefix_for_location(None), flush=True)
raw = os.urandom(32 * 1024)
folder = dev.create_folder(dev.filesystem_cache.entries[0], 'developing-mtp-driver')
f = dev.put_file(folder, 'developing-mtp-driver.bin', io.BytesIO(raw), len(raw))
print('Put file:', f, flush=True)
buf = io.BytesIO()
dev.get_file(f.mtp_id_path, buf)
if buf.getvalue() != raw:
raise ValueError('Getting previously put file did not return expected data')
print('Successfully got previously put file', flush=True)
dev.recursive_delete(f)
docs = dev.prefix_for_location(None)
print('Prefix for main mem:', docs, flush=True)
entries = dev.list_mtp_folder_by_name(dev.filesystem_cache.entries[0], docs)
pprint(entries)
pprint(dev.get_mtp_metadata_by_name(dev.filesystem_cache.entries[0], docs, entries[0]['name']))
files = [x for x in entries if not x['is_folder']]
with dev.get_mtp_file_by_name(dev.filesystem_cache.entries[0], docs, files[0]['name']) as f:
print('Got', files[0]['name'], 'of size:', len(f.read()))
except Exception:
import traceback
traceback.print_exc()
finally:
dev.shutdown()

View File

@ -6,7 +6,6 @@ __docformat__ = 'restructuredtext en'
import operator
import pprint
import sys
import time
import traceback
from collections import namedtuple
@ -430,6 +429,48 @@ class MTP_DEVICE(MTPDeviceBase):
stream.name = f.name
return stream
@synchronous
def list_mtp_folder_by_name(self, parent, *names: str):
if not parent.is_folder:
raise ValueError(f'{parent.full_path} is not a folder')
parent_id = self.libmtp.LIBMTP_FILES_AND_FOLDERS_ROOT if parent.is_storage else parent.object_id
x = self.dev.list_folder_by_name(parent.storage_id, parent_id, names)
if x is None:
raise DeviceError(f'Could not find folder named: {"/".join(names)} in {parent.full_path}')
return x
@synchronous
def get_mtp_metadata_by_name(self, parent, *names: str):
if not parent.is_folder:
raise ValueError(f'{parent.full_path} is not a folder')
parent_id = self.libmtp.LIBMTP_FILES_AND_FOLDERS_ROOT if parent.is_storage else parent.object_id
x = self.dev.get_metadata_by_name(parent.storage_id, parent_id, names)
if x is None:
raise DeviceError(f'Could not find file named: {"/".join(names)} in {parent.full_path}')
m, errs = x
if not m:
raise DeviceError(f'Failed to get metadata for: {"/".join(names)} in {parent.full_path} with errors: {self.format_errorstack(errs)}')
return m
@synchronous
def get_mtp_file_by_name(self, parent, *names: str, stream=None, callback=None):
if not parent.is_folder:
raise ValueError(f'{parent.full_path} is not a folder')
set_name = stream is None
if stream is None:
stream = SpooledTemporaryFile(5*1024*1024, '_wpd_receive_file.dat')
parent_id = self.libmtp.LIBMTP_FILES_AND_FOLDERS_ROOT if parent.is_storage else parent.object_id
x = self.dev.get_file_by_name(parent.storage_id, parent_id, names, stream, callback)
if x is None:
raise DeviceError(f'Could not find file named: {"/".join(names)} in {parent.full_path}')
ok, errs = x
if not ok:
raise DeviceError(f'Failed to get file: {"/".join(names)} in {parent.full_path} with errors: {self.format_errorstack(errs)}')
stream.seek(0)
if set_name:
stream.name = '/'.join(names)
return stream
@synchronous
def delete_file_or_folder(self, obj):
if obj.deleted:
@ -450,32 +491,3 @@ class MTP_DEVICE(MTPDeviceBase):
(obj.full_path, self.format_errorstack(errs)))
parent.remove_child(obj)
return parent
def develop():
from calibre.devices.scanner import DeviceScanner
scanner = DeviceScanner()
scanner.scan()
dev = MTP_DEVICE(None)
dev.startup()
try:
cd = dev.detect_managed_devices(scanner.devices)
if cd is None:
raise RuntimeError('No MTP device found')
dev.open(cd, 'develop')
pprint.pprint(dev.dev.storage_info)
dev.filesystem_cache
finally:
dev.shutdown()
if __name__ == '__main__':
dev = MTP_DEVICE(None)
dev.startup()
from calibre.devices.scanner import DeviceScanner
scanner = DeviceScanner()
scanner.scan()
devs = scanner.devices
dev.debug_managed_device_detection(devs, sys.stdout)
dev.set_debug_level(dev.LIBMTP_DEBUG_ALL)
dev.shutdown()

View File

@ -13,6 +13,7 @@
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdbool.h>
#include <libmtp.h>
#include "devices.h"
@ -418,7 +419,7 @@ Device_get_filesystem(Device *self, PyObject *args) {
if (errs == NULL || ans == NULL) { PyErr_NoMemory(); return NULL; }
LIBMTP_Clear_Errorstack(self->device);
ok = recursive_get_files(self->device, (uint32_t)storage_id, 0xFFFFFFFF, ans, errs, callback, 0);
ok = recursive_get_files(self->device, (uint32_t)storage_id, LIBMTP_FILES_AND_FOLDERS_ROOT, ans, errs, callback, 0);
dump_errorstack(self->device, errs);
if (!ok) {
Py_DECREF(ans);
@ -431,21 +432,13 @@ Device_get_filesystem(Device *self, PyObject *args) {
} // }}}
// Device.get_file {{{
static PyObject *
Device_get_file(Device *self, PyObject *args) {
PyObject *stream, *callback = NULL, *errs;
ProgressCallback cb;
unsigned long fileid;
static PyObject*
get_file_impl(Device *self, PyObject *stream, PyObject *callback, unsigned long fileid) {
int ret;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL);
if (!PyArg_ParseTuple(args, "kO|O", &fileid, &stream, &callback)) return NULL;
errs = PyList_New(0);
PyObject *errs = PyList_New(0);
if (errs == NULL) { PyErr_NoMemory(); return NULL; }
if (callback == NULL || !PyCallable_Check(callback)) callback = NULL;
ProgressCallback cb = {0};
cb.obj = callback; cb.extra = stream;
Py_XINCREF(callback); Py_INCREF(stream);
cb.state = PyEval_SaveThread();
@ -456,9 +449,146 @@ Device_get_file(Device *self, PyObject *args) {
if (ret != 0) {
dump_errorstack(self->device, errs);
}
Py_XDECREF(PyObject_CallMethod(stream, "flush", NULL));
PyObject *pret = PyObject_CallMethod(stream, "flush", NULL);
if (pret == NULL) PyErr_Clear();
else Py_DECREF(pret);
return Py_BuildValue("ON", (ret == 0) ? Py_True : Py_False, errs);
}
static PyObject *
Device_get_file(Device *self, PyObject *args) {
PyObject *stream, *callback = NULL;
unsigned long fileid;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL);
if (!PyArg_ParseTuple(args, "kO|O", &fileid, &stream, &callback)) return NULL;
if (callback == NULL || !PyCallable_Check(callback)) callback = NULL;
return get_file_impl(self, stream, callback, fileid);
}
static bool
find_in_parent(Device *self, unsigned long storage_id, unsigned long parent_id, PyObject *name, unsigned long *fileid) {
LIBMTP_file_t *f, *files;
*fileid = 0;
bool found = false;
Py_BEGIN_ALLOW_THREADS;
files = LIBMTP_Get_Files_And_Folders(self->device, storage_id, parent_id);
Py_END_ALLOW_THREADS;
for (f = files; f != NULL; f = f->next) {
if (!f->filename) continue;
PyObject *k = PyUnicode_FromString(f->filename);
if (!k) { PyErr_Clear(); continue; }
PyObject *l = PyObject_CallMethod(k, "lower", NULL);
Py_DECREF(k); if (!l) break;
bool matches = PyUnicode_Compare(l, name) == 0;
Py_DECREF(l);
if (matches) {
*fileid = f->item_id;
found = true;
break;
}
}
for (f = files; f != NULL; f = f->next) LIBMTP_destroy_file_t(f);
return found;
}
static PyObject *
Device_get_file_by_name(Device *self, PyObject *args) {
PyObject *stream, *callback = NULL, *names;
unsigned long fileid = 0, storageid, parentid;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL);
if (!PyArg_ParseTuple(args, "kkO!O|O", &storageid, &parentid, &PyTuple_Type, &names, &stream, &callback)) return NULL;
if (callback == NULL || !PyCallable_Check(callback)) callback = NULL;
if (!PyTuple_GET_SIZE(names)) Py_RETURN_NONE;
for (Py_ssize_t i = 0; i < PyTuple_GET_SIZE(names); i++) {
PyObject *k = PyTuple_GET_ITEM(names, i);
if (!PyUnicode_Check(k)) { PyErr_SetString(PyExc_TypeError, "names must contain only unicode strings"); return NULL; }
PyObject *l = PyObject_CallMethod(k, "lower", NULL);
if (!l) return NULL;
bool found = find_in_parent(self, storageid, parentid, l, &fileid);
Py_DECREF(l);
if (!found) {
if (PyErr_Occurred()) return NULL;
Py_RETURN_NONE;
}
parentid = fileid;
}
return get_file_impl(self, stream, callback, fileid);
}
// }}}
// Device.get_metadata_by_name {{{
static PyObject *
Device_get_metadata_by_name(Device *self, PyObject *args) {
unsigned long parent_id, storage_id, folder_id = 0; PyObject *names;
bool found = false;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL);
if (!PyArg_ParseTuple(args, "kkO!", &storage_id, &parent_id, &PyTuple_Type, &names)) return NULL;
for (Py_ssize_t i = 0; i < PyTuple_GET_SIZE(names); i++) {
PyObject *k = PyTuple_GET_ITEM(names, i);
if (!PyUnicode_Check(k)) { PyErr_SetString(PyExc_TypeError, "names must contain only unicode strings"); return NULL; }
PyObject *l = PyObject_CallMethod(k, "lower", NULL);
if (!l) return NULL;
found = find_in_parent(self, storage_id, parent_id, l, &folder_id);
Py_DECREF(l);
if (!found) {
if (PyErr_Occurred()) return NULL;
Py_RETURN_NONE;
}
parent_id = folder_id;
}
if (!found) Py_RETURN_NONE;
PyObject *errs = PyList_New(0);
if (!errs) return NULL;
PyObject *ans = file_metadata(self->device, errs, folder_id, storage_id);
if (ans == NULL) { ans = Py_None; Py_INCREF(ans); }
return Py_BuildValue("NN", ans, errs);
} // }}}
// Device.list_folder_by_name {{{
static PyObject *
Device_list_folder_by_name(Device *self, PyObject *args) {
unsigned long parent_id, storage_id, folder_id = 0; PyObject *names;
bool found = false;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL);
if (!PyArg_ParseTuple(args, "kkO!", &storage_id, &parent_id, &PyTuple_Type, &names)) return NULL;
for (Py_ssize_t i = 0; i < PyTuple_GET_SIZE(names); i++) {
PyObject *k = PyTuple_GET_ITEM(names, i);
if (!PyUnicode_Check(k)) { PyErr_SetString(PyExc_TypeError, "names must contain only unicode strings"); return NULL; }
PyObject *l = PyObject_CallMethod(k, "lower", NULL);
if (!l) return NULL;
found = find_in_parent(self, storage_id, parent_id, l, &folder_id);
Py_DECREF(l);
if (!found) {
if (PyErr_Occurred()) return NULL;
Py_RETURN_NONE;
}
parent_id = folder_id;
}
if (!found) Py_RETURN_NONE;
LIBMTP_file_t *f, *files;
PyObject *entry;
Py_BEGIN_ALLOW_THREADS;
files = LIBMTP_Get_Files_And_Folders(self->device, storage_id, folder_id);
Py_END_ALLOW_THREADS;
if (files == NULL) Py_RETURN_NONE;
PyObject *ans = PyList_New(0);
if (!ans) return NULL;
for (f = files; f != NULL; f = f->next) {
entry = build_file_metadata(f, storage_id);
if (entry == NULL) { Py_CLEAR(ans); break; }
bool appended = PyList_Append(ans, entry) == 0;
Py_DECREF(entry);
if (!appended) { Py_CLEAR(ans); break; }
}
for (f = files; f != NULL; f = f->next) LIBMTP_destroy_file_t(f);
return ans;
} // }}}
// Device.put_file {{{
@ -554,6 +684,18 @@ static PyMethodDef Device_methods[] = {
"get_file(fileid, stream, callback=None) -> Get the file specified by fileid from the device. stream must be a file-like object. The file will be written to it. callback works the same as in get_filelist(). Returns ok, errs, where errs is a list of errors (if any)."
},
{"get_file_by_name", (PyCFunction)Device_get_file_by_name, METH_VARARGS,
"get_file_by_name(storage_id, parent_id, names, stream, callback=None) -> Get the file specified by names (a tuple of name components) relative to parent_id from the device. stream must be a file-like object. The file will be written to it. callback works the same as in get_filelist(). Returns None or (ok, errs), where errs is a list of errors (if any)."
},
{"list_folder_by_name", (PyCFunction)Device_list_folder_by_name, METH_VARARGS,
"list_folder_by_name(storage_id, parent_id, names) -> List the folder specified by names (a tuple of name components) relative to parent_id from the device. Return None or a list of entries."
},
{"get_metadata_by_name", (PyCFunction)Device_get_metadata_by_name, METH_VARARGS,
"get_metadata_by_name(storage_id, parent_id, names) -> Return metadata for specified name (a tuple of name components) relative to parent from the device. Return (metadata, errs)."
},
{"put_file", (PyCFunction)Device_put_file, METH_VARARGS,
"put_file(storage_id, parent_id, filename, stream, size, callback=None) -> Put a file on the device. The file is read from stream. It is put inside the folder identified by parent_id on the storage identified by storage_id. Use parent_id=0 to put it in the root. stream must be a file-like object. size is the size in bytes of the data in stream. callback works the same as in get_filelist(). Returns fileinfo, errs, where errs is a list of errors (if any), and fileinfo is a file information dictionary, as returned by get_filelist(). fileinfo will be None if case or errors."
},
@ -749,6 +891,7 @@ exec_module(PyObject *m) {
PyModule_AddIntMacro(m, LIBMTP_DEBUG_USB);
PyModule_AddIntMacro(m, LIBMTP_DEBUG_DATA);
PyModule_AddIntMacro(m, LIBMTP_DEBUG_ALL);
PyModule_AddIntMacro(m, LIBMTP_FILES_AND_FOLDERS_ROOT);
return 0;
}