mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-08 18:54:09 -04:00
Implement AES-CTR
This commit is contained in:
parent
aefc7fa751
commit
85aba366c6
118
src/pyj/aes.pyj
118
src/pyj/aes.pyj
@ -234,7 +234,7 @@ def typed_array_as_js(x):
|
||||
name = x.constructor.name or 'Uint8Array'
|
||||
return '(new ' + name + '(' + JSON.stringify(Array.prototype.slice.call(x)) + '))'
|
||||
|
||||
class CBC:
|
||||
class ModeOfOperation:
|
||||
|
||||
def __init__(self, key):
|
||||
self.key = key or generate_key(32)
|
||||
@ -244,6 +244,17 @@ class CBC:
|
||||
def key_as_js(self):
|
||||
return typed_array_as_js(self.key)
|
||||
|
||||
def tag_as_bytes(self, tag):
|
||||
if isinstance(tag, Uint8Array):
|
||||
return tag
|
||||
if not tag:
|
||||
return Uint8Array(0)
|
||||
if type(tag) is 'string':
|
||||
return string_to_bytes(tag)
|
||||
raise TypeError('Invalid tag, must be a string or a Uint8Array')
|
||||
|
||||
class CBC(ModeOfOperation):
|
||||
|
||||
def encrypt_bytes(self, bytes, tag_bytes):
|
||||
iv = first_iv = random_bytes(16)
|
||||
mlen = bytes.length + tag_bytes.length + 1
|
||||
@ -272,19 +283,10 @@ class CBC:
|
||||
# Uint8Arrays). If the optional tag (a string) is present, it is also
|
||||
# encrypted along with plaintext.It can be used to ensure message integrity.
|
||||
# See the __main__ block at the bottom of this file for example usage.
|
||||
if not tag:
|
||||
tag = Uint8Array(0)
|
||||
elif type(tag) is 'string':
|
||||
tag = string_to_bytes(tag)
|
||||
return self.encrypt_bytes(string_to_bytes(plaintext), tag)
|
||||
return self.encrypt_bytes(string_to_bytes(plaintext), self.tag_as_bytes(tag))
|
||||
|
||||
def decrypt(self, output_from_encrypt, tag):
|
||||
if not tag:
|
||||
tag_bytes = Uint8Array(0)
|
||||
elif type(tag) is 'string':
|
||||
tag_bytes = string_to_bytes(tag)
|
||||
else:
|
||||
tag_bytes = tag
|
||||
tag_bytes = self.tag_as_bytes(tag)
|
||||
iv, inputbytes = output_from_encrypt.iv, output_from_encrypt.cipherbytes
|
||||
offset = 0
|
||||
outputbytes = Uint8Array(inputbytes.length)
|
||||
@ -303,13 +305,99 @@ class CBC:
|
||||
outputbytes = outputbytes.slice(mstart, mstart + mlen)
|
||||
return bytes_to_string(outputbytes)
|
||||
|
||||
class Counter:
|
||||
|
||||
def __init__(self, initial_value):
|
||||
if not initial_value:
|
||||
self.bytes = Uint8Array(16)
|
||||
self.set_value(1)
|
||||
elif type(initial_value) is 'number':
|
||||
self.bytes = Uint8Array(16)
|
||||
self.set_value(initial_value)
|
||||
else:
|
||||
self.bytes = Uint8Array(initial_value)
|
||||
|
||||
def set_value(self, value):
|
||||
c = self.bytes
|
||||
for v'var index = 15; index >= 0; index--':
|
||||
c[index] = value % 256
|
||||
value >>= 8
|
||||
|
||||
def set_bytes(self, bytes):
|
||||
self.bytes = Uint8Array(bytes)
|
||||
|
||||
def increment(self):
|
||||
c = self.bytes
|
||||
for v'var i = 15; i >= 0; i--':
|
||||
if c[i] is 255:
|
||||
c[i] = 0
|
||||
else:
|
||||
c[i] += 1
|
||||
break
|
||||
|
||||
def __repr__(self):
|
||||
return as_hex(self.bytes)
|
||||
|
||||
class CTR(ModeOfOperation):
|
||||
|
||||
def __init__(self, key, counter):
|
||||
ModeOfOperation.__init__(self, key)
|
||||
self.cval = counter
|
||||
self.wmem = Uint8Array(16)
|
||||
self.counter = Counter(self.cval)
|
||||
self.counter_index = 16
|
||||
|
||||
def _crypt(self, bytes):
|
||||
for v'var i = 0; i < bytes.length; i++':
|
||||
if self.counter_index is 16:
|
||||
self.counter_index = 0
|
||||
self.aes.encrypt(self.counter.bytes, self.wmem, 0)
|
||||
self.counter.increment()
|
||||
bytes[i] ^= self.wmem[self.counter_index]
|
||||
self.counter_index += 1
|
||||
|
||||
def encrypt(self, plaintext, tag):
|
||||
outbytes = string_to_bytes(plaintext)
|
||||
if tag:
|
||||
tag_bytes = self.tag_as_bytes(tag)
|
||||
t = Uint8Array(outbytes.length + tag_bytes.length)
|
||||
t.set(tag_bytes)
|
||||
t.set(outbytes, tag_bytes.length)
|
||||
outbytes = t
|
||||
self._crypt(outbytes)
|
||||
return outbytes
|
||||
|
||||
def decrypt(self, output_from_encrypt, tag):
|
||||
b = Uint8Array(output_from_encrypt.length)
|
||||
b.set(output_from_encrypt)
|
||||
self._crypt(b)
|
||||
offset = 0
|
||||
if tag:
|
||||
tag_bytes = self.tag_as_bytes(tag)
|
||||
for v'var i = 0; i < tag_bytes.length; i++':
|
||||
if tag_bytes[i] != b[i]:
|
||||
raise ValueError('Corrupted message')
|
||||
offset = tag_bytes.length
|
||||
return bytes_to_string(b, offset)
|
||||
|
||||
if __name__ == '__main__':
|
||||
cbc = CBC()
|
||||
text = 'testing a basic roundtrip ø̄ū'
|
||||
|
||||
cbc = CBC()
|
||||
crypted = cbc.encrypt(text)
|
||||
decrypted = cbc.decrypt(crypted)
|
||||
print('Roundtrip:', 'OK' if text is decrypted else 'FAILED')
|
||||
print('CBC Roundtrip:', 'OK' if text is decrypted else 'FAILED')
|
||||
secret_tag = generate_tag()
|
||||
crypted = cbc.encrypt(text, secret_tag)
|
||||
decrypted = cbc.decrypt(crypted, secret_tag)
|
||||
print('Roundtrip with tag:', 'OK' if text is decrypted else 'FAILED')
|
||||
print('CBC Roundtrip with tag:', 'OK' if text is decrypted else 'FAILED')
|
||||
|
||||
ctre = CTR()
|
||||
ctrd = CTR(ctre.key)
|
||||
crypted = ctre.encrypt(text)
|
||||
decrypted = ctrd.decrypt(crypted)
|
||||
print('CTR Roundtrip:', 'OK' if text is decrypted else 'FAILED')
|
||||
|
||||
crypted = ctre.encrypt(text, secret_tag)
|
||||
decrypted = ctrd.decrypt(crypted, secret_tag)
|
||||
print('CTR Roundtrip with tag:', 'OK' if text is decrypted else 'FAILED')
|
||||
|
Loading…
x
Reference in New Issue
Block a user