MTP unix: Add a debug connected devices method.

This commit is contained in:
Kovid Goyal 2012-08-26 22:22:49 +05:30
parent 6b17b8a42b
commit a43ca92e02

View File

@ -7,10 +7,10 @@ __license__ = 'GPL v3'
__copyright__ = '2012, Kovid Goyal <kovid at kovidgoyal.net>' __copyright__ = '2012, Kovid Goyal <kovid at kovidgoyal.net>'
__docformat__ = 'restructuredtext en' __docformat__ = 'restructuredtext en'
import operator import operator, traceback, pprint, sys
from threading import RLock from threading import RLock
from io import BytesIO
from collections import namedtuple from collections import namedtuple
from functools import partial
from calibre import prints, as_unicode from calibre import prints, as_unicode
from calibre.constants import plugins from calibre.constants import plugins
@ -57,8 +57,6 @@ class MTP_DEVICE(MTPDeviceBase):
@synchronous @synchronous
def detect_managed_devices(self, devices_on_system, force_refresh=False): def detect_managed_devices(self, devices_on_system, force_refresh=False):
if self.libmtp is None: return None if self.libmtp is None: return None
if self.known_devices is None:
self.known_devices = frozenset(self.libmtp.known_devices())
# First remove blacklisted devices. # First remove blacklisted devices.
devs = set() devs = set()
for d in devices_on_system: for d in devices_on_system:
@ -91,6 +89,36 @@ class MTP_DEVICE(MTPDeviceBase):
return None return None
@synchronous
def debug_managed_device_detection(self, devices_on_system, output):
p = partial(prints, file=output)
if self.libmtp is None:
err = plugins['libmtp'][1]
if not err:
err = 'startup() not called on this device driver'
p(err)
return False
devs = [d for d in devices_on_system if (d.vendor_id, d.product_id)
in self.known_devices]
if not devs:
p('No known MTP devices connected to system')
return False
p('Known MTP devices connected:')
for d in devs: p(d)
d = devs[0]
p('\nTrying to open:', d)
try:
self.open(d, 'debug')
except:
p('Opening device failed:')
p(traceback.format_exc())
return False
p('Opened', self.current_friendly_name, 'successfully')
p('Storage info:')
p(pprint.pformat(self.dev.storage_info))
self.eject()
return True
@synchronous @synchronous
def create_device(self, connected_device): def create_device(self, connected_device):
d = connected_device d = connected_device
@ -115,6 +143,8 @@ class MTP_DEVICE(MTPDeviceBase):
if self.libmtp is None: if self.libmtp is None:
print ('Failed to load libmtp, MTP device detection disabled') print ('Failed to load libmtp, MTP device detection disabled')
print (p[1]) print (p[1])
else:
self.known_devices = frozenset(self.libmtp.known_devices())
for x in vars(self.libmtp): for x in vars(self.libmtp):
if x.startswith('LIBMTP'): if x.startswith('LIBMTP'):
@ -291,44 +321,17 @@ class MTP_DEVICE(MTPDeviceBase):
parent.remove_child(obj) parent.remove_child(obj)
if __name__ == '__main__': if __name__ == '__main__':
BytesIO
class PR: class PR:
def report_progress(self, sent, total): def report_progress(self, sent, total):
print (sent, total, end=', ') print (sent, total, end=', ')
from pprint import pprint
dev = MTP_DEVICE(None) dev = MTP_DEVICE(None)
dev.startup() dev.startup()
from calibre.devices.scanner import DeviceScanner from calibre.devices.scanner import DeviceScanner
scanner = DeviceScanner() scanner = DeviceScanner()
scanner.scan() scanner.scan()
devs = scanner.devices devs = scanner.devices
cd = dev.detect_managed_devices(devs) dev.debug_managed_device_detection(devs, sys.stdout)
if cd is None:
raise Exception('No MTP device found')
dev.open(cd, 'xxx')
d = dev.dev
print ("Opened device:", dev.get_gui_name())
print ("Storage info:")
pprint(d.storage_info)
print("Free space:", dev.free_space())
# print (d.create_folder(dev._main_id, 0, 'testf'))
# raw = b'test'
# fname = b'moose.txt'
# src = BytesIO(raw)
# print (d.put_file(dev._main_id, 0, fname, src, len(raw), PR()))
# with open('/tmp/flint.epub', 'wb') as f:
# print(d.get_file(786, f, PR()))
# print()
# with open('/tmp/bleak.epub', 'wb') as f:
# print(d.get_file(601, f, PR()))
# print()
dev.filesystem_cache.dump()
# print (dev.filesystem_cache.entries[0].files[0])
# print (dev.filesystem_cache.entries[0].folders[0])
dev.set_debug_level(dev.LIBMTP_DEBUG_ALL) dev.set_debug_level(dev.LIBMTP_DEBUG_ALL)
del d
dev.shutdown() dev.shutdown()