Drop the dependency on dukpy

The dukpy tests are failing on windows with VS 2017. Instead use Web
Engine to compile rapydscript code, when the rapydscript binary is not
present.
This commit is contained in:
Kovid Goyal 2019-06-26 20:17:01 +05:30
parent a6576f0eff
commit 82083ddbb8
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
7 changed files with 128 additions and 867 deletions

View File

@ -4,10 +4,6 @@ License: GPL-3
The full text of the GPL is distributed as in
/usr/share/common-licenses/GPL-3 on Debian systems.
Files: src/duktape/*
Copyright: Various
License: MIT
Files: resources/content-server/font-awesome/*
Copyright: Various
License: MIT and SIL OFL

View File

@ -639,17 +639,6 @@
}
},
{
"name": "dukpy",
"python": 2,
"unix": {
"filename": "dukpy-0.3.tar.gz",
"hash": "sha1:31c67f087f9d55798f2c00aba46fa18c3d8fb8aa",
"urls": ["https://github.com/kovidgoyal/dukpy/archive/v0.3.tar.gz"]
}
},
{
"name": "chardet",
"unix": {

Binary file not shown.

View File

@ -316,8 +316,6 @@ def find_tests():
ans = unittest.defaultTestLoader.loadTestsFromTestCase(BuildTest)
from calibre.utils.icu_test import find_tests
ans.addTests(find_tests())
import duktape.tests as dtests
ans.addTests(unittest.defaultTestLoader.loadTestsFromModule(dtests))
from tinycss.tests.main import find_tests
ans.addTests(find_tests())
from calibre.spell.dictionary import find_tests

View File

@ -3,29 +3,19 @@
# License: GPLv3 Copyright: 2015, Kovid Goyal <kovid at kovidgoyal.net>
from __future__ import absolute_import, division, print_function, unicode_literals
import atexit
import errno
import glob
import json
import os
import re
import shutil
import subprocess
import sys
from functools import partial
from io import BytesIO
from threading import Thread, local
from lzma.xz import compress, decompress
from calibre import force_unicode
from calibre.constants import (
FAKE_HOST, FAKE_PROTOCOL, __appname__, __version__, cache_dir
)
from calibre.constants import FAKE_HOST, FAKE_PROTOCOL, __appname__, __version__
from calibre.ptempfile import TemporaryDirectory
from calibre.utils.filenames import atomic_rename
from calibre.utils.terminal import ANSIStream
from duktape import Context, JSError, to_python
from lzma.xz import compress, decompress
from polyglot.builtins import itervalues, range, exec_path, raw_input, error_message, filter, getcwd, zip, unicode_type
from polyglot.queue import Empty, Queue
from polyglot.builtins import exec_path, itervalues, unicode_type, zip, as_bytes
COMPILER_PATH = 'rapydscript/compiler.js.xz'
special_title = '__webengine_messages_pending__'
@ -41,49 +31,142 @@ def update_rapydscript():
d = os.path.dirname
base = d(d(d(d(d(abspath(__file__))))))
base = os.path.join(base, 'rapydscript')
raw = subprocess.check_output(['node', '--harmony', os.path.join(base, 'bin', 'export')])
if isinstance(raw, unicode_type):
raw = raw.encode('utf-8')
with TemporaryDirectory() as tdir:
subprocess.check_call(['node', '--harmony', os.path.join(base, 'bin', 'web-repl-export'), tdir])
with open(os.path.join(tdir, 'rapydscript.js'), 'rb') as f:
raw = f.read()
path = P(COMPILER_PATH, allow_user_override=False)
with open(path, 'wb') as f:
compress(raw, f, 9)
base = os.path.join(base, 'src', 'lib')
dest = os.path.join(P('rapydscript', allow_user_override=False), 'lib')
if not os.path.exists(dest):
os.mkdir(dest)
for x in glob.glob(os.path.join(base, '*.pyj')):
shutil.copy2(x, dest)
# }}}
# Compiler {{{
tls = local()
def to_dict(obj):
return dict(zip(list(obj.keys()), list(obj.values())))
def compiler():
c = getattr(tls, 'compiler', None)
if c is None:
c = tls.compiler = Context()
c.eval('exports = {}; sha1sum = Duktape.sha1sum;', noreturn=True)
buf = BytesIO()
decompress(P(COMPILER_PATH, data=True, allow_user_override=False), buf)
c.eval(buf.getvalue().decode('utf-8'), fname=COMPILER_PATH, noreturn=True)
return c
ans = getattr(compiler, 'ans', None)
if ans is not None:
return ans
from calibre import walk
from calibre.gui2 import must_use_qt
from calibre.gui2.webengine import secure_webengine
from PyQt5.QtWebEngineWidgets import QWebEnginePage, QWebEngineScript
from PyQt5.Qt import QApplication, QEventLoop
must_use_qt()
buf = BytesIO()
decompress(P(COMPILER_PATH, data=True, allow_user_override=False), buf)
base = base_dir()
rapydscript_dir = os.path.join(base, 'src', 'pyj')
cache_path = os.path.join(module_cache_dir(), 'embedded-compiler-write-cache.json')
def create_vfs():
ans = {}
for x in walk(rapydscript_dir):
if x.endswith('.pyj'):
r = os.path.relpath(x, rapydscript_dir).replace('\\', '/')
with open(x, 'rb') as f:
ans['__stdlib__/' + r] = f.read().decode('utf-8')
return ans
def vfs_script():
try:
with open(cache_path, 'rb') as f:
write_cache = f.read().decode('utf-8')
except Exception:
write_cache = '{}'
return '''
(function() {
"use strict";
var vfs = VFS;
function read_file_sync(name) {
var ans = vfs[name];
if (ans) return ans;
ans = write_cache[name];
if (ans) return ans;
return null;
}
function write_file_sync(name, data) {
write_cache[name] = data;
}
RapydScript.virtual_file_system = {
'read_file_sync': read_file_sync,
'write_file_sync': write_file_sync
};
window.compiler = RapydScript.create_embedded_compiler();
document.title = 'compiler initialized';
})();
'''.replace('VFS', json.dumps(create_vfs()) + ';\n' + 'window.write_cache = ' + write_cache, 1)
def create_script(src, name):
s = QWebEngineScript()
s.setName(name)
s.setInjectionPoint(QWebEngineScript.DocumentReady)
s.setWorldId(QWebEngineScript.ApplicationWorld)
s.setRunsOnSubFrames(True)
s.setSourceCode(src)
return s
class Compiler(QWebEnginePage):
def __init__(self):
QWebEnginePage.__init__(self)
self.errors = []
secure_webengine(self)
script = buf.getvalue().decode('utf-8')
script += '\n\n;;\n\n' + vfs_script()
self.scripts().insert(create_script(script, 'rapydscript.js'))
self.setHtml('<p>initialize')
while self.title() != 'compiler initialized':
self.spin_loop()
def spin_loop(self):
QApplication.instance().processEvents(QEventLoop.ExcludeUserInputEvents)
def javaScriptConsoleMessage(self, level, msg, line_num, source_id):
if level:
self.errors.append(msg)
else:
print('{}:{}:{}'.format(source_id, line_num, msg))
def __call__(self, src, options):
self.compiler_result = None
self.errors = []
self.working = True
options['basedir'] = '__stdlib__'
src = 'var js = window.compiler.compile({}, {}); [js, window.write_cache]'.format(*map(json.dumps, (src, options)))
self.runJavaScript(src, QWebEngineScript.ApplicationWorld, self.compilation_done)
while self.working:
self.spin_loop()
if self.compiler_result is None:
raise CompileFailure('Failed to compile rapydscript code with error: ' + '\n'.join(self.errors))
write_cache = self.compiler_result[1]
with open(cache_path, 'wb') as f:
f.write(as_bytes(json.dumps(write_cache)))
return self.compiler_result[0]
def compilation_done(self, js):
self.working = False
self.compiler_result = js
compiler.ans = Compiler()
return compiler.ans
class CompileFailure(ValueError):
pass
def default_lib_dir():
return P('rapydscript/lib', allow_user_override=False)
_cache_dir = None
@ -105,38 +188,14 @@ def compile_pyj(data, filename='<stdin>', beautify=True, private_scope=True, lib
if isinstance(data, bytes):
data = data.decode('utf-8')
c = compiler()
c.g.current_options = {
result = c(data, {
'beautify':beautify,
'private_scope':private_scope,
'omit_baselib': omit_baselib,
'libdir': libdir or default_lib_dir(),
'basedir': getcwd() if not filename or filename == '<stdin>' else os.path.dirname(filename),
'keep_baselib': not omit_baselib,
'filename': filename,
'js_version': js_version,
}
c.g.rs_source_code = data
ok, result = c.eval(
'''
ans = [null, null];
try {
ans = [true, exports["compile"](rs_source_code, %s, current_options)];
} catch(e) {
ans = [false, e]
}
ans;
''' % json.dumps(filename))
if ok:
return result
presult = to_python(result)
if 'message' in result:
msg = presult['message']
if 'filename' in presult and 'line' in presult:
msg = '%s:%s:%s' % (presult['filename'], presult['line'], msg)
raise CompileFailure(msg)
if result.stack:
# Javascript error object instead of ParseError
raise CompileFailure(result.stack)
raise CompileFailure(repr(presult))
})
return result
has_external_compiler = None
@ -166,7 +225,9 @@ def compile_fast(data, filename=None, beautify=True, private_scope=True, libdir=
has_external_compiler = detect_external_compiler()
if not has_external_compiler:
return compile_pyj(data, filename or '<stdin>', beautify, private_scope, libdir, omit_baselib, js_version or 6)
args = ['--cache-dir', module_cache_dir(), '--import-path', libdir or default_lib_dir()]
args = ['--cache-dir', module_cache_dir()]
if libdir:
args += ['--import-path', libdir]
if not beautify:
args.append('--uglify')
if not private_scope:
@ -303,198 +364,3 @@ def msgfmt(po_data_as_string):
ctx.g.msgfmt_options = {'use_fuzzy': False}
return ctx.eval('exports.msgfmt(po_data, msgfmt_options)')
# }}}
# REPL {{{
def leading_whitespace(line):
return line[:len(line) - len(line.lstrip())]
def format_error(data):
return ':'.join(map(unicode_type, (data['file'], data['line'], data['col'], data['message'])))
class Repl(Thread):
LINE_CONTINUATION_CHARS = r'\:'
daemon = True
def __init__(self, ps1='>>> ', ps2='... ', show_js=False, libdir=None):
Thread.__init__(self, name='RapydScriptREPL')
self.to_python = to_python
self.JSError = JSError
self.enc = getattr(sys.stdin, 'encoding', None) or 'utf-8'
try:
import readline
self.readline = readline
except ImportError:
pass
self.output = ANSIStream(sys.stdout)
self.to_repl = Queue()
self.from_repl = Queue()
self.ps1, self.ps2 = ps1, ps2
self.show_js, self.libdir = show_js, libdir
self.prompt = ''
self.completions = None
self.start()
def init_ctx(self):
self.prompt = self.ps1
self.ctx = compiler()
self.ctx.g.Duktape.write = self.output.write
self.ctx.eval(r'''console = { log: function() { Duktape.write(Array.prototype.slice.call(arguments).join(' ') + '\n');}};
console['error'] = console['log'];''')
self.ctx.g.repl_options = {
'show_js': self.show_js,
'histfile':False,
'input':True, 'output':True, 'ps1':self.ps1, 'ps2':self.ps2,
'terminal':self.output.isatty,
'enum_global': 'Object.keys(this)',
'lib_path': self.libdir or os.path.dirname(P(COMPILER_PATH)) # TODO: Change this to load pyj files from the src code
}
def get_from_repl(self):
while True:
try:
return self.from_repl.get(True, 1)
except Empty:
if not self.is_alive():
raise SystemExit(1)
def run(self):
self.init_ctx()
rl = None
def set_prompt(p):
self.prompt = p
def prompt(lw):
self.from_repl.put(to_python(lw))
self.ctx.g.set_prompt = set_prompt
self.ctx.g.prompt = prompt
self.ctx.eval('''
listeners = {};
rl = {
setPrompt:set_prompt,
write:Duktape.write,
clearLine: function() {},
on: function(ev, cb) { listeners[ev] = cb; return rl; },
prompt: prompt,
sync_prompt: true,
send_line: function(line) { listeners['line'](line); },
send_interrupt: function() { listeners['SIGINT'](); },
close: function() {listeners['close'](); },
};
repl_options.readline = { createInterface: function(options) { rl.completer = options.completer; return rl; }};
exports.init_repl(repl_options)
''', fname='<init repl>')
rl = self.ctx.g.rl
completer = to_python(rl.completer)
send_interrupt = to_python(rl.send_interrupt)
send_line = to_python(rl.send_line)
while True:
ev, line = self.to_repl.get()
try:
if ev == 'SIGINT':
self.output.write('\n')
send_interrupt()
elif ev == 'line':
send_line(line)
else:
val = completer(line)
val = to_python(val)
self.from_repl.put(val[0])
except Exception as e:
if isinstance(e, JSError):
print(e.stack or error_message(e), file=sys.stderr)
else:
import traceback
traceback.print_exc()
for i in range(100):
# Do this many times to ensure we dont deadlock
self.from_repl.put(None)
def __call__(self):
if hasattr(self, 'readline'):
history = os.path.join(cache_dir(), 'pyj-repl-history.txt')
self.readline.parse_and_bind("tab: complete")
try:
self.readline.read_history_file(history)
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
atexit.register(partial(self.readline.write_history_file, history))
def completer(text, num):
if self.completions is None:
self.to_repl.put(('complete', text))
self.completions = list(filter(None, self.get_from_repl()))
if not self.completions:
return None
try:
return self.completions[num]
except (IndexError, TypeError, AttributeError, KeyError):
self.completions = None
if hasattr(self, 'readline'):
self.readline.set_completer(completer)
while True:
lw = self.get_from_repl()
if lw is None:
raise SystemExit(1)
q = self.prompt
if hasattr(self, 'readline'):
self.readline.set_pre_input_hook(lambda:(self.readline.insert_text(lw), self.readline.redisplay()))
else:
q += lw
try:
line = raw_input(q)
self.to_repl.put(('line', line))
except EOFError:
return
except KeyboardInterrupt:
self.to_repl.put(('SIGINT', None))
# }}}
def main(args=sys.argv):
import argparse
ver = compiler().g.exports.rs_version
parser = argparse.ArgumentParser(prog='pyj',
description='RapydScript compiler and REPL. If passed input on stdin, it is compiled and written to stdout. Otherwise a REPL is started.')
parser.add_argument('--version', action='version',
version='Using RapydScript compiler version: '+ver)
parser.add_argument('--show-js', action='store_true', help='Have the REPL output the compiled javascript before executing it')
parser.add_argument('--libdir', help='Where to look for imported modules')
parser.add_argument('--omit-baselib', action='store_true', default=False, help='Omit the RapydScript base library')
parser.add_argument('--no-private-scope', action='store_true', default=False, help='Do not wrap the output in its own private scope')
args = parser.parse_args(args)
libdir = os.path.expanduser(args.libdir) if args.libdir else None
if sys.stdin.isatty():
Repl(show_js=args.show_js, libdir=libdir)()
else:
try:
enc = getattr(sys.stdin, 'encoding', 'utf-8') or 'utf-8'
data = compile_pyj(sys.stdin.read().decode(enc), libdir=libdir, private_scope=not args.no_private_scope, omit_baselib=args.omit_baselib)
print(data.encode(enc))
except JSError as e:
raise SystemExit(error_message(e))
except CompileFailure as e:
raise SystemExit(error_message(e))
def entry():
main(sys.argv[1:])
if __name__ == '__main__':
main()

View File

@ -1,383 +0,0 @@
#!/usr/bin/env python2
# vim:fileencoding=UTF-8:ts=4:sw=4:sta:et:sts=4:ai
from __future__ import absolute_import, division, print_function, unicode_literals
__copyright__ = '2011, Kovid Goyal <kovid@kovidgoyal.net>'
__docformat__ = 'restructuredtext en'
__all__ = ['dukpy', 'Context', 'undefined', 'JSError', 'to_python']
import errno, os, sys, numbers, hashlib, json
from functools import partial
import dukpy
from calibre.constants import iswindows
from calibre.utils.filenames import atomic_rename
from polyglot.builtins import error_message, getcwd, reraise, unicode_type
Context_, undefined = dukpy.Context, dukpy.undefined
fs = '''
exports.writeFileSync = Duktape.writefile;
exports.readFileSync = Duktape.readfile;
'''
vm = '''
function handle_result(result) {
if (result[0]) return result[1];
var cls = Error;
var e = result[1];
if (e.name) {
try {
cls = eval(e.name);
} catch(ex) {}
}
var err = new cls(e.message);
err.fileName = e.fileName;
err.lineNumber = e.lineNumber;
err.stack = e.stack;
throw err;
}
exports.createContext = Duktape.create_context;
exports.runInContext = function(code, ctx) {
return handle_result(Duktape.run_in_context(code, ctx));
};
exports.runInThisContext = function(code, options) {
try {
return eval(code);
} catch (e) {
console.error('Error:' + e + ' while evaluating: ' + options.filename);
throw e;
}
};
'''
path = '''
exports.join = function () { return arguments[0] + '/' + arguments[1]; }
exports.dirname = function(x) { return Duktape.dirname(x); }
'''
util = '''
exports.inspect = function(x) { return x.toString(); };
exports.inherits = function(ctor, superCtor) {
try {
ctor.super_ = superCtor;
ctor.prototype = Object.create(superCtor.prototype, {
constructor: {
value: ctor,
enumerable: false,
writable: true,
configurable: true
}
});
} catch(e) { console.log('util.inherits() failed with error:', e); throw e; }
};
'''
_assert = '''
module.exports = function(x) {if (!x) throw x + " is false"; };
exports.ok = module.exports;
exports.notStrictEqual = exports.strictEqual = exports.deepEqual = function() {};
'''
stream = '''
module.exports = {};
'''
def sha1sum(x):
return hashlib.sha1(x).hexdigest()
def load_file(base_dirs, builtin_modules, name):
try:
ans = builtin_modules.get(name)
if ans is not None:
return [True, ans]
ans = {'fs':fs, 'vm':vm, 'path':path, 'util':util, 'assert':_assert, 'stream':stream}.get(name)
if ans is not None:
return [True, ans]
if not name.endswith('.js'):
name += '.js'
def do_open(*args):
with open(os.path.join(*args), 'rb') as f:
return [True, f.read().decode('utf-8')]
for b in base_dirs:
try:
return do_open(b, name)
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
raise EnvironmentError('No module named: %s found in the base directories: %s' % (name, os.pathsep.join(base_dirs)))
except Exception as e:
return [False, unicode_type(e)]
def readfile(path, enc='utf-8'):
try:
with open(path, 'rb') as f:
return [f.read().decode(enc or 'utf-8'), None, None]
except UnicodeDecodeError as e:
return None, '', 'Failed to decode the file: %s with specified encoding: %s' % (path, enc)
except EnvironmentError as e:
return [None, errno.errorcode[e.errno], 'Failed to read from file: %s with error: %s' % (path, error_message(e) or e)]
def atomic_write(name, raw):
bdir, bname = os.path.dirname(os.path.abspath(name)), os.path.basename(name)
tname = ('_' if iswindows else '.') + bname
with open(os.path.join(bdir, tname), 'wb') as f:
f.write(raw)
atomic_rename(f.name, name)
def writefile(path, data, enc='utf-8'):
if enc == undefined:
enc = 'utf-8'
try:
if isinstance(data, unicode_type):
data = data.encode(enc or 'utf-8')
atomic_write(path, data)
except UnicodeEncodeError as e:
return ['', 'Failed to encode the data for file: %s with specified encoding: %s' % (path, enc)]
except EnvironmentError as e:
return [errno.errorcode[e.errno], 'Failed to write to file: %s with error: %s' % (path, error_message(e) or e)]
return [None, None]
class Function(object):
def __init__(self, func):
self.func = func
self.name = func.name
def __repr__(self):
# For some reason x._Formals is undefined in duktape
x = self.func
return unicode_type('[Function: %s(...) from file: %s]' % (x.name, x.fileName))
def __call__(self, *args, **kwargs):
try:
return self.func(*args, **kwargs)
except dukpy.JSError as e:
self.reraise(e)
def reraise(self, e):
reraise(JSError, JSError(e), sys.exc_info()[2])
def to_python(x):
try:
if isinstance(x, (numbers.Number, unicode_type, bytes, bool)):
if isinstance(x, unicode_type):
x = x.encode('utf-8')
if isinstance(x, numbers.Integral):
x = int(x)
return x
except TypeError:
pass
name = x.__class__.__name__
if name == 'Array proxy':
return [to_python(y) for y in x]
if name == 'Object proxy':
return {to_python(k):to_python(v) for k, v in x.items()}
if name == 'Function proxy':
return Function(x)
return x
class JSError(Exception):
def __init__(self, ex):
e = ex.args[0]
if isinstance(e, dict):
if 'message' in e:
fn, ln = e.get('fileName'), e.get('lineNumber')
msg = unicode_type(e['message'])
if ln:
msg = unicode_type(ln) + ':' + msg
if fn:
msg = unicode_type(fn) + ':' + msg
Exception.__init__(self, msg)
for k, v in e.items():
if k != 'message':
setattr(self, k, v)
else:
setattr(self, 'js_message', v)
else:
Exception.__init__(self, unicode_type(to_python(e)))
else:
# Happens if js code throws a string or integer rather than a
# subclass of Error
Exception.__init__(self, unicode_type(e))
self.name = self.js_message = self.fileName = self.lineNumber = self.stack = None
def as_dict(self):
return {
'name':self.name or undefined,
'message': self.js_message or error_message(self),
'fileName': self.fileName or undefined,
'lineNumber': self.lineNumber or undefined,
'stack': self.stack or undefined
}
contexts = {}
def create_context(base_dirs, *args):
data = to_python(args[0]) if args else {}
ctx = Context(base_dirs=base_dirs)
for k, val in data.items():
setattr(ctx.g, k, val)
key = id(ctx)
contexts[key] = ctx
return key
def run_in_context(code, ctx, options=None):
c = contexts[ctx]
try:
ans = c.eval(code)
except JSError as e:
return [False, e.as_dict()]
except Exception as e:
import traceback
traceback.print_exc()
return [False, {'message':unicode_type(e)}]
return [True, to_python(ans)]
class Context(object):
def __init__(self, base_dirs=(), builtin_modules=None):
self._ctx = Context_()
self.g = self._ctx.g
self.g.Duktape.load_file = partial(load_file, base_dirs or (getcwd(),), builtin_modules or {})
self.g.Duktape.pyreadfile = readfile
self.g.Duktape.pywritefile = writefile
self.g.Duktape.create_context = partial(create_context, base_dirs)
self.g.Duktape.run_in_context = run_in_context
self.g.Duktape.cwd = getcwd
self.g.Duktape.sha1sum = sha1sum
self.g.Duktape.dirname = os.path.dirname
self.g.Duktape.errprint = lambda *args: print(*args, file=sys.stderr)
self.eval('''
console = {
log: function() { print(Array.prototype.join.call(arguments, ' ')); },
error: function() { Duktape.errprint(Array.prototype.join.call(arguments, ' ')); },
debug: function() { print(Array.prototype.join.call(arguments, ' ')); }
};
Duktape.modSearch = function (id, require, exports, module) {
var ans = Duktape.load_file(id);
if (ans[0]) return ans[1];
throw ans[1];
}
if (!String.prototype.trim) {
(function() {
// Make sure we trim BOM and NBSP
var rtrim = /^[\\s\uFEFF\xA0]+|[\\s\uFEFF\xA0]+$/g;
String.prototype.trim = function() {
return this.replace(rtrim, '');
};
})();
};
if (!String.prototype.trimLeft) {
(function() {
// Make sure we trim BOM and NBSP
var rtrim = /^[\\s\uFEFF\xA0]+/g;
String.prototype.trimLeft = function() {
return this.replace(rtrim, '');
};
})();
};
if (!String.prototype.trimRight) {
(function() {
// Make sure we trim BOM and NBSP
var rtrim = /[\\s\uFEFF\xA0]+$/g;
String.prototype.trimRight = function() {
return this.replace(rtrim, '');
};
})();
};
if (!String.prototype.startsWith) {
String.prototype.startsWith = function(searchString, position) {
position = position || 0;
return this.indexOf(searchString, position) === position;
};
}
if (!String.prototype.endsWith) {
String.prototype.endsWith = function(searchString, position) {
var subjectString = this.toString();
if (position === undefined || position > subjectString.length) {
position = subjectString.length;
}
position -= searchString.length;
var lastIndex = subjectString.indexOf(searchString, position);
return lastIndex !== -1 && lastIndex === position;
};
}
Duktape.readfile = function(path, encoding) {
var x = Duktape.pyreadfile(path, encoding);
var data = x[0]; var errcode = x[1]; var errmsg = x[2];
if (errmsg !== null) throw {code:errcode, message:errmsg};
return data;
}
Duktape.writefile = function(path, data, encoding) {
var x = Duktape.pywritefile(path, data, encoding);
var errcode = x[0]; var errmsg = x[1];
if (errmsg !== null) throw {code:errcode, message:errmsg};
}
process = {
'platform': 'duktape',
'env': {'HOME': _HOME_, 'TERM':_TERM_},
'exit': function() {},
'cwd':Duktape.cwd
}
'''.replace(
'_HOME_', json.dumps(os.path.expanduser('~'))).replace('_TERM_', json.dumps(os.environ.get('TERM', ''))),
'<init>')
def reraise(self, e):
reraise(JSError, JSError(e), sys.exc_info()[2])
def eval(self, code='', fname='<eval>', noreturn=False):
try:
return self._ctx.eval(code, noreturn, fname)
except dukpy.JSError as e:
self.reraise(e)
def eval_file(self, path, noreturn=False):
try:
return self._ctx.eval_file(path, noreturn)
except dukpy.JSError as e:
self.reraise(e)
def test_build():
import unittest
def load_tests(loader, suite, pattern):
from duktape import tests
for x in vars(tests).values():
if isinstance(x, type) and issubclass(x, unittest.TestCase):
tests = loader.loadTestsFromTestCase(x)
suite.addTests(tests)
return suite
class TestRunner(unittest.main):
def createTests(self):
tl = unittest.TestLoader()
suite = unittest.TestSuite()
self.test = load_tests(tl, suite, None)
result = TestRunner(verbosity=0, buffer=True, catchbreak=True, failfast=True, argv=sys.argv[:1], exit=False).result
if not result.wasSuccessful():
raise SystemExit(1)

View File

@ -1,205 +0,0 @@
import os
import sys
import tempfile
import unittest
from threading import Event, Thread
import dukpy
undefined, JSError, Context = dukpy.undefined, dukpy.JSError, dukpy.Context
class ContextTests(unittest.TestCase):
def setUp(self):
self.ctx = Context()
self.g = self.ctx.g
def test_create_context(self):
pass
def test_create_new_global_env(self):
new = self.ctx.new_global_env()
# The new context should have a distinct global object
self.g.a = 1
self.assertIs(new.g.a, undefined)
def test_eval(self):
pass
def test_eval_file(self):
pass
def test_undefined(self):
self.assertEqual(repr(undefined), 'undefined')
def test_roundtrip(self):
self.g.g = self.ctx.eval('function f() {return 1;}; f')
self.assertEqual(self.g.g.name, 'f')
self.g.a = self.ctx.eval('[1,2,3]')
self.assertEqual(self.g.a[2], 3)
class ValueTests(unittest.TestCase):
def setUp(self):
self.ctx = Context()
self.g = self.ctx.g
def test_simple(self):
for value in [undefined, None, True, False]:
self.g.value = value
self.assertIs(self.g.value, value)
for value in ["foo", 42, 3.141592, 3.141592e20]:
self.g.value = value
self.assertEqual(self.g.value, value)
def test_object(self):
self.g.value = {}
self.assertEqual(dict(self.g.value), {})
self.g.value = {'a': 1}
self.assertEqual(dict(self.g.value), {'a': 1})
self.g.value = {'a': {'b': 2}}
self.assertEqual(dict(self.g.value.a), {'b': 2})
def test_array(self):
self.g.value = []
self.assertEqual(list(self.g.value), [])
self.g.value = [0, 1, 2]
self.assertEqual(self.g.value[0], 0)
self.assertEqual(self.g.value[1], 1)
self.assertEqual(self.g.value[2], 2)
self.assertEqual(self.g.value[3], undefined)
self.assertEqual(list(self.g.value), [0, 1, 2])
self.assertEqual(len(self.g.value), 3)
self.g.value[1] = 9
self.assertEqual(self.g.value[0], 0)
self.assertEqual(self.g.value[1], 9)
self.assertEqual(self.g.value[2], 2)
self.assertEqual(self.g.value[3], undefined)
self.assertEqual(list(self.g.value), [0, 9, 2])
self.assertEqual(len(self.g.value), 3)
def test_callable(self):
def f(x):
return x * x
num = sys.getrefcount(f)
self.g.func = f
self.assertEqual(sys.getrefcount(f), num + 1)
self.assertEqual(self.g.func(123), 15129)
self.g.func = undefined
self.assertEqual(sys.getrefcount(f), num)
a = 13450234
def rval():
return a
num = sys.getrefcount(a)
self.g.func = rval
self.assertEqual(self.g.eval('func()'), a)
self.assertEqual(sys.getrefcount(a), num)
def bad():
raise Exception('testing a python exception xyz')
self.g.func = bad
val = self.g.eval('try{func();}catch(err) {err.message}')
self.assertTrue('testing a python exception xyz' in val)
self.assertTrue('bad at 0x' in val)
def test_proxy(self):
self.g.obj1 = {'a': 42}
self.g.obj2 = self.g.obj1
self.assertEqual(self.g.obj1.a, self.g.obj2.a)
self.ctx.eval('function f() {nonexistent()}')
try:
self.g.f()
self.assert_('No error raised for bad function')
except JSError as e:
e = e.args[0]
self.assertEqual('ReferenceError', e['name'])
self.assertIn('nonexistent', e['message'])
class EvalTests(unittest.TestCase):
def setUp(self):
self.ctx = Context()
self.g = self.ctx.g
with tempfile.NamedTemporaryFile(
prefix='dukpy-test-', suffix='.js', delete=False) as fobj:
fobj.write(b'1+1')
self.testfile = fobj.name
def tearDown(self):
os.remove(self.testfile)
def test_eval_invalid_args(self):
with self.assertRaises(TypeError):
self.ctx.eval()
with self.assertRaises(TypeError):
self.ctx.eval(123)
def test_eval(self):
self.assertEqual(self.ctx.eval("1+1"), 2)
def test_eval_kwargs(self):
self.assertEqual(self.ctx.eval(code="1+1"), 2)
def test_eval_errors(self):
try:
self.ctx.eval('1+/1')
self.assert_('No error raised for malformed js')
except JSError as e:
e = e.args[0]
self.assertEqual('SyntaxError', e['name'])
self.assertEqual('<eval>', e['fileName'])
self.assertEqual(1, e['lineNumber'])
self.assertIn('line 1', e['message'])
try:
self.ctx.eval('\na()', fname='xxx')
self.assert_('No error raised for malformed js')
except JSError as e:
e = e.args[0]
self.assertEqual('ReferenceError', e['name'])
self.assertEqual('xxx', e['fileName'])
self.assertEqual(2, e['lineNumber'])
def test_eval_multithreading(self):
ev = Event()
self.ctx.g.func = ev.wait
t = Thread(target=self.ctx.eval, args=('func()',))
t.daemon = True
t.start()
t.join(0.01)
self.assertTrue(t.is_alive())
ev.set()
t.join(1)
self.assertFalse(t.is_alive())
def test_eval_noreturn(self):
self.assertIsNone(self.ctx.eval("1+1", noreturn=True))
def test_eval_file_invalid_args(self):
with self.assertRaises(TypeError):
self.ctx.eval_file()
with self.assertRaises(TypeError):
self.ctx.eval_file(123)
def test_eval_file(self):
self.assertEqual(self.ctx.eval_file(self.testfile), 2)
def test_eval_file_kwargs(self):
self.assertEqual(self.ctx.eval_file(path=self.testfile), 2)
def test_eval_file_noreturn(self):
self.assertIsNone(self.ctx.eval_file(self.testfile, noreturn=True))