MTP: Start writing unit tests for device interaction and fix filesystem listing on unix

This commit is contained in:
Kovid Goyal 2012-08-23 17:13:29 +05:30
parent 3a45bca085
commit 755d262b67
5 changed files with 228 additions and 190 deletions

View File

@ -17,7 +17,7 @@ from calibre.utils.icu import sort_key, lower
class FileOrFolder(object): class FileOrFolder(object):
def __init__(self, entry, fs_cache, all_storage_ids): def __init__(self, entry, fs_cache, all_storage_ids=()):
self.object_id = entry['id'] self.object_id = entry['id']
self.is_folder = entry['is_folder'] self.is_folder = entry['is_folder']
self.name = force_unicode(entry.get('name', '___'), 'utf-8') self.name = force_unicode(entry.get('name', '___'), 'utf-8')
@ -28,7 +28,7 @@ class FileOrFolder(object):
self.parent_id = entry.get('parent_id', None) self.parent_id = entry.get('parent_id', None)
if self.parent_id == 0: if self.parent_id == 0:
sid = self.storage_id sid = self.storage_id
if sid not in all_storage_ids: if all_storage_ids and sid not in all_storage_ids:
sid = all_storage_ids[0] sid = all_storage_ids[0]
self.parent_id = sid self.parent_id = sid
if self.parent_id is None and self.storage_id is None: if self.parent_id is None and self.storage_id is None:
@ -68,11 +68,19 @@ class FileOrFolder(object):
yield e yield e
def add_child(self, entry): def add_child(self, entry):
ans = FileOrFolder(entry, self.id_map) ans = FileOrFolder(entry, self.fs_cache())
t = self.folders if ans.is_folder else self.files t = self.folders if ans.is_folder else self.files
t.append(ans) t.append(ans)
return ans return ans
def remove_child(self, entry):
for x in (self.files, self.folders):
try:
x.remove(entry)
except ValueError:
pass
self.id_map.pop(entry.object_id, None)
def dump(self, prefix='', out=sys.stdout): def dump(self, prefix='', out=sys.stdout):
c = '+' if self.is_folder else '-' c = '+' if self.is_folder else '-'
data = ('%s children'%(sum(map(len, (self.files, self.folders)))) data = ('%s children'%(sum(map(len, (self.files, self.folders))))

View File

@ -9,14 +9,80 @@ __docformat__ = 'restructuredtext en'
import unittest import unittest
from calibre.utils.icu import lower
from calibre.devices.mtp.driver import MTP_DEVICE from calibre.devices.mtp.driver import MTP_DEVICE
from calibre.devices.scanner import DeviceScanner
class Test(unittest.TestCase): class TestDeviceInteraction(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.dev = MTP_DEVICE(None)
cls.dev.startup()
cls.scanner = DeviceScanner()
cls.scanner.scan()
cd = cls.dev.detect_managed_devices(cls.scanner.devices)
if cd is None:
raise ValueError('No MTP device found')
cls.dev.open(cd, 'test_library')
if cls.dev.free_space()[0] < 10*(1024**2):
raise ValueError('The connected device %s does not have enough free'
' space in its main memory to do the tests'%cd)
cls.dev.filesystem_cache
cls.storage = cls.dev.filesystem_cache.entries[0]
@classmethod
def tearDownClass(cls):
cls.dev.shutdown()
cls.dev = None
def setUp(self): def setUp(self):
self.dev = MTP_DEVICE(None) self.cleanup = []
def tearDown(self): def tearDown(self):
pass for obj in reversed(self.cleanup):
self.dev.delete_file_or_folder(obj)
def test_folder_operations(self):
''' Test the creation of folders, duplicate folders and sub folders '''
# Create a folder
name = 'zzz-test-folder'
folder = self.dev.create_folder(self.storage, name)
self.cleanup.append(folder)
self.assertTrue(folder.is_folder)
self.assertEqual(folder.parent_id, self.storage.object_id)
self.assertEqual(folder.storage_id, self.storage.object_id)
self.assertEqual(lower(name), lower(folder.name))
# Create a sub-folder
name = 'sub-folder'
subfolder = self.dev.create_folder(folder, name)
self.assertTrue(subfolder.is_folder)
self.assertEqual(subfolder.parent_id, folder.object_id)
self.assertEqual(subfolder.storage_id, self.storage.object_id)
self.assertEqual(lower(name), lower(subfolder.name))
self.cleanup.append(subfolder)
# Check that creating an existing folder returns that folder (case
# insensitively)
self.assertIs(subfolder, self.dev.create_folder(folder,
'SUB-FOLDER'),
msg='Creating an existing folder did not return the existing folder')
# Check that creating folders as children of files is not allowed
root_file = [f for f in self.dev.filesystem_cache.entries[0].files if
not f.is_folder]
if root_file:
with self.assertRaises(ValueError):
self.dev.create_folder(root_file[0], 'sub-folder')
def tests():
return unittest.TestLoader().loadTestsFromTestCase(TestDeviceInteraction)
def run():
unittest.TextTestRunner(verbosity=2).run(tests())
if __name__ == '__main__':
run()

View File

@ -12,6 +12,7 @@ from threading import RLock
from io import BytesIO from io import BytesIO
from collections import namedtuple from collections import namedtuple
from calibre import prints
from calibre.constants import plugins from calibre.constants import plugins
from calibre.devices.errors import OpenFailed, DeviceError from calibre.devices.errors import OpenFailed, DeviceError
from calibre.devices.mtp.base import MTPDeviceBase, synchronous from calibre.devices.mtp.base import MTPDeviceBase, synchronous
@ -157,34 +158,31 @@ class MTP_DEVICE(MTPDeviceBase):
def filesystem_cache(self): def filesystem_cache(self):
if self._filesystem_cache is None: if self._filesystem_cache is None:
with self.lock: with self.lock:
files, errs = self.dev.get_filelist(self) storage, all_items, all_errs = [], [], []
if errs and not files:
raise DeviceError('Failed to read files from device. Underlying errors:\n'
+self.format_errorstack(errs))
folders, errs = self.dev.get_folderlist()
if errs and not folders:
raise DeviceError('Failed to read folders from device. Underlying errors:\n'
+self.format_errorstack(errs))
storage = []
for sid, capacity in zip([self._main_id, self._carda_id, for sid, capacity in zip([self._main_id, self._carda_id,
self._cardb_id], self.total_space()): self._cardb_id], self.total_space()):
if sid is not None: if sid is None: continue
name = _('Unknown') name = _('Unknown')
for x in self.dev.storage_info: for x in self.dev.storage_info:
if x['id'] == sid: if x['id'] == sid:
name = x['name'] name = x['name']
break break
storage.append({'id':sid, 'size':capacity, storage.append({'id':sid, 'size':capacity,
'is_folder':True, 'name':name}) 'is_folder':True, 'name':name, 'can_delete':False,
all_folders = [] 'is_system':True})
def recurse(f): items, errs = self.dev.get_filesystem(sid)
all_folders.append(f) all_items.extend(items), all_errs.extend(errs)
for c in f['children']: if not all_items and all_errs:
recurse(c) raise DeviceError(
'Failed to read filesystem from %s with errors: %s'
for f in folders: recurse(f) %(self.current_friendly_name,
self._filesystem_cache = FilesystemCache(storage, self.format_errorstack(all_errs)))
all_folders+files) if all_errs:
prints('There were some errors while getting the '
' filesystem from %s: %s'%(
self.current_friendly_name,
self.format_errorstack(all_errs)))
self._filesystem_cache = FilesystemCache(storage, all_items)
return self._filesystem_cache return self._filesystem_cache
@synchronous @synchronous
@ -223,16 +221,42 @@ class MTP_DEVICE(MTPDeviceBase):
return tuple(ans) return tuple(ans)
@synchronous @synchronous
def create_folder(self, parent_id, name): def create_folder(self, parent, name):
parent = self.filesystem_cache.id_map[parent_id]
if not parent.is_folder: if not parent.is_folder:
raise ValueError('%s is not a folder'%parent.full_path) raise ValueError('%s is not a folder'%(parent.full_path,))
e = parent.folder_named(name) e = parent.folder_named(name)
if e is not None: if e is not None:
return e return e
ans = self.dev.create_folder(parent.storage_id, parent_id, name) ename = name.encode('utf-8') if isinstance(name, unicode) else name
sid, pid = parent.storage_id, parent.object_id
if pid == sid:
pid = 0
ans, errs = self.dev.create_folder(sid, pid, ename)
if ans is None:
raise DeviceError(
'Failed to create folder named %s in %s with error: %s'%
(name, parent.full_path, self.format_errorstack(errs)))
ans['storage_id'] = sid
return parent.add_child(ans) return parent.add_child(ans)
@synchronous
def delete_file_or_folder(self, obj):
if not obj.can_delete:
raise ValueError('Cannot delete %s as deletion not allowed'%
(obj.full_path,))
if obj.is_system:
raise ValueError('Cannot delete %s as it is a system object'%
(obj.full_path,))
if obj.files or obj.folders:
raise ValueError('Cannot delete %s as it is not empty'%
(obj.full_path,))
parent = obj.parent
ok, errs = self.dev.delete_object(obj.object_id)
if not ok:
raise DeviceError('Failed to delete %s with error: '%
(obj.full_path, self.format_errorstack(errs)))
parent.remove_child(obj)
if __name__ == '__main__': if __name__ == '__main__':
BytesIO BytesIO
class PR: class PR:

View File

@ -118,6 +118,34 @@ static uint16_t data_from_python(void *params, void *priv, uint32_t wantlen, uns
return ret; return ret;
} }
static PyObject* build_file_metadata(LIBMTP_file_t *nf, uint32_t storage_id) {
char *filename = nf->filename;
if (filename == NULL) filename = "";
return Py_BuildValue("{s:k,s:k,s:k,s:s,s:K,s:O}",
"id", nf->item_id,
"parent_id", nf->parent_id,
"storage_id", storage_id,
"name", filename,
"size", nf->filesize,
"is_folder", (nf->filetype == LIBMTP_FILETYPE_FOLDER) ? Py_True : Py_False
);
}
static PyObject* file_metadata(LIBMTP_mtpdevice_t *device, PyObject *errs, uint32_t item_id, uint32_t storage_id) {
LIBMTP_file_t *nf;
PyObject *ans = NULL;
Py_BEGIN_ALLOW_THREADS;
nf = LIBMTP_Get_Filemetadata(device, item_id);
Py_END_ALLOW_THREADS;
if (nf == NULL) dump_errorstack(device, errs);
else {
ans = build_file_metadata(nf, storage_id);
LIBMTP_destroy_file_t(nf);
}
return ans;
}
// }}} // }}}
// Device object definition {{{ // Device object definition {{{
@ -188,9 +216,7 @@ libmtp_Device_init(libmtp_Device *self, PyObject *args, PyObject *kwds)
} }
} }
// Note that contrary to what the libmtp docs imply, we cannot use dev = LIBMTP_Open_Raw_Device_Uncached(&rawdev);
// LIBMTP_Open_Raw_Device_Uncached as using it causes file listing to fail
dev = LIBMTP_Open_Raw_Device(&rawdev);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (dev == NULL) { if (dev == NULL) {
@ -328,117 +354,65 @@ libmtp_Device_storage_info(libmtp_Device *self, void *closure) {
return ans; return ans;
} // }}} } // }}}
// Device.get_filelist {{{ // Device.get_filesystem {{{
static PyObject *
libmtp_Device_get_filelist(libmtp_Device *self, PyObject *args, PyObject *kwargs) {
PyObject *ans, *fo, *callback = NULL, *errs;
ProgressCallback cb;
LIBMTP_file_t *f, *tf;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL); static int recursive_get_files(LIBMTP_mtpdevice_t *dev, uint32_t storage_id, uint32_t parent_id, PyObject *ans, PyObject *errs) {
LIBMTP_file_t *f, *files;
PyObject *entry;
int ok = 1;
Py_BEGIN_ALLOW_THREADS;
files = LIBMTP_Get_Files_And_Folders(dev, storage_id, parent_id);
Py_END_ALLOW_THREADS;
if (!PyArg_ParseTuple(args, "|O", &callback)) return NULL; if (files == NULL) return ok;
if (callback == NULL || !PyCallable_Check(callback)) callback = NULL;
cb.obj = callback;
ans = PyList_New(0); for (f = files; ok && f != NULL; f = f->next) {
errs = PyList_New(0); entry = build_file_metadata(f, storage_id);
if (ans == NULL || errs == NULL) { PyErr_NoMemory(); return NULL; } if (entry == NULL) { ok = 0; }
else {
PyList_Append(ans, entry);
Py_DECREF(entry);
}
Py_XINCREF(callback); if (ok && f->filetype == LIBMTP_FILETYPE_FOLDER) {
cb.state = PyEval_SaveThread(); if (!recursive_get_files(dev, storage_id, f->item_id, ans, errs)) {
tf = LIBMTP_Get_Filelisting_With_Callback(self->device, report_progress, &cb); ok = 0;
PyEval_RestoreThread(cb.state); }
Py_XDECREF(callback); }
if (tf == NULL) {
dump_errorstack(self->device, errs);
return Py_BuildValue("NN", ans, errs);
}
for (f=tf; f != NULL; f=f->next) {
fo = Py_BuildValue("{s:k,s:k,s:k,s:s,s:K,s:k,s:O}",
"id", f->item_id,
"parent_id", f->parent_id,
"storage_id", f->storage_id,
"name", f->filename,
"size", f->filesize,
"modtime", f->modificationdate,
"is_folder", Py_False
);
if (fo == NULL || PyList_Append(ans, fo) != 0) break;
Py_DECREF(fo);
} }
// Release memory // Release memory
f = tf; f = files;
while (f != NULL) { while (f != NULL) {
tf = f; f = f->next; LIBMTP_destroy_file_t(tf); files = f; f = f->next; LIBMTP_destroy_file_t(files);
} }
if (callback != NULL) { return ok;
// Bug in libmtp where it does not call callback with 100%
fo = PyObject_CallFunction(callback, "KK", PyList_Size(ans), PyList_Size(ans));
Py_XDECREF(fo);
}
return Py_BuildValue("NN", ans, errs);
} // }}}
// Device.get_folderlist {{{
int folderiter(LIBMTP_folder_t *f, PyObject *parent) {
PyObject *folder, *children;
children = PyList_New(0);
if (children == NULL) { PyErr_NoMemory(); return 1;}
folder = Py_BuildValue("{s:k,s:k,s:k,s:s,s:O,s:N}",
"id", f->folder_id,
"parent_id", f->parent_id,
"storage_id", f->storage_id,
"name", f->name,
"is_folder", Py_True,
"children", children);
if (folder == NULL) return 1;
PyList_Append(parent, folder);
Py_DECREF(folder);
if (f->sibling != NULL) {
if (folderiter(f->sibling, parent)) return 1;
}
if (f->child != NULL) {
if (folderiter(f->child, children)) return 1;
}
return 0;
} }
static PyObject * static PyObject *
libmtp_Device_get_folderlist(libmtp_Device *self, PyObject *args, PyObject *kwargs) { libmtp_Device_get_filesystem(libmtp_Device *self, PyObject *args, PyObject *kwargs) {
PyObject *ans, *errs; PyObject *ans, *errs;
LIBMTP_folder_t *f; uint32_t storage_id;
int ok = 0;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL); ENSURE_DEV(NULL); ENSURE_STORAGE(NULL);
if (!PyArg_ParseTuple(args, "k", &storage_id)) return NULL;
ans = PyList_New(0); ans = PyList_New(0);
errs = PyList_New(0); errs = PyList_New(0);
if (errs == NULL || ans == NULL) { PyErr_NoMemory(); return NULL; } if (errs == NULL || ans == NULL) { PyErr_NoMemory(); return NULL; }
Py_BEGIN_ALLOW_THREADS; LIBMTP_Clear_Errorstack(self->device);
f = LIBMTP_Get_Folder_List(self->device); ok = recursive_get_files(self->device, storage_id, 0, ans, errs);
Py_END_ALLOW_THREADS; dump_errorstack(self->device, errs);
if (!ok) {
if (f == NULL) { Py_DECREF(ans);
dump_errorstack(self->device, errs); Py_DECREF(errs);
return Py_BuildValue("NN", ans, errs); return NULL;
} }
if (folderiter(f, ans)) return NULL;
LIBMTP_destroy_folder_t(f);
return Py_BuildValue("NN", ans, errs); return Py_BuildValue("NN", ans, errs);
} // }}} } // }}}
@ -477,13 +451,13 @@ libmtp_Device_get_file(libmtp_Device *self, PyObject *args, PyObject *kwargs) {
// Device.put_file {{{ // Device.put_file {{{
static PyObject * static PyObject *
libmtp_Device_put_file(libmtp_Device *self, PyObject *args, PyObject *kwargs) { libmtp_Device_put_file(libmtp_Device *self, PyObject *args, PyObject *kwargs) {
PyObject *stream, *callback = NULL, *errs, *fo; PyObject *stream, *callback = NULL, *errs, *fo = NULL;
ProgressCallback cb; ProgressCallback cb;
uint32_t parent_id, storage_id; uint32_t parent_id, storage_id;
uint64_t filesize; uint64_t filesize;
int ret; int ret;
char *name; char *name;
LIBMTP_file_t f, *nf; LIBMTP_file_t f;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL); ENSURE_DEV(NULL); ENSURE_STORAGE(NULL);
@ -500,26 +474,9 @@ libmtp_Device_put_file(libmtp_Device *self, PyObject *args, PyObject *kwargs) {
PyEval_RestoreThread(cb.state); PyEval_RestoreThread(cb.state);
Py_XDECREF(callback); Py_DECREF(stream); Py_XDECREF(callback); Py_DECREF(stream);
fo = Py_None; Py_INCREF(fo);
if (ret != 0) dump_errorstack(self->device, errs); if (ret != 0) dump_errorstack(self->device, errs);
else { else fo = file_metadata(self->device, errs, f.item_id, storage_id);
Py_BEGIN_ALLOW_THREADS; if (fo == NULL) { fo = Py_None; Py_INCREF(fo); }
nf = LIBMTP_Get_Filemetadata(self->device, f.item_id);
Py_END_ALLOW_THREADS;
if (nf == NULL) dump_errorstack(self->device, errs);
else {
Py_DECREF(fo);
fo = Py_BuildValue("{s:k,s:k,s:k,s:s,s:K,s:k}",
"id", nf->item_id,
"parent_id", nf->parent_id,
"storage_id", nf->storage_id,
"name", nf->filename,
"size", nf->filesize,
"modtime", nf->modificationdate
);
LIBMTP_destroy_file_t(nf);
}
}
return Py_BuildValue("NN", fo, errs); return Py_BuildValue("NN", fo, errs);
@ -549,11 +506,10 @@ libmtp_Device_delete_object(libmtp_Device *self, PyObject *args, PyObject *kwarg
// Device.create_folder {{{ // Device.create_folder {{{
static PyObject * static PyObject *
libmtp_Device_create_folder(libmtp_Device *self, PyObject *args, PyObject *kwargs) { libmtp_Device_create_folder(libmtp_Device *self, PyObject *args, PyObject *kwargs) {
PyObject *errs, *fo, *children, *temp; PyObject *errs, *fo = NULL;
uint32_t parent_id, storage_id; uint32_t parent_id, storage_id;
char *name; char *name;
uint32_t folder_id; uint32_t folder_id;
LIBMTP_folder_t *f = NULL, *cf;
ENSURE_DEV(NULL); ENSURE_STORAGE(NULL); ENSURE_DEV(NULL); ENSURE_STORAGE(NULL);
@ -561,39 +517,13 @@ libmtp_Device_create_folder(libmtp_Device *self, PyObject *args, PyObject *kwarg
errs = PyList_New(0); errs = PyList_New(0);
if (errs == NULL) { PyErr_NoMemory(); return NULL; } if (errs == NULL) { PyErr_NoMemory(); return NULL; }
fo = Py_None; Py_INCREF(fo);
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
folder_id = LIBMTP_Create_Folder(self->device, name, parent_id, storage_id); folder_id = LIBMTP_Create_Folder(self->device, name, parent_id, storage_id);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (folder_id == 0) dump_errorstack(self->device, errs); if (folder_id == 0) dump_errorstack(self->device, errs);
else { else fo = file_metadata(self->device, errs, folder_id, storage_id);
Py_BEGIN_ALLOW_THREADS; if (fo == NULL) { fo = Py_None; Py_INCREF(fo); }
// Cannot use Get_Folder_List_For_Storage as it fails
f = LIBMTP_Get_Folder_List(self->device);
Py_END_ALLOW_THREADS;
if (f == NULL) dump_errorstack(self->device, errs);
else {
cf = LIBMTP_Find_Folder(f, folder_id);
if (cf == NULL) {
temp = Py_BuildValue("is", 1, "Newly created folder not present on device!");
if (temp == NULL) { PyErr_NoMemory(); return NULL;}
PyList_Append(errs, temp);
Py_DECREF(temp);
} else {
Py_DECREF(fo);
children = PyList_New(0);
if (children == NULL) { PyErr_NoMemory(); return NULL; }
fo = Py_BuildValue("{s:k,s:k,s:k,s:s,s:N}",
"id", cf->folder_id,
"parent_id", cf->parent_id,
"storage_id", cf->storage_id,
"name", cf->name,
"children", children);
}
LIBMTP_destroy_folder_t(f);
}
}
return Py_BuildValue("NN", fo, errs); return Py_BuildValue("NN", fo, errs);
} // }}} } // }}}
@ -603,12 +533,8 @@ static PyMethodDef libmtp_Device_methods[] = {
"update_storage_info() -> Reread the storage info from the device (total, space, free space, storage locations, etc.)" "update_storage_info() -> Reread the storage info from the device (total, space, free space, storage locations, etc.)"
}, },
{"get_filelist", (PyCFunction)libmtp_Device_get_filelist, METH_VARARGS, {"get_filesystem", (PyCFunction)libmtp_Device_get_filesystem, METH_VARARGS,
"get_filelist(callback=None) -> Get the list of files on the device. callback must be callable accepts arguments (current, total)'. Returns files, errors." "get_filesystem(storage_id) -> Get the list of files and folders on the device in storage_id. Returns files, errors."
},
{"get_folderlist", (PyCFunction)libmtp_Device_get_folderlist, METH_VARARGS,
"get_folderlist() -> Get the list of folders on the device. Returns files, errors."
}, },
{"get_file", (PyCFunction)libmtp_Device_get_file, METH_VARARGS, {"get_file", (PyCFunction)libmtp_Device_get_file, METH_VARARGS,

View File

@ -154,7 +154,7 @@ class MTP_DEVICE(MTPDeviceBase):
name = s['name'] name = s['name']
break break
storage = {'id':storage_id, 'size':capacity, 'name':name, storage = {'id':storage_id, 'size':capacity, 'name':name,
'is_folder':True} 'is_folder':True, 'can_delete':False, 'is_system':True}
id_map = self.dev.get_filesystem(storage_id) id_map = self.dev.get_filesystem(storage_id)
for x in id_map.itervalues(): x['storage_id'] = storage_id for x in id_map.itervalues(): x['storage_id'] = storage_id
all_storage.append(storage) all_storage.append(storage)
@ -247,7 +247,7 @@ class MTP_DEVICE(MTPDeviceBase):
def get_file(self, object_id, stream=None, callback=None): def get_file(self, object_id, stream=None, callback=None):
f = self.filesystem_cache.id_map[object_id] f = self.filesystem_cache.id_map[object_id]
if f.is_folder: if f.is_folder:
raise ValueError('%s is a folder on the device'%f.full_path) raise ValueError('%s is a folder on the device'%(f.full_path,))
if stream is None: if stream is None:
stream = SpooledTemporaryFile(5*1024*1024, '_wpd_receive_file.dat') stream = SpooledTemporaryFile(5*1024*1024, '_wpd_receive_file.dat')
try: try:
@ -262,14 +262,28 @@ class MTP_DEVICE(MTPDeviceBase):
return stream return stream
@same_thread @same_thread
def create_folder(self, parent_id, name): def create_folder(self, parent, name):
parent = self.filesystem_cache.id_map[parent_id]
if not parent.is_folder: if not parent.is_folder:
raise ValueError('%s is not a folder'%parent.full_path) raise ValueError('%s is not a folder'%(parent.full_path,))
e = parent.folder_named(name) e = parent.folder_named(name)
if e is not None: if e is not None:
return e return e
ans = self.dev.create_folder(parent_id, name) ans = self.dev.create_folder(parent.object_id, name)
ans['storage_id'] = parent.storage_id ans['storage_id'] = parent.storage_id
return parent.add_child(ans) return parent.add_child(ans)
@same_thread
def delete_file_or_folder(self, obj):
if not obj.can_delete:
raise ValueError('Cannot delete %s as deletion not allowed'%
(obj.full_path,))
if obj.is_system:
raise ValueError('Cannot delete %s as it is a system object'%
(obj.full_path,))
if obj.files or obj.folders:
raise ValueError('Cannot delete %s as it is not empty'%
(obj.full_path,))
parent = obj.parent
self.dev.delete_object(obj.object_id)
parent.remove_child(obj)