From 324e8df2834cfbac1f3bc7218c35053a0f3a231e Mon Sep 17 00:00:00 2001 From: Kovid Goyal Date: Mon, 6 Nov 2006 01:15:41 +0000 Subject: [PATCH] Working ls -lhR --color implementation --- communicate.py | 389 +++++++++++++++++++++++++--------- data.py | 163 -------------- epydoc.conf | 19 ++ errors.py | 32 +++ prs-500.e3p | 104 +++++++++ prstypes.py | 563 +++++++++++++++++++++++++++++++++++++++++++------ terminfo.py | 192 +++++++++++++++++ 7 files changed, 1140 insertions(+), 322 deletions(-) delete mode 100755 data.py create mode 100644 epydoc.conf create mode 100644 errors.py create mode 100644 prs-500.e3p create mode 100644 terminfo.py diff --git a/communicate.py b/communicate.py index 8944468075..53ebebc981 100755 --- a/communicate.py +++ b/communicate.py @@ -14,11 +14,6 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -import sys, usb -from data import * -from types import * -from exceptions import Exception - ### End point description for PRS-500 procductId=667 ### Endpoint Descriptor: ### bLength 7 @@ -46,25 +41,89 @@ from exceptions import Exception ### Has two configurations 1 is the USB charging config 2 is the self-powered config. ### I think config management is automatic. Endpoints are the same -class PathError(Exception): - def __init__(self, msg): - Exception.__init__(self, msg) +import sys, usb, logging, StringIO, time +from optparse import OptionParser -class ControlError(Exception): - def __init__(self, query=None, response=None, desc=None): - self.query = query - self.response = response - Exception.__init__(self, desc) +from prstypes import * +from errors import * +from terminfo import TerminalController + +#try: +# import psyco +# psyco.full() +#except ImportError: +# print 'Psyco not installed, the program will just run slower' +_term = None + +LOG_PACKETS=False # If True all packets are looged to stdout +MINIMUM_COL_WIDTH = 12 + +class File(object): + def __init__(self, file): + self.is_dir = file[1].is_dir + self.is_readonly = file[1].is_readonly + self.size = file[1].file_size + self.ctime = file[1].ctime + self.wtime = file[1].wtime + path = file[0] + if path.endswith("/"): path = path[:-1] + self.path = path + self.name = path[path.rfind("/")+1:].rstrip() - def __str__(self): - if self.query and self.response: - return "Got unexpected response:\n" + \ - "query:\n"+str(self.query.query)+"\n"+\ - "expected:\n"+str(self.query.response)+"\n" +\ - "actual:\n"+str(self.response) - if self.desc: - return self.desc - return "Unknown control error occurred" + def __repr__(self): + return self.path + + @apply + def mode_string(): + doc=""" The mode string for this file. There are only two modes read-only and read-write """ + def fget(self): + mode, x = "-", "-" + if self.is_dir: mode, x = "d", "x" + if self.is_readonly: mode += "r-"+x+"r-"+x+"r-"+x + else: mode += "rw"+x+"rw"+x+"rw"+x + return mode + return property(**locals()) + + @apply + def name_in_color(): + doc=""" The name in ANSI text. Directories are blue, ebooks are green """ + def fget(self): + cname = self.name + blue, green, normal = "", "", "" + if _term: blue, green, normal = _term.BLUE, _term.GREEN, _term.NORMAL + if self.is_dir: cname = blue + self.name + normal + else: + ext = self.name[self.name.rfind("."):] + if ext in (".pdf", ".rtf", ".lrf", ".lrx", ".txt"): cname = green + self.name + normal + return cname + return property(**locals()) + + @apply + def human_readable_size(): + doc=""" File size in human readable form """ + def fget(self): + if self.size < 1024: divisor, suffix = 1, "" + elif self.size < 1024*1024: divisor, suffix = 1024., "M" + elif self.size < 1024*1024*1024: divisor, suffix = 1024*1024, "G" + size = str(self.size/divisor) + if size.find(".") > -1: size = size[:size.find(".")+2] + return size + suffix + return property(**locals()) + + @apply + def modification_time(): + doc=""" Last modified time in the Linux ls -l format """ + def fget(self): + return time.strftime("%Y-%m-%d %H:%M", time.gmtime(self.wtime)) + return property(**locals()) + + @apply + def creation_time(): + doc=""" Last modified time in the Linux ls -l format """ + def fget(self): + return time.strftime("%Y-%m-%d %H:%M", time.gmtime(self.ctime)) + return property(**locals()) + class DeviceDescriptor: def __init__(self, vendor_id, product_id, interface_id) : @@ -87,7 +146,9 @@ class DeviceDescriptor: return None -class PRS500Device: + + +class PRS500Device(object): SONY_VENDOR_ID = 0x054c PRS500_PRODUCT_ID = 0x029b PRS500_INTERFACE_ID = 0 @@ -100,6 +161,12 @@ class PRS500Device: PRS500Device.PRS500_INTERFACE_ID) self.device = self.device_descriptor.getDevice() self.handle = None + + @classmethod + def _validate_response(cls, res, type=0x00, number=0x00): + if type != res.type or number != res.rnumber: + raise ProtocolError("Inavlid response.\ntype: expected="+hex(type)+" actual="+hex(res.type)+ + "\nrnumber: expected="+hex(number)+" actual="+hex(res.rnumber)) def open(self) : self.device = self.device_descriptor.getDevice() @@ -113,95 +180,225 @@ class PRS500Device: self.handle.setConfiguration(1) self.handle.claimInterface(self.device_descriptor.interface_id) self.handle.reset() - + def close(self): self.handle.releaseInterface() self.handle, self.device = None, None - def send_sony_control_query(self, query, timeout=100): - r = self.handle.controlMsg(0x40, 0x80, query.query) - if r != len(query.query): + def _send_command(self, command, response_type=Response, timeout=100): + """ + Send command to device and return its response. + + command -- an object of type Command or one of its derived classes + response_type -- an object of type 'type'. The return packet from the device is returned as an object of type response_type. + timeout -- the time to wait for a response from the device, in milliseconds + """ + if LOG_PACKETS: print "Command\n%s\n--\n"%command + bytes_sent = self.handle.controlMsg(0x40, 0x80, command) + if bytes_sent != len(command): raise ControlError(desc="Could not send control request to device\n" + str(query.query)) - res = normalize_buffer(self.handle.controlMsg(0xc0, 0x81, len(query.response), timeout=timeout)) - if res != query.response: - raise ControlError(query=query, response=res) + response = response_type(self.handle.controlMsg(0xc0, 0x81, Response.SIZE, timeout=timeout)) + if LOG_PACKETS: print "Response\n%s\n--\n"%response + return response - def bulkRead(self, size): - return TransferBuffer(self.handle.bulkRead(PRS500Device.PRS500_BULK_IN_EP, size)) + def _send_validated_command(self, command, cnumber=None, response_type=Response, timeout=100): + """ Wrapper around _send_command that checks if the response's rnumber == cnumber or command.number if cnumber==None """ + if cnumber == None: cnumber = command.number + res = self._send_command(command, response_type=response_type, timeout=timeout) + PRS500Device._validate_response(res, type=command.type, number=cnumber) + return res - def initialize(self): + def _bulk_read(self, data_type=Answer, size=4096): + data = data_type(self.handle.bulkRead(PRS500Device.PRS500_BULK_IN_EP, size)) + if LOG_PACKETS: print "Answer\n%s\n--\n"%data + return data + + def _read_single_bulk_packet(self, command_number=0x00, data_type=Answer, size=4096): + data = self._bulk_read(data_type=data_type, size=size) + self._send_validated_command(AcknowledgeBulkRead(data.id), cnumber=command_number) + return data + + def _test_bulk_reads(self): + self._send_validated_command( ShortCommand(number=0x00, type=0x01, command=0x00) ) + self._read_single_bulk_packet(command_number=0x00, size=24) + + def _start_session(self): self.handle.reset() - for query in initialization: - self.send_sony_control_query(query) - if query.bulkTransfer and "__len__" not in dir(query.bulkTransfer): - self.bulkRead(query.bulkTransfer) + self._test_bulk_reads() + self._send_validated_command( ShortCommand(number=0x0107, command=0x028000, type=0x01) ) # TODO: Figure out the meaning of this command + self._test_bulk_reads() + self._send_validated_command( ShortCommand(number=0x0106, type=0x01, command=0x312d) ) # TODO: Figure out the meaning of this command + self._send_validated_command( ShortCommand(number=0x01, type=0x01, command=0x01) ) - def ls(self, path): - """ - ls path - - Packet scheme: query, bulk read, acknowledge; repeat - Errors, EOF conditions are indicated in the reply to query. They also show up in the reply to acknowledge - I haven't figured out what the first bulk read is for - """ - if path[len(path)-1] != "/": path = path + "/" - self.initialize() - q1 = LSQuery(path, type=1) - files, res1, res2, error_type = [], None, None, 0 + def _end_session(self): + self._send_validated_command( ShortCommand(number=0x01, type=0x01, command=0x00) ) + + def _run_session(self, *args): + self._start_session() + res = None try: - self.send_sony_control_query(q1) - except ControlError, e: - if e.response == LSQuery.PATH_NOT_FOUND_RESPONSE: - error_type = 1 - raise PathError(path[:-1] + " does not exist") - elif e.response == LSQuery.IS_FILE_RESPONSE: error_type = 2 - elif e.response == LSQuery.NOT_MOUNTED_RESPONSE: - error_type = 3 - raise PathError(path + " is not mounted") - elif e.response == LSQuery.INVALID_PATH_RESPONSE: - error_type = 4 - raise PathError(path + " is an invalid path") - else: raise e + res = args[0](args[1:]) finally: - res1 = normalize_buffer(self.bulkRead(q1.bulkTransfer)) - self.send_sony_control_query(q1.acknowledge_query(1, error_type=error_type)) + self._end_session() + pass + return res - if error_type == 2: # If path points to a file - files.append(path[:-1]) + def _get_path_properties(self, path): + res = self._send_validated_command(PathQuery(path), response_type=ListResponse) + data = self._read_single_bulk_packet(size=0x28, data_type=PathAnswer, command_number=PathQuery.PROPERTIES) + if res.path_not_found : raise PathError(path[:-1] + " does not exist on device") + if res.is_invalid : raise PathError(path[:-1] + " is not a valid path") + if res.is_unmounted : raise PathError(path[:-1] + " is not mounted") + return (res, data) + + def _list(self, args): + path = args[0] + if not path.endswith("/"): path += "/" # Initially assume path is a directory + files = [] + res, data = self._get_path_properties(path) + if res.is_file: + path = path[:-1] + res, data = self._get_path_properties(path) + files = [ (path, data) ] else: - q2 = LSQuery(path, type=2) - try: - self.send_sony_control_query(q2) - finally: - res2 = normalize_buffer(self.bulkRead(q2.bulkTransfer)) - self.send_sony_control_query(q1.acknowledge_query(2)) - - send_name = q2.send_name_query(res2) - buffer_length = 0 + self._send_validated_command(PathQuery(path, number=PathQuery.ID), response_type=ListResponse) + id = self._read_single_bulk_packet(size=0x14, data_type=IdAnswer, command_number=PathQuery.ID).id + next = ShortCommand.list_command(id=id) + cnumber = next.number + items = [] while True: - try: - self.send_sony_control_query(send_name) - except ControlError, e: - buffer_length = 16 + e.response[28] + e.response[29] + e.response[30] + e.response[31] - res = self.bulkRead(buffer_length) - if e.response == LSQuery.EOL_RESPONSE: - self.send_sony_control_query(q2.acknowledge_query(0)) - break - else: - self.send_sony_control_query(q2.acknowledge_query(3)) - files.append("".join([chr(i) for i in list(res)[23:]])) + res = self._send_validated_command(next, response_type=ListResponse) + size = res.data[2] + 16 + data = self._read_single_bulk_packet(size=size, data_type=ListAnswer, command_number=cnumber) + # path_not_found seems to happen if the usb server doesn't have the permissions to access the directory + if res.is_eol or res.path_not_found: break + items.append(data.name) + for item in items: + ipath = path + item + res, data = self._get_path_properties(ipath) + files.append( (ipath, data) ) + files.sort() return files + + def list(self, path, recurse=False): + files = self._run_session(self._list, path) + files = [ File(file) for file in files ] + dirs = [(path, files)] + for file in files: + if recurse and file.is_dir and not file.path.startswith(("/dev","/proc")): + dirs[len(dirs):] = self.list(file.path, recurse=True) + return dirs + + def ls(self, path, recurse=False, color=False, human_readable_size=False, ll=False, cols=0): + def col_split(l, cols): # split list l into columns + rows = len(l) / cols + if len(l) % cols: + rows += 1 + m = [] + for i in range(rows): + m.append(l[i::rows]) + return m + def row_widths(table): # Calculate widths for each column in the row-wise table + tcols = len(table[0]) + rowwidths = [ 0 for i in range(tcols) ] + for row in table: + c = 0 + for item in row: + rowwidths[c] = len(item) if len(item) > rowwidths[c] else rowwidths[c] + c += 1 + return rowwidths + + output = StringIO.StringIO() + if path.endswith("/"): path = path[:-1] + dirs = self.list(path, recurse) + for dir in dirs: + if recurse: print >>output, dir[0] + ":" + lsoutput, lscoloutput = [], [] + files = dir[1] + maxlen = 0 + if ll: # Calculate column width for size column + for file in files: + size = len(str(file.size)) + if human_readable_size: size = len(file.human_readable_size) + if size > maxlen: maxlen = size + for file in files: + name = file.name + lsoutput.append(name) + if color: name = file.name_in_color + lscoloutput.append(name) + if ll: + size = str(file.size) + if human_readable_size: size = file.human_readable_size + print >>output, file.mode_string, ("%"+str(maxlen)+"s")%size, file.modification_time, name + if not ll and len(lsoutput) > 0: + trytable = [] + for colwidth in range(MINIMUM_COL_WIDTH, cols): + trycols = int(cols/colwidth) + trytable = col_split(lsoutput, trycols) + works = True + for row in trytable: + row_break = False + for item in row: + if len(item) > colwidth - 1: + works, row_break = False, True + break + if row_break: break + if works: break + rowwidths = row_widths(trytable) + trytablecol = col_split(lscoloutput, len(trytable[0])) + for r in range(len(trytable)): + for c in range(len(trytable[r])): + padding = rowwidths[c] - len(trytable[r][c]) + print >>output, trytablecol[r][c], "".ljust(padding), + print >>output + print >>output + listing = output.getvalue().rstrip()+ "\n" + output.close() + return listing -def main(path): + + +def main(argv): + if _term : cols = _term.COLS + else: cols = 70 + + parser = OptionParser(usage="usage: %prog command [options] args\n\ncommand is one of: ls, get, put or rm\n\n"+ + "For help on a particular command: %prog command") + parser.add_option("--log-packets", help="print out packet stream to stdout", dest="log_packets", action="store_true", default=False) + parser.remove_option("-h") + parser.disable_interspersed_args() + options, args = parser.parse_args() + LOG_PACKETS = options.log_packets + if len(args) < 1: + parser.print_help() + sys.exit(1) + command = args[0] + args = args[1:] dev = PRS500Device() - dev.open() - try: - print " ".join(dev.ls(path)) - except PathError, e: - print >> sys.stderr, e - finally: - dev.close() - + if command == "ls": + parser = OptionParser(usage="usage: %prog ls [options] path\n\npath must begin with /,a:/ or b:/") + parser.add_option("--color", help="show ls output in color", dest="color", action="store_true", default=False) + parser.add_option("-l", help="In addition to the name of each file, print the file type, permissions, and timestamp (the modification time unless other times are selected)", dest="ll", action="store_true", default=False) + parser.add_option("-R", help="Recursively list subdirectories encountered. /dev and /proc are omitted", dest="recurse", action="store_true", default=False) + parser.remove_option("-h") + parser.add_option("-h", "--human-readable", help="show sizes in human readable format", dest="hrs", action="store_true", default=False) + options, args = parser.parse_args(args) + if len(args) < 1: + parser.print_help() + sys.exit(1) + dev.open() + try: + print dev.ls(args[0], color=options.color, recurse=options.recurse, ll=options.ll, human_readable_size=options.hrs, cols=cols), + except PathError, e: + print >> sys.stderr, e + sys.exit(1) + finally: + dev.close() + else: + parser.print_help() + sys.exit(1) + if __name__ == "__main__": - main(sys.argv[1]) + _term = TerminalController() + main(sys.argv) diff --git a/data.py b/data.py deleted file mode 100755 index d55e4a8198..0000000000 --- a/data.py +++ /dev/null @@ -1,163 +0,0 @@ -#!/usr/bin/env python -## Copyright (C) 2006 Kovid Goyal kovid@kovidgoyal.net -## This program is free software; you can redistribute it and/or modify -## it under the terms of the GNU General Public License as published by -## the Free Software Foundation; either version 2 of the License, or -## (at your option) any later version. -## -## This program is distributed in the hope that it will be useful, -## but WITHOUT ANY WARRANTY; without even the implied warranty of -## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -## GNU General Public License for more details. -## -## You should have received a copy of the GNU General Public License along -## with this program; if not, write to the Free Software Foundation, Inc., -## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. - -import sys, re -from prstypes import * - -# The sequence of control commands to send the device before attempting any operations. Should be preceeded by a reset? -initialization = [] -initialization.append(\ -ControlQuery(TransferBuffer((0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0)),\ - TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)), bulkTransfer=24)) -initialization.append(\ -ControlQuery(TransferBuffer((0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 5, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)), \ - TransferBuffer((0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)))) -initialization.append(\ -ControlQuery(TransferBuffer((7, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 128, 2, 0)), \ - TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 7, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)))) -initialization.append(\ -ControlQuery(TransferBuffer((0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0)), \ - TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)), bulkTransfer=24)) -initialization.append(\ -ControlQuery(TransferBuffer((0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 5, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)), \ - TransferBuffer((0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)))) -initialization.append(\ -ControlQuery(TransferBuffer((6, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 45, 49, 0, 0, 0, 0, 0, 0)), \ - TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 6, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)))) -initialization.append(\ -ControlQuery(TransferBuffer((1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0)), \ - TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)))) - -end_transaction = \ -ControlQuery(TransferBuffer((1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0)),\ - TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))) - -def string_to_buffer(string): - """ Convert a string to a TransferBuffer """ - return TransferBuffer([ ord(ch) for ch in string ]) - -class LSQuery(ControlQuery): - """ - Contains all the device specific data (packet formats) needed to implement a simple ls command. - See PRS500Device.ls() to understand how it is used. - """ - PATH_NOT_FOUND_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 24, 0, 0, 0, 215, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0) - IS_FILE_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 24, 0, 0, 0, 210, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0) - NOT_MOUNTED_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 24, 0, 0, 0, 200, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0) - INVALID_PATH_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 24, 0, 0, 0, 249, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0) - ACKNOWLEDGE_RESPONSE = (0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0x35, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - ACKNOWLEDGE_COMMAND = (0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0) - SEND_NAME_COMMAND = (53, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4 , 0, 0, 0, 0, 0, 0, 0) - EOL_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 53, 0, 0, 0, 250, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0) - - - def __init__(self, path, type=1): - self.path = path - if len(self.path) >= 8: - self.path_fragment = self.path[8:] - for i in range(4 - len(self.path_fragment)): - self.path_fragment += '\x00' - self.path_fragment = [ ord(self.path_fragment[i]) for i in range(4) ] - else: - self.path_fragment = [ 0x00 for i in range(4) ] - src = [ 0x00 for i in range(20) ] - if type == 1: - src[0] = 0x18 - elif type == 2: - src[0] = 0x33 - src[4], src[12], src[16] = 0x01, len(path)+4, len(path) - query = TransferBuffer(src) + string_to_buffer(path) - src = [ 0x00 for i in range(32) ] - src[1], src[4], src[12], src[16] = 0x10, 0x01, 0x0c, 0x18 - if type == 2: src[16] = 0x33 - ControlQuery.__init__(self, query, TransferBuffer(src), bulkTransfer = 0x28) - - def acknowledge_query(self, type, error_type=0): - """ - Return the acknowledge query used after receiving data as part of an ls query - - type - should only take values 0,1,2,3 corresponding to the 4 different types of acknowledge queries. - If it takes any other value it is assumed to be zero. - - error_type - 0 = no error, 1 = path not found, 2 = is file, 3 = not mounted, 4 = invalid path - """ - if error_type == 1: - response = list(LSQuery.PATH_NOT_FOUND_RESPONSE) - response[4] = 0x00 - elif error_type == 2: - response = list(LSQuery.IS_FILE_RESPONSE) - response[4] = 0x00 - elif error_type == 3: - response = list(LSQuery.NOT_MOUNTED_RESPONSE) - response[4] = 0x00 - elif error_type == 4: - response = list(LSQuery.INVALID_PATH_RESPONSE) - response[4] = 0x00 - else: response = list(LSQuery.ACKNOWLEDGE_RESPONSE) - query = list(LSQuery.ACKNOWLEDGE_COMMAND) - response[-4:] = self.path_fragment - if type == 1: - query[16] = 0x03 - response[16] = 0x18 - elif type == 2: - query[16] = 0x06 - response[16] = 0x33 - elif type == 3: - query[16] = 0x07 - response[16] = 0x35 - else: # All other type values are mapped to 0, which is an EOL condition - response[20], response[21], response[22], response[23] = 0xfa, 0xff, 0xff, 0xff - - return ControlQuery(TransferBuffer(query), TransferBuffer(response)) - - def send_name_query(self, buffer): - """ - Return a ControlQuery that will cause the device to send the next name in the list - - buffer - TransferBuffer that contains 4 bytes of information that identify the directory we are listing. - - Note that the response to this command contains information (the size of the receive buffer for the next bulk read) thus - the expected response is set to null. - """ - query = list(LSQuery.SEND_NAME_COMMAND) - query[-4:] = list(buffer)[-4:] - response = [ 0x00 for i in range(32) ] - return ControlQuery(TransferBuffer(query), TransferBuffer(response)) - - -def main(file): - """ Convenience method for converting spike.pl output to python code. Used to read control packet data from USB logs """ - PSF = open(file, 'r') - lines = PSF.readlines() - - packets = [] - temp = [] - for line in lines: - if re.match("\s+$", line): - temp = "".join(temp) - packet = [] - for i in range(0, len(temp), 2): - packet.append(int(temp[i]+temp[i+1], 16)) - temp = [] - packets.append(tuple(packet)) - continue - temp = temp + line.split() - print r"seq = []" - for i in range(0, len(packets), 2): - print "seq.append(ControlQuery(TransferBuffer(" + str(packets[i]) + "), TransferBuffer(" + str(packets[i+1]) + ")))" - -if __name__ == "__main__": - main(sys.argv[1]) diff --git a/epydoc.conf b/epydoc.conf new file mode 100644 index 0000000000..221da46165 --- /dev/null +++ b/epydoc.conf @@ -0,0 +1,19 @@ +[epydoc] # Epydoc section marker (required by ConfigParser) + +# Information about the project. +name: My Cool Project +url: http://cool.project/ + +# The list of modules to document. Modules can be named using +# dotted names, module filenames, or package directory names. +# This option may be repeated. +modules: usb, struct +modules: prstypes.py, communicate.py, errors.py + +# Write html output to the directory "apidocs" +output: html +target: apidocs/ + +# Include all automatically generated graphs. These graphs are +# generated using Graphviz dot. +graph: all diff --git a/errors.py b/errors.py new file mode 100644 index 0000000000..8ab707dc86 --- /dev/null +++ b/errors.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python +from exceptions import Exception + +class ProtocolError(Exception): + """ The base class for all exceptions in this package """ + def __init__(self, msg): + Exception.__init__(self, msg) + +class PacketError(ProtocolError): + """ Errors with creating/interpreting packets """ + def __init__(self, msg): + ProtocolError.__init__(self, msg) + +class PathError(ProtocolError): + def __init__(self, msg): + Exception.__init__(self, msg) + +class ControlError(ProtocolError): + def __init__(self, query=None, response=None, desc=None): + self.query = query + self.response = response + Exception.__init__(self, desc) + + def __str__(self): + if self.query and self.response: + return "Got unexpected response:\n" + \ + "query:\n"+str(self.query.query)+"\n"+\ + "expected:\n"+str(self.query.response)+"\n" +\ + "actual:\n"+str(self.response) + if self.desc: + return self.desc + return "Unknown control error occurred" diff --git a/prs-500.e3p b/prs-500.e3p new file mode 100644 index 0000000000..c6c16d6898 --- /dev/null +++ b/prs-500.e3p @@ -0,0 +1,104 @@ + + + + + + + Python + Console + Library to communicate with the Sony Reader prs-500 via USB + + Kovid Goyal + kovid@kovidgoyal.net + + + communicate.py + + + data.py + + + prstypes.py + + + errors.py + + + + + + + + + + + + communicate.py + + + Subversion + (dp0 +S'status' +p1 +(lp2 +S'' +p3 +asS'log' +p4 +(lp5 +g3 +asS'global' +p6 +(lp7 +g3 +asS'update' +p8 +(lp9 +g3 +asS'remove' +p10 +(lp11 +g3 +asS'add' +p12 +(lp13 +g3 +asS'tag' +p14 +(lp15 +g3 +asS'export' +p16 +(lp17 +g3 +asS'commit' +p18 +(lp19 +g3 +asS'diff' +p20 +(lp21 +g3 +asS'checkout' +p22 +(lp23 +g3 +asS'history' +p24 +(lp25 +g3 +as. + (dp0 +S'standardLayout' +p1 +I01 +s. + + + + + + + + + diff --git a/prstypes.py b/prstypes.py index aca7c2177a..e6c2e0e2ab 100755 --- a/prstypes.py +++ b/prstypes.py @@ -14,83 +14,520 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -""" -Contains convenience wrappers for packet data that allow output in the same format as the logs produced by spike.pl -""" +import struct +from errors import PacketError + +BYTE = " 31 and self[i+b] < 127 else "." + except IndexError: break ans = ans + " " - if (i+2)%16 == 0: - ans = ans + "\n" + if (i+2)%16 == 0: + ans += "\t" + ascii + "\n" + ascii = "" + if len(ascii) > 0: + last_line = ans[ans.rfind("\n")+1:] + padding = 32 - len(last_line) + ans += "".ljust(padding) + "\t\t" + ascii return ans.strip() -class ControlQuery: - """ - Container for all the transfer buffers that make up a single query. + def unpack(self, fmt=DWORD, start=0): + """ Return decoded data from buffer. See pydoc struct for fmt. start is position in buffer from which to decode. """ + end = start + struct.calcsize(fmt) + return struct.unpack(fmt, "".join([ chr(i) for i in list.__getslice__(self, start, end) ])) + + def pack(self, val, fmt=DWORD, start=0): + """ Encode data and write it to buffer. See pydoc struct fmt. start is position in buffer at which to write encoded data. """ + self[start:start+struct.calcsize(fmt)] = [ ord(i) for i in struct.pack(fmt, val) ] + + def normalize(self): + """ Replace negative bytes by 256 + byte """ + for i in range(len(self)): + if self[i] < 0: + self[i] = 256 + self[i] + + @classmethod + def phex(cls, num): + """ + Return the hex representation of num without the 0x prefix. - A query has a transmitted buffer, an expected response and an optional buffer that is either read - from or written to via a bulk transfer. - """ - - def __init__(self, query, response, bulkTransfer=None): + If the hex representation is only 1 digit it is padded to the left with a zero. """ - Construct this query. - - query - A TransferBuffer that should be sent to the device on the control pipe - response - A TransferBuffer that the device is expected to return. Used for error checking. - bulkTransfer - If it is a number, it indicates that a buffer of size bulkTransfer should be read from the device via a - bulk read. If it is a TransferBuffer then it will be sent to the device via a bulk write. - """ - self.query = query - self.response = response - self.bulkTransfer = bulkTransfer - - def __eq__(self, cq): - """ Bulk transfers are not compared to decide equality. """ - return self.query == cq.query and self.response == cq.response - + index, sign = 2, "" + if num < 0: + index, sign = 3, "-" + h=hex(num)[index:] + if len(h) < 2: + h = "0"+h + return sign + h + + +class Command(TransferBuffer): + """ Defines the structure of command packets sent to the device. """ + + def __init__(self, packet): + if ("__len__" in dir(packet) and len(packet) < 16) or ("__len__" not in dir(packet) and packet < 16): + raise PacketError(str(self.__class__)[7:-2] + " packets must have length atleast 16") + TransferBuffer.__init__(self, packet) + + @apply + def number(): + doc =\ + """ + Command number + + Observed command numbers are: + 0x00001000 -- Acknowledge + 0x00000107 -- Purpose unknown, occurs in start_session + 0x00000106 -- Purpose unknown, occurs in start_session + """ + def fget(self): + return self.unpack(start=0, fmt=DWORD)[0] + + def fset(self, val): + self.pack(val, start=0, fmt=DWORD) + + return property(**locals()) + + @apply + def type(): + doc =\ + """ + Command type. Known types 0x00, 0x01. Not sure what the type means. + """ + def fget(self): + return self.unpack(start=4, fmt=DDWORD)[0] + + def fset(self, val): + self.pack(val, start=4, fmt=DDWORD) + + return property(**locals()) + + @apply + def length(): + doc =\ + """ Length in bytes of the data part of the query """ + def fget(self): + return self.unpack(start=12, fmt=DWORD)[0] + + def fset(self, val): + self.pack(val, start=12, fmt=DWORD) + + return property(**locals()) + + @apply + def data(): + doc =\ + """ + The data part of this command. Returned/set as/by a TransferBuffer. + + Setting it by default changes self.length to the length of the new buffer. You may have to reset it to + the significant part of the buffer. + """ + def fget(self): + return self[16:] + + def fset(self, buffer): + self[16:] = buffer + self.length = len(buffer) + + return property(**locals()) + + +class ShortCommand(Command): + + SIZE = 20 #Packet size in bytes + + def __init__(self, number=0x00, type=0x00, command=0x00): + """ command must be an integer """ + Command.__init__(self, ShortCommand.SIZE) + self.number = number + self.type = type + self.length = 4 + self.command = command + + @classmethod + def list_command(cls, id): + """ Return the command packet used to ask for the next item in the list """ + return ShortCommand(number=0x35, type=0x01, command=id) + + @apply + def command(): + doc =\ + """ + The command. Not sure why this is needed in addition to Command.number + """ + def fget(self): + return self.unpack(start=16, fmt=DWORD)[0] + + def fset(self, val): + self.pack(val, start=16, fmt=DWORD) + + return property(**locals()) + +class LongCommand(Command): + + SIZE = 32 # Size in bytes of long command packets + + def __init__(self, number=0x00, type=0x00, command=0x00): + """ command must be either an integer or a list of not more than 4 integers """ + Command.__init__(self, LongCommand.SIZE) + self.number = number + self.type = type + self.length = 16 + self.command = command + + @apply + def command(): + doc =\ + """ + The command. + + It should be set to a x-integer list, where 0= 16 """ + if len(packet) < 16 : raise PacketError(str(self.__class__)[7:-2] + " packets must have a length of atleast 16 bytes") + TransferBuffer.__init__(self, packet) + + @apply + def id(): + doc =\ + """ The id of this bulk transfer packet """ + + def fget(self): + return self.unpack(start=0, fmt=DWORD)[0] + + def fset(self, val): + self.pack(val, start=0, fmt=DWORD) + + return property(**locals()) + +class PathAnswer(Answer): + + """ Defines the structure of packets that contain size, date and permissions information about files/directories. """ + + @apply + def file_size(): + doc =\ + """ The file size """ + + def fget(self): + return self.unpack(start=16, fmt=DDWORD)[0] + + def fset(self, val): + self.pack(val, start=16, fmt=DDWORD) + + return property(**locals()) + + @apply + def is_dir(): + doc =\ + """ True if path points to a directory, False if it points to a file """ + + def fget(self): + return (self.unpack(start=24, fmt=DWORD)[0] == 2) + + def fset(self, val): + if val: val = 2 + else: val = 1 + self.pack(val, start=24, fmt=DWORD) + + return property(**locals()) + + @apply + def ctime(): + doc =\ + """ The creation time of this file/dir as an epoch """ + + def fget(self): + return self.unpack(start=28, fmt=DWORD)[0] + + def fset(self, val): + self.pack(val, start=28, fmt=DWORD) + + return property(**locals()) + + @apply + def wtime(): + doc =\ + """ The modification time of this file/dir as an epoch """ + + def fget(self): + return self.unpack(start=32, fmt=DWORD)[0] + + def fset(self, val): + self.pack(val, start=32, fmt=DWORD) + + return property(**locals()) + + @apply + def is_readonly(): + doc =\ + """ Whether this file is readonly """ + + def fget(self): + return self.unpack(start=36, fmt=DWORD)[0] != 0 + + def fset(self, val): + if val: val = 4 + else: val = 0 + self.pack(val, start=36, fmt=DWORD) + + return property(**locals()) + +class IdAnswer(Answer): + + """ Defines the structure of packets that contain identifiers for directories. """ + + @apply + def id(): + doc =\ + """ The identifier """ + + def fget(self): + return self.unpack(start=16, fmt=DWORD)[0] + + def fset(self, val): + self.pack(val, start=16, fmt=DWORD) + + return property(**locals()) + +class ListAnswer(Answer): + + """ Defines the structure of packets that contain items in a list. """ + + @apply + def is_dir(): + doc =\ + """ True if list item points to a directory, False if it points to a file """ + + def fget(self): + return (self.unpack(start=16, fmt=DWORD)[0] == 2) + + def fset(self, val): + if val: val = 2 + else: val = 1 + self.pack(val, start=16, fmt=DWORD) + + return property(**locals()) + + @apply + def name_length(): + doc =\ + """ The length in bytes of the list item to follow """ + def fget(self): + return self.unpack(start=20, fmt=DWORD)[0] + + def fset(self, val): + self.pack(val, start=20, fmt=DWORD) + + return property(**locals()) + + @apply + def name(): + doc =\ + """ The name of the list item """ + + def fget(self): + return self.unpack(start=24, fmt="<"+str(self.name_length)+"s")[0] + + def fset(self, val): + self.pack(val, start=24, fmt="<"+str(self.name_length)+"s") + + return property(**locals()) diff --git a/terminfo.py b/terminfo.py new file mode 100644 index 0000000000..58ab6eb7d7 --- /dev/null +++ b/terminfo.py @@ -0,0 +1,192 @@ +import sys, re + +class TerminalController: + """ + A class that can be used to portably generate formatted output to + a terminal. + + `TerminalController` defines a set of instance variables whose + values are initialized to the control sequence necessary to + perform a given action. These can be simply included in normal + output to the terminal: + + >>> term = TerminalController() + >>> print 'This is '+term.GREEN+'green'+term.NORMAL + + Alternatively, the `render()` method can used, which replaces + '${action}' with the string required to perform 'action': + + >>> term = TerminalController() + >>> print term.render('This is ${GREEN}green${NORMAL}') + + If the terminal doesn't support a given action, then the value of + the corresponding instance variable will be set to ''. As a + result, the above code will still work on terminals that do not + support color, except that their output will not be colored. + Also, this means that you can test whether the terminal supports a + given action by simply testing the truth value of the + corresponding instance variable: + + >>> term = TerminalController() + >>> if term.CLEAR_SCREEN: + ... print 'This terminal supports clearning the screen.' + + Finally, if the width and height of the terminal are known, then + they will be stored in the `COLS` and `LINES` attributes. + """ + # Cursor movement: + BOL = '' #: Move the cursor to the beginning of the line + UP = '' #: Move the cursor up one line + DOWN = '' #: Move the cursor down one line + LEFT = '' #: Move the cursor left one char + RIGHT = '' #: Move the cursor right one char + + # Deletion: + CLEAR_SCREEN = '' #: Clear the screen and move to home position + CLEAR_EOL = '' #: Clear to the end of the line. + CLEAR_BOL = '' #: Clear to the beginning of the line. + CLEAR_EOS = '' #: Clear to the end of the screen + + # Output modes: + BOLD = '' #: Turn on bold mode + BLINK = '' #: Turn on blink mode + DIM = '' #: Turn on half-bright mode + REVERSE = '' #: Turn on reverse-video mode + NORMAL = '' #: Turn off all modes + + # Cursor display: + HIDE_CURSOR = '' #: Make the cursor invisible + SHOW_CURSOR = '' #: Make the cursor visible + + # Terminal size: + COLS = None #: Width of the terminal (None for unknown) + LINES = None #: Height of the terminal (None for unknown) + + # Foreground colors: + BLACK = BLUE = GREEN = CYAN = RED = MAGENTA = YELLOW = WHITE = '' + + # Background colors: + BG_BLACK = BG_BLUE = BG_GREEN = BG_CYAN = '' + BG_RED = BG_MAGENTA = BG_YELLOW = BG_WHITE = '' + + _STRING_CAPABILITIES = """ + BOL=cr UP=cuu1 DOWN=cud1 LEFT=cub1 RIGHT=cuf1 + CLEAR_SCREEN=clear CLEAR_EOL=el CLEAR_BOL=el1 CLEAR_EOS=ed BOLD=bold + BLINK=blink DIM=dim REVERSE=rev UNDERLINE=smul NORMAL=sgr0 + HIDE_CURSOR=cinvis SHOW_CURSOR=cnorm""".split() + _COLORS = """BLACK BLUE GREEN CYAN RED MAGENTA YELLOW WHITE""".split() + _ANSICOLORS = "BLACK RED GREEN YELLOW BLUE MAGENTA CYAN WHITE".split() + + def __init__(self, term_stream=sys.stdout): + """ + Create a `TerminalController` and initialize its attributes + with appropriate values for the current terminal. + `term_stream` is the stream that will be used for terminal + output; if this stream is not a tty, then the terminal is + assumed to be a dumb terminal (i.e., have no capabilities). + """ + # Curses isn't available on all platforms + try: import curses + except: return + + # If the stream isn't a tty, then assume it has no capabilities. + if not term_stream.isatty(): return + + # Check the terminal type. If we fail, then assume that the + # terminal has no capabilities. + try: curses.setupterm() + except: return + + # Look up numeric capabilities. + self.COLS = curses.tigetnum('cols') + self.LINES = curses.tigetnum('lines') + + # Look up string capabilities. + for capability in self._STRING_CAPABILITIES: + (attrib, cap_name) = capability.split('=') + setattr(self, attrib, self._tigetstr(cap_name) or '') + + # Colors + set_fg = self._tigetstr('setf') + if set_fg: + for i,color in zip(range(len(self._COLORS)), self._COLORS): + setattr(self, color, curses.tparm(set_fg, i) or '') + set_fg_ansi = self._tigetstr('setaf') + if set_fg_ansi: + for i,color in zip(range(len(self._ANSICOLORS)), self._ANSICOLORS): + setattr(self, color, curses.tparm(set_fg_ansi, i) or '') + set_bg = self._tigetstr('setb') + if set_bg: + for i,color in zip(range(len(self._COLORS)), self._COLORS): + setattr(self, 'BG_'+color, curses.tparm(set_bg, i) or '') + set_bg_ansi = self._tigetstr('setab') + if set_bg_ansi: + for i,color in zip(range(len(self._ANSICOLORS)), self._ANSICOLORS): + setattr(self, 'BG_'+color, curses.tparm(set_bg_ansi, i) or '') + + def _tigetstr(self, cap_name): + # String capabilities can include "delays" of the form "$<2>". + # For any modern terminal, we should be able to just ignore + # these, so strip them out. + import curses + cap = curses.tigetstr(cap_name) or '' + return re.sub(r'\$<\d+>[/*]?', '', cap) + + def render(self, template): + """ + Replace each $-substitutions in the given template string with + the corresponding terminal control string (if it's defined) or + '' (if it's not). + """ + return re.sub(r'\$\$|\${\w+}', self._render_sub, template) + + def _render_sub(self, match): + s = match.group() + if s == '$$': return s + else: return getattr(self, s[2:-1]) + +####################################################################### +# Example use case: progress bar +####################################################################### + +class ProgressBar: + """ + A 3-line progress bar, which looks like:: + + Header + 20% [===========----------------------------------] + progress message + + The progress bar is colored, if the terminal supports color + output; and adjusts to the width of the terminal. + """ + BAR = '%3d%% ${GREEN}[${BOLD}%s%s${NORMAL}${GREEN}]${NORMAL}\n' + HEADER = '${BOLD}${CYAN}%s${NORMAL}\n\n' + + def __init__(self, term, header): + self.term = term + if not (self.term.CLEAR_EOL and self.term.UP and self.term.BOL): + raise ValueError("Terminal isn't capable enough -- you " + "should use a simpler progress dispaly.") + self.width = self.term.COLS or 75 + self.bar = term.render(self.BAR) + self.header = self.term.render(self.HEADER % header.center(self.width)) + self.cleared = 1 #: true if we haven't drawn the bar yet. + self.update(0, '') + + def update(self, percent, message): + if self.cleared: + sys.stdout.write(self.header) + self.cleared = 0 + n = int((self.width-10)*percent) + sys.stdout.write( + self.term.BOL + self.term.UP + self.term.CLEAR_EOL + + (self.bar % (100*percent, '='*n, '-'*(self.width-10-n))) + + self.term.CLEAR_EOL + message.center(self.width)) + + def clear(self): + if not self.cleared: + sys.stdout.write(self.term.BOL + self.term.CLEAR_EOL + + self.term.UP + self.term.CLEAR_EOL + + self.term.UP + self.term.CLEAR_EOL) + self.cleared = 1