Working ls -lhR --color implementation

This commit is contained in:
Kovid Goyal 2006-11-06 01:15:41 +00:00
parent baa766ae3c
commit 324e8df283
7 changed files with 1140 additions and 322 deletions

View File

@ -14,11 +14,6 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import sys, usb
from data import *
from types import *
from exceptions import Exception
### End point description for PRS-500 procductId=667
### Endpoint Descriptor:
### bLength 7
@ -46,25 +41,89 @@ from exceptions import Exception
### Has two configurations 1 is the USB charging config 2 is the self-powered config.
### I think config management is automatic. Endpoints are the same
class PathError(Exception):
def __init__(self, msg):
Exception.__init__(self, msg)
import sys, usb, logging, StringIO, time
from optparse import OptionParser
class ControlError(Exception):
def __init__(self, query=None, response=None, desc=None):
self.query = query
self.response = response
Exception.__init__(self, desc)
from prstypes import *
from errors import *
from terminfo import TerminalController
#try:
# import psyco
# psyco.full()
#except ImportError:
# print 'Psyco not installed, the program will just run slower'
_term = None
LOG_PACKETS=False # If True all packets are looged to stdout
MINIMUM_COL_WIDTH = 12
class File(object):
def __init__(self, file):
self.is_dir = file[1].is_dir
self.is_readonly = file[1].is_readonly
self.size = file[1].file_size
self.ctime = file[1].ctime
self.wtime = file[1].wtime
path = file[0]
if path.endswith("/"): path = path[:-1]
self.path = path
self.name = path[path.rfind("/")+1:].rstrip()
def __repr__(self):
return self.path
@apply
def mode_string():
doc=""" The mode string for this file. There are only two modes read-only and read-write """
def fget(self):
mode, x = "-", "-"
if self.is_dir: mode, x = "d", "x"
if self.is_readonly: mode += "r-"+x+"r-"+x+"r-"+x
else: mode += "rw"+x+"rw"+x+"rw"+x
return mode
return property(**locals())
@apply
def name_in_color():
doc=""" The name in ANSI text. Directories are blue, ebooks are green """
def fget(self):
cname = self.name
blue, green, normal = "", "", ""
if _term: blue, green, normal = _term.BLUE, _term.GREEN, _term.NORMAL
if self.is_dir: cname = blue + self.name + normal
else:
ext = self.name[self.name.rfind("."):]
if ext in (".pdf", ".rtf", ".lrf", ".lrx", ".txt"): cname = green + self.name + normal
return cname
return property(**locals())
@apply
def human_readable_size():
doc=""" File size in human readable form """
def fget(self):
if self.size < 1024: divisor, suffix = 1, ""
elif self.size < 1024*1024: divisor, suffix = 1024., "M"
elif self.size < 1024*1024*1024: divisor, suffix = 1024*1024, "G"
size = str(self.size/divisor)
if size.find(".") > -1: size = size[:size.find(".")+2]
return size + suffix
return property(**locals())
@apply
def modification_time():
doc=""" Last modified time in the Linux ls -l format """
def fget(self):
return time.strftime("%Y-%m-%d %H:%M", time.gmtime(self.wtime))
return property(**locals())
@apply
def creation_time():
doc=""" Last modified time in the Linux ls -l format """
def fget(self):
return time.strftime("%Y-%m-%d %H:%M", time.gmtime(self.ctime))
return property(**locals())
def __str__(self):
if self.query and self.response:
return "Got unexpected response:\n" + \
"query:\n"+str(self.query.query)+"\n"+\
"expected:\n"+str(self.query.response)+"\n" +\
"actual:\n"+str(self.response)
if self.desc:
return self.desc
return "Unknown control error occurred"
class DeviceDescriptor:
def __init__(self, vendor_id, product_id, interface_id) :
@ -87,7 +146,9 @@ class DeviceDescriptor:
return None
class PRS500Device:
class PRS500Device(object):
SONY_VENDOR_ID = 0x054c
PRS500_PRODUCT_ID = 0x029b
PRS500_INTERFACE_ID = 0
@ -101,6 +162,12 @@ class PRS500Device:
self.device = self.device_descriptor.getDevice()
self.handle = None
@classmethod
def _validate_response(cls, res, type=0x00, number=0x00):
if type != res.type or number != res.rnumber:
raise ProtocolError("Inavlid response.\ntype: expected="+hex(type)+" actual="+hex(res.type)+
"\nrnumber: expected="+hex(number)+" actual="+hex(res.rnumber))
def open(self) :
self.device = self.device_descriptor.getDevice()
if not self.device:
@ -118,90 +185,220 @@ class PRS500Device:
self.handle.releaseInterface()
self.handle, self.device = None, None
def send_sony_control_query(self, query, timeout=100):
r = self.handle.controlMsg(0x40, 0x80, query.query)
if r != len(query.query):
def _send_command(self, command, response_type=Response, timeout=100):
"""
Send command to device and return its response.
command -- an object of type Command or one of its derived classes
response_type -- an object of type 'type'. The return packet from the device is returned as an object of type response_type.
timeout -- the time to wait for a response from the device, in milliseconds
"""
if LOG_PACKETS: print "Command\n%s\n--\n"%command
bytes_sent = self.handle.controlMsg(0x40, 0x80, command)
if bytes_sent != len(command):
raise ControlError(desc="Could not send control request to device\n" + str(query.query))
res = normalize_buffer(self.handle.controlMsg(0xc0, 0x81, len(query.response), timeout=timeout))
if res != query.response:
raise ControlError(query=query, response=res)
response = response_type(self.handle.controlMsg(0xc0, 0x81, Response.SIZE, timeout=timeout))
if LOG_PACKETS: print "Response\n%s\n--\n"%response
return response
def bulkRead(self, size):
return TransferBuffer(self.handle.bulkRead(PRS500Device.PRS500_BULK_IN_EP, size))
def _send_validated_command(self, command, cnumber=None, response_type=Response, timeout=100):
""" Wrapper around _send_command that checks if the response's rnumber == cnumber or command.number if cnumber==None """
if cnumber == None: cnumber = command.number
res = self._send_command(command, response_type=response_type, timeout=timeout)
PRS500Device._validate_response(res, type=command.type, number=cnumber)
return res
def initialize(self):
def _bulk_read(self, data_type=Answer, size=4096):
data = data_type(self.handle.bulkRead(PRS500Device.PRS500_BULK_IN_EP, size))
if LOG_PACKETS: print "Answer\n%s\n--\n"%data
return data
def _read_single_bulk_packet(self, command_number=0x00, data_type=Answer, size=4096):
data = self._bulk_read(data_type=data_type, size=size)
self._send_validated_command(AcknowledgeBulkRead(data.id), cnumber=command_number)
return data
def _test_bulk_reads(self):
self._send_validated_command( ShortCommand(number=0x00, type=0x01, command=0x00) )
self._read_single_bulk_packet(command_number=0x00, size=24)
def _start_session(self):
self.handle.reset()
for query in initialization:
self.send_sony_control_query(query)
if query.bulkTransfer and "__len__" not in dir(query.bulkTransfer):
self.bulkRead(query.bulkTransfer)
self._test_bulk_reads()
self._send_validated_command( ShortCommand(number=0x0107, command=0x028000, type=0x01) ) # TODO: Figure out the meaning of this command
self._test_bulk_reads()
self._send_validated_command( ShortCommand(number=0x0106, type=0x01, command=0x312d) ) # TODO: Figure out the meaning of this command
self._send_validated_command( ShortCommand(number=0x01, type=0x01, command=0x01) )
def ls(self, path):
"""
ls path
def _end_session(self):
self._send_validated_command( ShortCommand(number=0x01, type=0x01, command=0x00) )
Packet scheme: query, bulk read, acknowledge; repeat
Errors, EOF conditions are indicated in the reply to query. They also show up in the reply to acknowledge
I haven't figured out what the first bulk read is for
"""
if path[len(path)-1] != "/": path = path + "/"
self.initialize()
q1 = LSQuery(path, type=1)
files, res1, res2, error_type = [], None, None, 0
def _run_session(self, *args):
self._start_session()
res = None
try:
self.send_sony_control_query(q1)
except ControlError, e:
if e.response == LSQuery.PATH_NOT_FOUND_RESPONSE:
error_type = 1
raise PathError(path[:-1] + " does not exist")
elif e.response == LSQuery.IS_FILE_RESPONSE: error_type = 2
elif e.response == LSQuery.NOT_MOUNTED_RESPONSE:
error_type = 3
raise PathError(path + " is not mounted")
elif e.response == LSQuery.INVALID_PATH_RESPONSE:
error_type = 4
raise PathError(path + " is an invalid path")
else: raise e
res = args[0](args[1:])
finally:
res1 = normalize_buffer(self.bulkRead(q1.bulkTransfer))
self.send_sony_control_query(q1.acknowledge_query(1, error_type=error_type))
self._end_session()
pass
return res
if error_type == 2: # If path points to a file
files.append(path[:-1])
def _get_path_properties(self, path):
res = self._send_validated_command(PathQuery(path), response_type=ListResponse)
data = self._read_single_bulk_packet(size=0x28, data_type=PathAnswer, command_number=PathQuery.PROPERTIES)
if res.path_not_found : raise PathError(path[:-1] + " does not exist on device")
if res.is_invalid : raise PathError(path[:-1] + " is not a valid path")
if res.is_unmounted : raise PathError(path[:-1] + " is not mounted")
return (res, data)
def _list(self, args):
path = args[0]
if not path.endswith("/"): path += "/" # Initially assume path is a directory
files = []
res, data = self._get_path_properties(path)
if res.is_file:
path = path[:-1]
res, data = self._get_path_properties(path)
files = [ (path, data) ]
else:
q2 = LSQuery(path, type=2)
try:
self.send_sony_control_query(q2)
finally:
res2 = normalize_buffer(self.bulkRead(q2.bulkTransfer))
self.send_sony_control_query(q1.acknowledge_query(2))
send_name = q2.send_name_query(res2)
buffer_length = 0
self._send_validated_command(PathQuery(path, number=PathQuery.ID), response_type=ListResponse)
id = self._read_single_bulk_packet(size=0x14, data_type=IdAnswer, command_number=PathQuery.ID).id
next = ShortCommand.list_command(id=id)
cnumber = next.number
items = []
while True:
try:
self.send_sony_control_query(send_name)
except ControlError, e:
buffer_length = 16 + e.response[28] + e.response[29] + e.response[30] + e.response[31]
res = self.bulkRead(buffer_length)
if e.response == LSQuery.EOL_RESPONSE:
self.send_sony_control_query(q2.acknowledge_query(0))
break
else:
self.send_sony_control_query(q2.acknowledge_query(3))
files.append("".join([chr(i) for i in list(res)[23:]]))
res = self._send_validated_command(next, response_type=ListResponse)
size = res.data[2] + 16
data = self._read_single_bulk_packet(size=size, data_type=ListAnswer, command_number=cnumber)
# path_not_found seems to happen if the usb server doesn't have the permissions to access the directory
if res.is_eol or res.path_not_found: break
items.append(data.name)
for item in items:
ipath = path + item
res, data = self._get_path_properties(ipath)
files.append( (ipath, data) )
files.sort()
return files
def list(self, path, recurse=False):
files = self._run_session(self._list, path)
files = [ File(file) for file in files ]
dirs = [(path, files)]
for file in files:
if recurse and file.is_dir and not file.path.startswith(("/dev","/proc")):
dirs[len(dirs):] = self.list(file.path, recurse=True)
return dirs
def main(path):
def ls(self, path, recurse=False, color=False, human_readable_size=False, ll=False, cols=0):
def col_split(l, cols): # split list l into columns
rows = len(l) / cols
if len(l) % cols:
rows += 1
m = []
for i in range(rows):
m.append(l[i::rows])
return m
def row_widths(table): # Calculate widths for each column in the row-wise table
tcols = len(table[0])
rowwidths = [ 0 for i in range(tcols) ]
for row in table:
c = 0
for item in row:
rowwidths[c] = len(item) if len(item) > rowwidths[c] else rowwidths[c]
c += 1
return rowwidths
output = StringIO.StringIO()
if path.endswith("/"): path = path[:-1]
dirs = self.list(path, recurse)
for dir in dirs:
if recurse: print >>output, dir[0] + ":"
lsoutput, lscoloutput = [], []
files = dir[1]
maxlen = 0
if ll: # Calculate column width for size column
for file in files:
size = len(str(file.size))
if human_readable_size: size = len(file.human_readable_size)
if size > maxlen: maxlen = size
for file in files:
name = file.name
lsoutput.append(name)
if color: name = file.name_in_color
lscoloutput.append(name)
if ll:
size = str(file.size)
if human_readable_size: size = file.human_readable_size
print >>output, file.mode_string, ("%"+str(maxlen)+"s")%size, file.modification_time, name
if not ll and len(lsoutput) > 0:
trytable = []
for colwidth in range(MINIMUM_COL_WIDTH, cols):
trycols = int(cols/colwidth)
trytable = col_split(lsoutput, trycols)
works = True
for row in trytable:
row_break = False
for item in row:
if len(item) > colwidth - 1:
works, row_break = False, True
break
if row_break: break
if works: break
rowwidths = row_widths(trytable)
trytablecol = col_split(lscoloutput, len(trytable[0]))
for r in range(len(trytable)):
for c in range(len(trytable[r])):
padding = rowwidths[c] - len(trytable[r][c])
print >>output, trytablecol[r][c], "".ljust(padding),
print >>output
print >>output
listing = output.getvalue().rstrip()+ "\n"
output.close()
return listing
def main(argv):
if _term : cols = _term.COLS
else: cols = 70
parser = OptionParser(usage="usage: %prog command [options] args\n\ncommand is one of: ls, get, put or rm\n\n"+
"For help on a particular command: %prog command")
parser.add_option("--log-packets", help="print out packet stream to stdout", dest="log_packets", action="store_true", default=False)
parser.remove_option("-h")
parser.disable_interspersed_args()
options, args = parser.parse_args()
LOG_PACKETS = options.log_packets
if len(args) < 1:
parser.print_help()
sys.exit(1)
command = args[0]
args = args[1:]
dev = PRS500Device()
dev.open()
try:
print " ".join(dev.ls(path))
except PathError, e:
print >> sys.stderr, e
finally:
dev.close()
if command == "ls":
parser = OptionParser(usage="usage: %prog ls [options] path\n\npath must begin with /,a:/ or b:/")
parser.add_option("--color", help="show ls output in color", dest="color", action="store_true", default=False)
parser.add_option("-l", help="In addition to the name of each file, print the file type, permissions, and timestamp (the modification time unless other times are selected)", dest="ll", action="store_true", default=False)
parser.add_option("-R", help="Recursively list subdirectories encountered. /dev and /proc are omitted", dest="recurse", action="store_true", default=False)
parser.remove_option("-h")
parser.add_option("-h", "--human-readable", help="show sizes in human readable format", dest="hrs", action="store_true", default=False)
options, args = parser.parse_args(args)
if len(args) < 1:
parser.print_help()
sys.exit(1)
dev.open()
try:
print dev.ls(args[0], color=options.color, recurse=options.recurse, ll=options.ll, human_readable_size=options.hrs, cols=cols),
except PathError, e:
print >> sys.stderr, e
sys.exit(1)
finally:
dev.close()
else:
parser.print_help()
sys.exit(1)
if __name__ == "__main__":
main(sys.argv[1])
_term = TerminalController()
main(sys.argv)

