py3: Port terminal I/O module

This commit is contained in:
Kovid Goyal 2019-04-03 14:28:51 +05:30
parent eaac4d254d
commit 3c64ea0995
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 54 additions and 20 deletions

View File

@ -9,8 +9,8 @@ __docformat__ = 'restructuredtext en'
import os, sys, re import os, sys, re
from calibre.constants import iswindows from calibre.constants import iswindows, ispy3
from polyglot.builtins import iteritems, range, zip from polyglot.builtins import iteritems, range, zip, native_string_type
if iswindows: if iswindows:
import ctypes.wintypes import ctypes.wintypes
@ -26,7 +26,7 @@ if iswindows:
def fmt(code): def fmt(code):
return ('\033[%dm'%code).encode('ascii') return '\033[%dm' % code
RATTRIBUTES = dict( RATTRIBUTES = dict(
@ -96,11 +96,11 @@ def colored(text, fg=None, bg=None, bold=False):
prefix.append(BACKGROUNDS[bg]) prefix.append(BACKGROUNDS[bg])
if bold: if bold:
prefix.append(ATTRIBUTES['bold']) prefix.append(ATTRIBUTES['bold'])
prefix = b''.join(prefix) prefix = ''.join(prefix)
suffix = RESET suffix = RESET
if isinstance(text, type(u'')): if isinstance(text, bytes):
prefix = prefix.decode('ascii') prefix = prefix.encode('ascii')
suffix = suffix.decode('ascii') suffix = suffix.encode('ascii')
return prefix + text + suffix return prefix + text + suffix
@ -128,7 +128,7 @@ class Detect(object):
# Stream is a console # Stream is a console
self.set_console = windll.kernel32.SetConsoleTextAttribute self.set_console = windll.kernel32.SetConsoleTextAttribute
self.default_console_text_attributes = WCOLORS['white'] self.default_console_text_attributes = WCOLORS['white']
kernel32 = WinDLL(b'kernel32', use_last_error=True) kernel32 = WinDLL(native_string_type('kernel32'), use_last_error=True)
self.write_console = kernel32.WriteConsoleW self.write_console = kernel32.WriteConsoleW
self.write_console.argtypes = [wintypes.HANDLE, wintypes.c_wchar_p, wintypes.DWORD, POINTER(wintypes.DWORD), wintypes.LPVOID] self.write_console.argtypes = [wintypes.HANDLE, wintypes.c_wchar_p, wintypes.DWORD, POINTER(wintypes.DWORD), wintypes.LPVOID]
self.write_console.restype = wintypes.BOOL self.write_console.restype = wintypes.BOOL
@ -151,9 +151,13 @@ class Detect(object):
while text: while text:
t, text = text[:chunk], text[chunk:] t, text = text[:chunk], text[chunk:]
wt = c_wchar_p(t) wt = c_wchar_p(t)
# Use the fact that len(t) == wcslen(wt) in python 2.7 on if ispy3:
# windows where the python unicode type uses UTF-16 text_len = len(t.encode('utf-16'))
if not self.write_console(self.file_handle, wt, len(t), byref(written), None): else:
# Use the fact that len(t) == wcslen(wt) in python 2.7 on
# windows where the python unicode type uses UTF-16
text_len = len(t)
if not self.write_console(self.file_handle, wt, text_len, byref(written), None):
# Older versions of windows can fail to write large strings # Older versions of windows can fail to write large strings
# to console with WriteConsoleW (seen it happen on Win XP) # to console with WriteConsoleW (seen it happen on Win XP)
import winerror import winerror
@ -217,17 +221,27 @@ class ColoredStream(Detect):
class ANSIStream(Detect): class ANSIStream(Detect):
ANSI_RE = re.compile(br'\033\[((?:\d|;)*)([a-zA-Z])') ANSI_RE = r'\033\[((?:\d|;)*)([a-zA-Z])'
def __init__(self, stream=None): def __init__(self, stream=None):
super(ANSIStream, self).__init__(stream) super(ANSIStream, self).__init__(stream)
self.encoding = getattr(self.stream, 'encoding', 'utf-8') or 'utf-8' self.encoding = getattr(self.stream, 'encoding', 'utf-8') or 'utf-8'
self.stream_takes_unicode = hasattr(self.stream, 'buffer')
self.last_state = (None, None, False) self.last_state = (None, None, False)
self._ansi_re_bin = self._ansi_re_unicode = None
def ansi_re(self, binary=False):
attr = '_ansi_re_bin' if binary else '_ansi_re_unicode'
ans = getattr(self, attr)
if ans is None:
expr = self.ANSI_RE
if binary:
expr = expr.encode('ascii')
ans = re.compile(expr)
setattr(self, attr, ans)
return ans
def write(self, text): def write(self, text):
if isinstance(text, type(u'')):
text = text.encode(self.encoding, 'replace')
if not self.isatty: if not self.isatty:
return self.strip_and_write(text) return self.strip_and_write(text)
@ -239,8 +253,22 @@ class ANSIStream(Detect):
self.write_and_convert(text) self.write_and_convert(text)
def polyglot_write(self, text):
binary = isinstance(text, bytes)
stream = self.stream
if self.stream_takes_unicode:
if binary:
stream = self.stream.buffer
else:
if not binary:
text = text.encode(self.encoding, 'replace')
stream.write(text)
def strip_and_write(self, text): def strip_and_write(self, text):
self.stream.write(self.ANSI_RE.sub(b'', text)) binary = isinstance(text, bytes)
pat = self.ansi_re(binary)
repl = b'' if binary else ''
self.polyglot_write(pat.sub(repl, text))
def write_and_convert(self, text): def write_and_convert(self, text):
''' '''
@ -249,7 +277,8 @@ class ANSIStream(Detect):
calls. calls.
''' '''
cursor = 0 cursor = 0
for match in self.ANSI_RE.finditer(text): binary = isinstance(text, bytes)
for match in self.ansi_re(binary).finditer(text):
start, end = match.span() start, end = match.span()
self.write_plain_text(text, cursor, start) self.write_plain_text(text, cursor, start)
self.convert_ansi(*match.groups()) self.convert_ansi(*match.groups())
@ -268,21 +297,25 @@ class ANSIStream(Detect):
pass pass
else: else:
return self.write_unicode_text(utext) return self.write_unicode_text(utext)
self.stream.write(text) self.polyglot_write(text)
def convert_ansi(self, paramstring, command): def convert_ansi(self, paramstring, command):
if isinstance(paramstring, bytes):
paramstring = paramstring.decode('ascii', 'replace')
if isinstance(command, bytes):
command = command.decode('ascii', 'replace')
params = self.extract_params(paramstring) params = self.extract_params(paramstring)
self.call_win32(command, params) self.call_win32(command, params)
def extract_params(self, paramstring): def extract_params(self, paramstring):
def split(paramstring): def split(paramstring):
for p in paramstring.split(b';'): for p in paramstring.split(';'):
if p: if p:
yield int(p) yield int(p)
return tuple(split(paramstring)) return tuple(split(paramstring))
def call_win32(self, command, params): def call_win32(self, command, params):
if command != b'm': if command != 'm':
return return
fg, bg, bold = self.last_state fg, bg, bold = self.last_state

View File

@ -7,6 +7,7 @@ from __future__ import absolute_import, division, print_function, unicode_litera
import sys import sys
is_py3 = sys.version_info.major >= 3 is_py3 = sys.version_info.major >= 3
native_string_type = str
def iterkeys(d): def iterkeys(d):