163
data.py
View File

@ -1,163 +0,0 @@
#!/usr/bin/env python
## Copyright (C) 2006 Kovid Goyal kovid@kovidgoyal.net
## This program is free software; you can redistribute it and/or modify
## it under the terms of the GNU General Public License as published by
## the Free Software Foundation; either version 2 of the License, or
## (at your option) any later version.
##
## This program is distributed in the hope that it will be useful,
## but WITHOUT ANY WARRANTY; without even the implied warranty of
## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
## GNU General Public License for more details.
##
## You should have received a copy of the GNU General Public License along
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import sys, re
from prstypes import *
# The sequence of control commands to send the device before attempting any operations. Should be preceeded by a reset?
initialization = []
initialization.append(\
ControlQuery(TransferBuffer((0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0)),\
TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)), bulkTransfer=24))
initialization.append(\
ControlQuery(TransferBuffer((0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 5, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)), \
TransferBuffer((0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))))
initialization.append(\
ControlQuery(TransferBuffer((7, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 128, 2, 0)), \
TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 7, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))))
initialization.append(\
ControlQuery(TransferBuffer((0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0)), \
TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)), bulkTransfer=24))
initialization.append(\
ControlQuery(TransferBuffer((0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 5, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)), \
TransferBuffer((0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))))
initialization.append(\
ControlQuery(TransferBuffer((6, 1, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 8, 0, 0, 0, 45, 49, 0, 0, 0, 0, 0, 0)), \
TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 6, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))))
initialization.append(\
ControlQuery(TransferBuffer((1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 1, 0, 0, 0)), \
TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0))))
end_transaction = \
ControlQuery(TransferBuffer((1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0)),\
TransferBuffer((0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)))
def string_to_buffer(string):
""" Convert a string to a TransferBuffer """
return TransferBuffer([ ord(ch) for ch in string ])
class LSQuery(ControlQuery):
"""
Contains all the device specific data (packet formats) needed to implement a simple ls command.
See PRS500Device.ls() to understand how it is used.
"""
PATH_NOT_FOUND_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 24, 0, 0, 0, 215, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0)
IS_FILE_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 24, 0, 0, 0, 210, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0)
NOT_MOUNTED_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 24, 0, 0, 0, 200, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0)
INVALID_PATH_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 24, 0, 0, 0, 249, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0)
ACKNOWLEDGE_RESPONSE = (0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 12, 0, 0, 0, 0x35, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
ACKNOWLEDGE_COMMAND = (0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
SEND_NAME_COMMAND = (53, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 4 , 0, 0, 0, 0, 0, 0, 0)
EOL_RESPONSE = (0, 16, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 16, 0, 0, 0, 53, 0, 0, 0, 250, 255, 255, 255, 0, 0, 0, 0, 0, 0, 0, 0)
def __init__(self, path, type=1):
self.path = path
if len(self.path) >= 8:
self.path_fragment = self.path[8:]
for i in range(4 - len(self.path_fragment)):
self.path_fragment += '\x00'
self.path_fragment = [ ord(self.path_fragment[i]) for i in range(4) ]
else:
self.path_fragment = [ 0x00 for i in range(4) ]
src = [ 0x00 for i in range(20) ]
if type == 1:
src[0] = 0x18
elif type == 2:
src[0] = 0x33
src[4], src[12], src[16] = 0x01, len(path)+4, len(path)
query = TransferBuffer(src) + string_to_buffer(path)
src = [ 0x00 for i in range(32) ]
src[1], src[4], src[12], src[16] = 0x10, 0x01, 0x0c, 0x18
if type == 2: src[16] = 0x33
ControlQuery.__init__(self, query, TransferBuffer(src), bulkTransfer = 0x28)
def acknowledge_query(self, type, error_type=0):
"""
Return the acknowledge query used after receiving data as part of an ls query
type - should only take values 0,1,2,3 corresponding to the 4 different types of acknowledge queries.
If it takes any other value it is assumed to be zero.
error_type - 0 = no error, 1 = path not found, 2 = is file, 3 = not mounted, 4 = invalid path
"""
if error_type == 1:
response = list(LSQuery.PATH_NOT_FOUND_RESPONSE)
response[4] = 0x00
elif error_type == 2:
response = list(LSQuery.IS_FILE_RESPONSE)
response[4] = 0x00
elif error_type == 3:
response = list(LSQuery.NOT_MOUNTED_RESPONSE)
response[4] = 0x00
elif error_type == 4:
response = list(LSQuery.INVALID_PATH_RESPONSE)
response[4] = 0x00
else: response = list(LSQuery.ACKNOWLEDGE_RESPONSE)
query = list(LSQuery.ACKNOWLEDGE_COMMAND)
response[-4:] = self.path_fragment
if type == 1:
query[16] = 0x03
response[16] = 0x18
elif type == 2:
query[16] = 0x06
response[16] = 0x33
elif type == 3:
query[16] = 0x07
response[16] = 0x35
else: # All other type values are mapped to 0, which is an EOL condition
response[20], response[21], response[22], response[23] = 0xfa, 0xff, 0xff, 0xff
return ControlQuery(TransferBuffer(query), TransferBuffer(response))
def send_name_query(self, buffer):
"""
Return a ControlQuery that will cause the device to send the next name in the list
buffer - TransferBuffer that contains 4 bytes of information that identify the directory we are listing.
Note that the response to this command contains information (the size of the receive buffer for the next bulk read) thus
the expected response is set to null.
"""
query = list(LSQuery.SEND_NAME_COMMAND)
query[-4:] = list(buffer)[-4:]
response = [ 0x00 for i in range(32) ]
return ControlQuery(TransferBuffer(query), TransferBuffer(response))
def main(file):
""" Convenience method for converting spike.pl output to python code. Used to read control packet data from USB logs """
PSF = open(file, 'r')
lines = PSF.readlines()
packets = []
temp = []
for line in lines:
if re.match("\s+$", line):
temp = "".join(temp)
packet = []
for i in range(0, len(temp), 2):
packet.append(int(temp[i]+temp[i+1], 16))
temp = []
packets.append(tuple(packet))
continue
temp = temp + line.split()
print r"seq = []"
for i in range(0, len(packets), 2):
print "seq.append(ControlQuery(TransferBuffer(" + str(packets[i]) + "), TransferBuffer(" + str(packets[i+1]) + ")))"
if __name__ == "__main__":
main(sys.argv[1])

19
epydoc.conf Normal file
View File

@ -0,0 +1,19 @@
[epydoc] # Epydoc section marker (required by ConfigParser)
# Information about the project.
name: My Cool Project
url: http://cool.project/
# The list of modules to document. Modules can be named using
# dotted names, module filenames, or package directory names.
# This option may be repeated.
modules: usb, struct
modules: prstypes.py, communicate.py, errors.py
# Write html output to the directory "apidocs"
output: html
target: apidocs/
# Include all automatically generated graphs. These graphs are
# generated using Graphviz dot.
graph: all

32
errors.py Normal file
View File

@ -0,0 +1,32 @@
#!/usr/bin/env python
from exceptions import Exception
class ProtocolError(Exception):
""" The base class for all exceptions in this package """
def __init__(self, msg):
Exception.__init__(self, msg)
class PacketError(ProtocolError):
""" Errors with creating/interpreting packets """
def __init__(self, msg):
ProtocolError.__init__(self, msg)
class PathError(ProtocolError):
def __init__(self, msg):
Exception.__init__(self, msg)
class ControlError(ProtocolError):
def __init__(self, query=None, response=None, desc=None):
self.query = query
self.response = response
Exception.__init__(self, desc)
def __str__(self):
if self.query and self.response:
return "Got unexpected response:\n" + \
"query:\n"+str(self.query.query)+"\n"+\
"expected:\n"+str(self.query.response)+"\n" +\
"actual:\n"+str(self.response)
if self.desc:
return self.desc
return "Unknown control error occurred"

104
prs-500.e3p Normal file
View File

@ -0,0 +1,104 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE Project SYSTEM "Project-3.9.dtd">
<!-- Project file for project pr5-500 -->
<!-- Saved: 2006-10-31, 21:14:19 -->
<!-- Copyright (C) 2006 Kovid Goyal, kovid@kovidgoyal.net -->
<Project version="3.9">
<ProgLanguage mixed="0">Python</ProgLanguage>
<UIType>Console</UIType>
<Description>Library to communicate with the Sony Reader prs-500 via USB</Description>
<Version></Version>
<Author>Kovid Goyal</Author>
<Email>kovid@kovidgoyal.net</Email>
<Sources>
<Source>
<Name>communicate.py</Name>
</Source>
<Source>
<Name>data.py</Name>
</Source>
<Source>
<Name>prstypes.py</Name>
</Source>
<Source>
<Name>errors.py</Name>
</Source>
</Sources>
<Forms>
</Forms>
<Translations>
</Translations>
<Interfaces>
</Interfaces>
<Others>
</Others>
<MainScript>
<Name>communicate.py</Name>
</MainScript>
<Vcs>
<VcsType>Subversion</VcsType>
<VcsOptions>(dp0
S'status'
p1
(lp2
S''
p3
asS'log'
p4
(lp5
g3
asS'global'
p6
(lp7
g3
asS'update'
p8
(lp9
g3
asS'remove'
p10
(lp11
g3
asS'add'
p12
(lp13
g3
asS'tag'
p14
(lp15
g3
asS'export'
p16
(lp17
g3
asS'commit'
p18
(lp19
g3
asS'diff'
p20
(lp21
g3
asS'checkout'
p22
(lp23
g3
asS'history'
p24
(lp25
g3
as.</VcsOptions>
<VcsOtherData>(dp0
S'standardLayout'
p1
I01
s.</VcsOtherData>
</Vcs>
<FiletypeAssociations>
<FiletypeAssociation pattern="*.ui.h" type="FORMS" />
<FiletypeAssociation pattern="*.ptl" type="SOURCES" />
<FiletypeAssociation pattern="*.idl" type="INTERFACES" />
<FiletypeAssociation pattern="*.ui" type="FORMS" />
<FiletypeAssociation pattern="*.py" type="SOURCES" />
</FiletypeAssociations>
</Project>

View File

@ -14,83 +14,520 @@
## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""
Contains convenience wrappers for packet data that allow output in the same format as the logs produced by spike.pl
"""
import struct
from errors import PacketError
BYTE = "<B" # Unsigned char little endian encoded in 1 byte
WORD = "<H" # Unsigned short little endian encoded in 2 bytes
DWORD = "<I" # Unsigned integer little endian encoded in 4 bytes
DDWORD = "<Q" # Unsigned long long little endian encoded in 8 bytes
def normalize_buffer(tb):
""" Replace negative bytes by 256 + byte """
nb = list(tb)
for i in range(len(nb)):
if nb[i] < 0:
nb[i] = 256 + nb[i]
return TransferBuffer(nb)
def phex(num):
"""
Return the hex representation of num without the 0x prefix.
If the hex representation is only 1 digit it is padded to the left with a zero.
"""
index, sign = 2, ""
if num < 0:
index, sign = 3, "-"
h=hex(num)[index:]
if len(h) < 2:
h = "0"+h
return sign + h
class TransferBuffer(tuple):
"""
Thin wrapper around tuple to present the string representation of a transfer buffer as in the output of spike.pl """
class TransferBuffer(list):
def __init__(self, packet):
tuple.__init__(packet)
self.packet = packet
"""
packet should be any listable object, or an integer. If it is an integer, a zero buffer of that length is created.
packet is normalized (see TransferBuffer.normalize)
"""
if "__len__" in dir(packet):
list.__init__(self, list(packet))
self.normalize()
else: list.__init__(self, [0 for i in range(packet)])
def __add__(self, tb):
return TransferBuffer(tuple.__add__(self, tb))
""" Return a TransferBuffer rather thana list as the sum """
return TransferBuffer(list.__add__(self, tb))
def __getslice__(self, start, end):
""" Return a TransferBuffer rather than a list as the slice """
return TransferBuffer(list.__getslice__(self, start, end))
def __str__(self):
"""
Return a string representation of this packet in the same format as that produced by spike.pl
Return a string representation of this buffer in the same format as that produced by spike.pl
"""
ans = ""
ans, ascii = "", ""
for i in range(0, len(self), 2):
for b in range(2):
try:
ans = ans + phex(self[i+b])
except IndexError:
break
ans += TransferBuffer.phex(self[i+b])
ascii += chr(self[i+b]) if self[i+b] > 31 and self[i+b] < 127 else "."
except IndexError: break
ans = ans + " "
if (i+2)%16 == 0:
ans = ans + "\n"
ans += "\t" + ascii + "\n"
ascii = ""
if len(ascii) > 0:
last_line = ans[ans.rfind("\n")+1:]
padding = 32 - len(last_line)
ans += "".ljust(padding) + "\t\t" + ascii
return ans.strip()
class ControlQuery:
"""
Container for all the transfer buffers that make up a single query.
def unpack(self, fmt=DWORD, start=0):
""" Return decoded data from buffer. See pydoc struct for fmt. start is position in buffer from which to decode. """
end = start + struct.calcsize(fmt)
return struct.unpack(fmt, "".join([ chr(i) for i in list.__getslice__(self, start, end) ]))
A query has a transmitted buffer, an expected response and an optional buffer that is either read
from or written to via a bulk transfer.
"""
def pack(self, val, fmt=DWORD, start=0):
""" Encode data and write it to buffer. See pydoc struct fmt. start is position in buffer at which to write encoded data. """
self[start:start+struct.calcsize(fmt)] = [ ord(i) for i in struct.pack(fmt, val) ]
def __init__(self, query, response, bulkTransfer=None):
def normalize(self):
""" Replace negative bytes by 256 + byte """
for i in range(len(self)):
if self[i] < 0:
self[i] = 256 + self[i]
@classmethod
def phex(cls, num):
"""
Construct this query.
Return the hex representation of num without the 0x prefix.
query - A TransferBuffer that should be sent to the device on the control pipe
response - A TransferBuffer that the device is expected to return. Used for error checking.
bulkTransfer - If it is a number, it indicates that a buffer of size bulkTransfer should be read from the device via a
bulk read. If it is a TransferBuffer then it will be sent to the device via a bulk write.
If the hex representation is only 1 digit it is padded to the left with a zero.
"""
self.query = query
self.response = response
self.bulkTransfer = bulkTransfer
def __eq__(self, cq):
""" Bulk transfers are not compared to decide equality. """
return self.query == cq.query and self.response == cq.response
index, sign = 2, ""
if num < 0:
index, sign = 3, "-"
h=hex(num)[index:]
if len(h) < 2:
h = "0"+h
return sign + h
class Command(TransferBuffer):
""" Defines the structure of command packets sent to the device. """
def __init__(self, packet):
if ("__len__" in dir(packet) and len(packet) < 16) or ("__len__" not in dir(packet) and packet < 16):
raise PacketError(str(self.__class__)[7:-2] + " packets must have length atleast 16")
TransferBuffer.__init__(self, packet)
@apply
def number():
doc =\
"""
Command number
Observed command numbers are:
0x00001000 -- Acknowledge
0x00000107 -- Purpose unknown, occurs in start_session
0x00000106 -- Purpose unknown, occurs in start_session
"""
def fget(self):
return self.unpack(start=0, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=0, fmt=DWORD)
return property(**locals())
@apply
def type():
doc =\
"""
Command type. Known types 0x00, 0x01. Not sure what the type means.
"""
def fget(self):
return self.unpack(start=4, fmt=DDWORD)[0]
def fset(self, val):
self.pack(val, start=4, fmt=DDWORD)
return property(**locals())
@apply
def length():
doc =\
""" Length in bytes of the data part of the query """
def fget(self):
return self.unpack(start=12, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=12, fmt=DWORD)
return property(**locals())
@apply
def data():
doc =\
"""
The data part of this command. Returned/set as/by a TransferBuffer.
Setting it by default changes self.length to the length of the new buffer. You may have to reset it to
the significant part of the buffer.
"""
def fget(self):
return self[16:]
def fset(self, buffer):
self[16:] = buffer
self.length = len(buffer)
return property(**locals())
class ShortCommand(Command):
SIZE = 20 #Packet size in bytes
def __init__(self, number=0x00, type=0x00, command=0x00):
""" command must be an integer """
Command.__init__(self, ShortCommand.SIZE)
self.number = number
self.type = type
self.length = 4
self.command = command
@classmethod
def list_command(cls, id):
""" Return the command packet used to ask for the next item in the list """
return ShortCommand(number=0x35, type=0x01, command=id)
@apply
def command():
doc =\
"""
The command. Not sure why this is needed in addition to Command.number
"""
def fget(self):
return self.unpack(start=16, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=16, fmt=DWORD)
return property(**locals())
class LongCommand(Command):
SIZE = 32 # Size in bytes of long command packets
def __init__(self, number=0x00, type=0x00, command=0x00):
""" command must be either an integer or a list of not more than 4 integers """
Command.__init__(self, LongCommand.SIZE)
self.number = number
self.type = type
self.length = 16
self.command = command
@apply
def command():
doc =\
"""
The command.
It should be set to a x-integer list, where 0<x<5.
"""
def fget(self):
return self.unpack(start=16, fmt="<"+str(self.length/4)+"I")
def fset(self, val):
if "__len__" not in dir(val): val = (val,)
start = 16
for command in val:
self.pack(command, start=start, fmt=DWORD)
start += struct.calcsize(DWORD)
return property(**locals())
class AcknowledgeBulkRead(LongCommand):
""" Must be sent to device after a bulk read """
def __init__(self, bulk_read_id):
""" bulk_read_id is an integer, the id of the bulk read we are acknowledging """
LongCommand.__init__(self, number=0x1000, type=0x00, command=bulk_read_id)
class PathQuery(Command):
""" Defines structure of commands that request information about a path """
# Command.number values used in path queries
PROPERTIES = 0x18 # Ask for file properties
ID = 0x33 # Ask for query id for a directory listing
def __init__(self, path, number=0x18):
Command.__init__(self, 20 + len(path))
self.number=number
self.type = 0x01
self.length = 4 + len(path)
self.path_length = len(path)
self.path = path
@apply
def path_length():
doc =\
""" The length in bytes of the path to follow """
def fget(self):
return self.unpack(start=16, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=16, fmt=DWORD)
return property(**locals())
@apply
def path():
doc =\
""" The path """
def fget(self):
return self.unpack(start=20, fmt="<"+str(self.path_length)+"s")[0]
def fset(self, val):
self.pack(val, start=20, fmt="<"+str(self.path_length)+"s")
return property(**locals())
class Response(Command):
""" Defines the structure of response packets received from the device. """
SIZE = 32 # Size of response packets in the SONY protocol
def __init__(self, packet):
""" len(packet) == Response.SIZE """
if len(packet) != Response.SIZE:
raise PacketError(str(self.__class__)[7:-2] + " packets must have exactly " + str(Response.SIZE) + " bytes not " + str(len(packet)))
Command.__init__(self, packet)
if self.number != 0x00001000:
raise PacketError("Response packets must have their number set to " + hex(0x00001000))
@apply
def rnumber():
doc =\
"""
The response number.
It will be the command number from a command that was sent to the device sometime before this response.
"""
def fget(self):
return self.unpack(start=16, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=16, fmt=DWORD)
return property(**locals())
@apply
def data():
doc =\
""" The last 3 DWORDs of data in this response packet. Returned as a list. """
def fget(self):
return self.unpack(start=20, fmt="<III")
def fset(self, val):
self.pack(val, start=20, fmt="<III")
return property(**locals())
class ListResponse(Response):
""" Defines the structure of response packets received during list (ll) queries """
IS_FILE = 0xffffffd2
IS_INVALID = 0xfffffff9
IS_UNMOUNTED = 0xffffffc8
IS_EOL = 0xfffffffa
PATH_NOT_FOUND = 0xffffffd7
@apply
def code():
doc =\
"""
The response code. Used to indicate conditions like EOL/Error/IsFile
fmt=DWORD
"""
def fget(self):
return self.unpack(start=20, fmt=DDWORD)[0]
def fset(self, val):
self.pack(val, start=20, fmt=DDWORD)
return property(**locals())
@apply
def is_file():
def fget(self):
return self.code == ListResponse.IS_FILE
return property(**locals())
@apply
def is_invalid():
def fget(self):
return self.code == ListResponse.IS_INVALID
return property(**locals())
@apply
def path_not_found():
def fget(self):
return self.code == ListResponse.PATH_NOT_FOUND
return property(**locals())
@apply
def is_unmounted():
def fget(self):
return self.code == ListResponse.IS_UNMOUNTED
return property(**locals())
@apply
def is_eol():
def fget(self):
return self.code == ListResponse.IS_EOL
return property(**locals())
class Answer(TransferBuffer):
""" Defines the structure of packets sent to host via a bulk transfer (i.e., bulk reads) """
def __init__(self, packet):
""" packet must be a listable object of length >= 16 """
if len(packet) < 16 : raise PacketError(str(self.__class__)[7:-2] + " packets must have a length of atleast 16 bytes")
TransferBuffer.__init__(self, packet)
@apply
def id():
doc =\
""" The id of this bulk transfer packet """
def fget(self):
return self.unpack(start=0, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=0, fmt=DWORD)
return property(**locals())
class PathAnswer(Answer):
""" Defines the structure of packets that contain size, date and permissions information about files/directories. """
@apply
def file_size():
doc =\
""" The file size """
def fget(self):
return self.unpack(start=16, fmt=DDWORD)[0]
def fset(self, val):
self.pack(val, start=16, fmt=DDWORD)
return property(**locals())
@apply
def is_dir():
doc =\
""" True if path points to a directory, False if it points to a file """
def fget(self):
return (self.unpack(start=24, fmt=DWORD)[0] == 2)
def fset(self, val):
if val: val = 2
else: val = 1
self.pack(val, start=24, fmt=DWORD)
return property(**locals())
@apply
def ctime():
doc =\
""" The creation time of this file/dir as an epoch """
def fget(self):
return self.unpack(start=28, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=28, fmt=DWORD)
return property(**locals())
@apply
def wtime():
doc =\
""" The modification time of this file/dir as an epoch """
def fget(self):
return self.unpack(start=32, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=32, fmt=DWORD)
return property(**locals())
@apply
def is_readonly():
doc =\
""" Whether this file is readonly """
def fget(self):
return self.unpack(start=36, fmt=DWORD)[0] != 0
def fset(self, val):
if val: val = 4
else: val = 0
self.pack(val, start=36, fmt=DWORD)
return property(**locals())
class IdAnswer(Answer):
""" Defines the structure of packets that contain identifiers for directories. """
@apply
def id():
doc =\
""" The identifier """
def fget(self):
return self.unpack(start=16, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=16, fmt=DWORD)
return property(**locals())
class ListAnswer(Answer):
""" Defines the structure of packets that contain items in a list. """
@apply
def is_dir():
doc =\
""" True if list item points to a directory, False if it points to a file """
def fget(self):
return (self.unpack(start=16, fmt=DWORD)[0] == 2)
def fset(self, val):
if val: val = 2
else: val = 1
self.pack(val, start=16, fmt=DWORD)
return property(**locals())
@apply
def name_length():
doc =\
""" The length in bytes of the list item to follow """
def fget(self):
return self.unpack(start=20, fmt=DWORD)[0]
def fset(self, val):
self.pack(val, start=20, fmt=DWORD)
return property(**locals())
@apply
def name():
doc =\
""" The name of the list item """
def fget(self):
return self.unpack(start=24, fmt="<"+str(self.name_length)+"s")[0]
def fset(self, val):
self.pack(val, start=24, fmt="<"+str(self.name_length)+"s")
return property(**locals())

192
terminfo.py Normal file
View File

@ -0,0 +1,192 @@
import sys, re
class TerminalController:
"""
A class that can be used to portably generate formatted output to
a terminal.
`TerminalController` defines a set of instance variables whose
values are initialized to the control sequence necessary to
perform a given action. These can be simply included in normal
output to the terminal:
>>> term = TerminalController()
>>> print 'This is '+term.GREEN+'green'+term.NORMAL
Alternatively, the `render()` method can used, which replaces
'${action}' with the string required to perform 'action':
>>> term = TerminalController()
>>> print term.render('This is ${GREEN}green${NORMAL}')
If the terminal doesn't support a given action, then the value of
the corresponding instance variable will be set to ''. As a
result, the above code will still work on terminals that do not
support color, except that their output will not be colored.
Also, this means that you can test whether the terminal supports a
given action by simply testing the truth value of the
corresponding instance variable:
>>> term = TerminalController()
>>> if term.CLEAR_SCREEN:
... print 'This terminal supports clearning the screen.'
Finally, if the width and height of the terminal are known, then
they will be stored in the `COLS` and `LINES` attributes.
"""
# Cursor movement:
BOL = '' #: Move the cursor to the beginning of the line
UP = '' #: Move the cursor up one line
DOWN = '' #: Move the cursor down one line
LEFT = '' #: Move the cursor left one char
RIGHT = '' #: Move the cursor right one char
# Deletion:
CLEAR_SCREEN = '' #: Clear the screen and move to home position
CLEAR_EOL = '' #: Clear to the end of the line.
CLEAR_BOL = '' #: Clear to the beginning of the line.
CLEAR_EOS = '' #: Clear to the end of the screen
# Output modes:
BOLD = '' #: Turn on bold mode
BLINK = '' #: Turn on blink mode
DIM = '' #: Turn on half-bright mode
REVERSE = '' #: Turn on reverse-video mode
NORMAL = '' #: Turn off all modes
# Cursor display:
HIDE_CURSOR = '' #: Make the cursor invisible
SHOW_CURSOR = '' #: Make the cursor visible
# Terminal size:
COLS = None #: Width of the terminal (None for unknown)
LINES = None #: Height of the terminal (None for unknown)
# Foreground colors:
BLACK = BLUE = GREEN = CYAN = RED = MAGENTA = YELLOW = WHITE = ''
# Background colors:
BG_BLACK = BG_BLUE = BG_GREEN = BG_CYAN = ''
BG_RED = BG_MAGENTA = BG_YELLOW = BG_WHITE = ''
_STRING_CAPABILITIES = """
BOL=cr UP=cuu1 DOWN=cud1 LEFT=cub1 RIGHT=cuf1
CLEAR_SCREEN=clear CLEAR_EOL=el CLEAR_BOL=el1 CLEAR_EOS=ed BOLD=bold
BLINK=blink DIM=dim REVERSE=rev UNDERLINE=smul NORMAL=sgr0
HIDE_CURSOR=cinvis SHOW_CURSOR=cnorm""".split()
_COLORS = """BLACK BLUE GREEN CYAN RED MAGENTA YELLOW WHITE""".split()
_ANSICOLORS = "BLACK RED GREEN YELLOW BLUE MAGENTA CYAN WHITE".split()
def __init__(self, term_stream=sys.stdout):
"""
Create a `TerminalController` and initialize its attributes
with appropriate values for the current terminal.
`term_stream` is the stream that will be used for terminal
output; if this stream is not a tty, then the terminal is
assumed to be a dumb terminal (i.e., have no capabilities).
"""
# Curses isn't available on all platforms
try: import curses
except: return
# If the stream isn't a tty, then assume it has no capabilities.
if not term_stream.isatty(): return
# Check the terminal type. If we fail, then assume that the
# terminal has no capabilities.
try: curses.setupterm()
except: return
# Look up numeric capabilities.
self.COLS = curses.tigetnum('cols')
self.LINES = curses.tigetnum('lines')
# Look up string capabilities.
for capability in self._STRING_CAPABILITIES:
(attrib, cap_name) = capability.split('=')
setattr(self, attrib, self._tigetstr(cap_name) or '')
# Colors
set_fg = self._tigetstr('setf')
if set_fg:
for i,color in zip(range(len(self._COLORS)), self._COLORS):
setattr(self, color, curses.tparm(set_fg, i) or '')
set_fg_ansi = self._tigetstr('setaf')
if set_fg_ansi:
for i,color in zip(range(len(self._ANSICOLORS)), self._ANSICOLORS):
setattr(self, color, curses.tparm(set_fg_ansi, i) or '')
set_bg = self._tigetstr('setb')
if set_bg:
for i,color in zip(range(len(self._COLORS)), self._COLORS):
setattr(self, 'BG_'+color, curses.tparm(set_bg, i) or '')
set_bg_ansi = self._tigetstr('setab')
if set_bg_ansi:
for i,color in zip(range(len(self._ANSICOLORS)), self._ANSICOLORS):
setattr(self, 'BG_'+color, curses.tparm(set_bg_ansi, i) or '')
def _tigetstr(self, cap_name):
# String capabilities can include "delays" of the form "$<2>".
# For any modern terminal, we should be able to just ignore
# these, so strip them out.
import curses
cap = curses.tigetstr(cap_name) or ''
return re.sub(r'\$<\d+>[/*]?', '', cap)
def render(self, template):
"""
Replace each $-substitutions in the given template string with
the corresponding terminal control string (if it's defined) or
'' (if it's not).
"""
return re.sub(r'\$\$|\${\w+}', self._render_sub, template)
def _render_sub(self, match):
s = match.group()
if s == '$$': return s
else: return getattr(self, s[2:-1])
#######################################################################
# Example use case: progress bar
#######################################################################
class ProgressBar:
"""
A 3-line progress bar, which looks like::
Header
20% [===========----------------------------------]
progress message
The progress bar is colored, if the terminal supports color
output; and adjusts to the width of the terminal.
"""
BAR = '%3d%% ${GREEN}[${BOLD}%s%s${NORMAL}${GREEN}]${NORMAL}\n'
HEADER = '${BOLD}${CYAN}%s${NORMAL}\n\n'
def __init__(self, term, header):
self.term = term
if not (self.term.CLEAR_EOL and self.term.UP and self.term.BOL):
raise ValueError("Terminal isn't capable enough -- you "
"should use a simpler progress dispaly.")
self.width = self.term.COLS or 75
self.bar = term.render(self.BAR)
self.header = self.term.render(self.HEADER % header.center(self.width))
self.cleared = 1 #: true if we haven't drawn the bar yet.
self.update(0, '')
def update(self, percent, message):
if self.cleared:
sys.stdout.write(self.header)
self.cleared = 0
n = int((self.width-10)*percent)
sys.stdout.write(
self.term.BOL + self.term.UP + self.term.CLEAR_EOL +
(self.bar % (100*percent, '='*n, '-'*(self.width-10-n))) +
self.term.CLEAR_EOL + message.center(self.width))
def clear(self):
if not self.cleared:
sys.stdout.write(self.term.BOL + self.term.CLEAR_EOL +
self.term.UP + self.term.CLEAR_EOL +
self.term.UP + self.term.CLEAR_EOL)
self.cleared = 1