mirror of
https://github.com/kovidgoyal/calibre.git
synced 2025-07-08 10:44:09 -04:00
Add extended key usage and dont use subprocess for verification
This commit is contained in:
parent
60db6aae2f
commit
b01d756329
@ -12,7 +12,6 @@ from lxml.html import fromstring, tostring
|
|||||||
|
|
||||||
from calibre.utils.resources import get_path as P
|
from calibre.utils.resources import get_path as P
|
||||||
|
|
||||||
from .qt import Browser
|
|
||||||
from .simple import Overseer
|
from .simple import Overseer
|
||||||
|
|
||||||
skip = ''
|
skip = ''
|
||||||
@ -103,9 +102,20 @@ class TestFetchBackend(unittest.TestCase):
|
|||||||
self.server.shutdown()
|
self.server.shutdown()
|
||||||
self.server_thread.join(5)
|
self.server_thread.join(5)
|
||||||
|
|
||||||
def test_recipe_browser(self):
|
def test_recipe_browser_qt(self):
|
||||||
|
from .qt import Browser
|
||||||
|
self.do_recipe_browser_test(Browser)
|
||||||
|
|
||||||
|
def test_recipe_browser_webengine(self):
|
||||||
|
from .qt import WebEngineBrowser
|
||||||
|
self.do_recipe_browser_test(WebEngineBrowser)
|
||||||
|
|
||||||
|
def do_recipe_browser_test(self, Browser):
|
||||||
from urllib.error import URLError
|
from urllib.error import URLError
|
||||||
from urllib.request import Request
|
from urllib.request import Request
|
||||||
|
|
||||||
|
br = Browser(user_agent='test-ua', headers=(('th', '1'),), start_worker=True)
|
||||||
|
|
||||||
def u(path=''):
|
def u(path=''):
|
||||||
return f'http://localhost:{self.port}{path}'
|
return f'http://localhost:{self.port}{path}'
|
||||||
|
|
||||||
@ -134,7 +144,6 @@ class TestFetchBackend(unittest.TestCase):
|
|||||||
self.dont_send_body = False
|
self.dont_send_body = False
|
||||||
self.dont_send_response = False
|
self.dont_send_response = False
|
||||||
|
|
||||||
br = Browser(user_agent='test-ua', headers=(('th', '1'),), start_worker=True)
|
|
||||||
try:
|
try:
|
||||||
r = get()
|
r = get()
|
||||||
self.ae(r['request_count'], 1)
|
self.ae(r['request_count'], 1)
|
||||||
|
@ -110,14 +110,16 @@ add_ext(STACK_OF(X509_EXTENSION) *sk, int nid, const char *value, char *item_typ
|
|||||||
static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) {
|
static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) {
|
||||||
PyObject *capsule = NULL, *ans = NULL, *t = NULL;
|
PyObject *capsule = NULL, *ans = NULL, *t = NULL;
|
||||||
X509_NAME *Name = NULL;
|
X509_NAME *Name = NULL;
|
||||||
char *common_name = NULL, *country = NULL, *state = NULL, *locality = NULL, *org = NULL, *org_unit = NULL, *email = NULL, *basic_constraints = NULL, *digital_key_usage = NULL;
|
char *common_name = NULL, *country = NULL, *state = NULL, *locality = NULL, *org = NULL, *org_unit = NULL, *email = NULL, *basic_constraints = NULL, *digital_key_usage = NULL, *ext_key_usage = NULL;
|
||||||
X509_REQ *Cert = NULL;
|
X509_REQ *Cert = NULL;
|
||||||
PyObject *alt_names = NULL;
|
PyObject *alt_names = NULL;
|
||||||
int ok = 0, signature_length = 0;
|
int ok = 0, signature_length = 0;
|
||||||
Py_ssize_t i = 0;
|
Py_ssize_t i = 0;
|
||||||
STACK_OF(X509_EXTENSION) *exts = NULL;
|
STACK_OF(X509_EXTENSION) *exts = NULL;
|
||||||
|
|
||||||
if(!PyArg_ParseTuple(args, "OO!szzzzzzzz", &capsule, &PyTuple_Type, &alt_names, &common_name, &country, &state, &locality, &org, &org_unit, &email, &basic_constraints, &digital_key_usage)) return NULL;
|
if(!PyArg_ParseTuple(args, "OO!szzzzzzzzz",
|
||||||
|
&capsule, &PyTuple_Type, &alt_names, &common_name, &country, &state, &locality, &org, &org_unit, &email,
|
||||||
|
&basic_constraints, &digital_key_usage, &ext_key_usage)) return NULL;
|
||||||
if(!PyCapsule_CheckExact(capsule)) return PyErr_Format(PyExc_TypeError, "The key is not a capsule object");
|
if(!PyCapsule_CheckExact(capsule)) return PyErr_Format(PyExc_TypeError, "The key is not a capsule object");
|
||||||
EVP_PKEY *KeyPair = PyCapsule_GetPointer(capsule, NULL);
|
EVP_PKEY *KeyPair = PyCapsule_GetPointer(capsule, NULL);
|
||||||
if (!KeyPair) return PyErr_Format(PyExc_TypeError, "The key capsule is NULL");
|
if (!KeyPair) return PyErr_Format(PyExc_TypeError, "The key capsule is NULL");
|
||||||
@ -150,6 +152,9 @@ static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) {
|
|||||||
if (digital_key_usage) {
|
if (digital_key_usage) {
|
||||||
if(!add_ext(exts, NID_key_usage, digital_key_usage, "key_usage")) goto error;
|
if(!add_ext(exts, NID_key_usage, digital_key_usage, "key_usage")) goto error;
|
||||||
}
|
}
|
||||||
|
if (ext_key_usage) {
|
||||||
|
if(!add_ext(exts, NID_ext_key_usage, ext_key_usage, "key_usage")) goto error;
|
||||||
|
}
|
||||||
X509_REQ_add_extensions(Cert, exts);
|
X509_REQ_add_extensions(Cert, exts);
|
||||||
sk_X509_EXTENSION_pop_free(exts, X509_EXTENSION_free);
|
sk_X509_EXTENSION_pop_free(exts, X509_EXTENSION_free);
|
||||||
}
|
}
|
||||||
@ -374,6 +379,33 @@ error:
|
|||||||
return ans;
|
return ans;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static PyObject*
|
||||||
|
verify_cert(PyObject *self, PyObject *args) {
|
||||||
|
PyObject *ca_cert_cap, *cert_cap;
|
||||||
|
if(!PyArg_ParseTuple(args, "OO", &ca_cert_cap, &cert_cap)) return NULL;
|
||||||
|
if(!PyCapsule_CheckExact(ca_cert_cap)) return PyErr_Format(PyExc_TypeError, "The ca_cert is not a capsule object");
|
||||||
|
if(!PyCapsule_CheckExact(cert_cap)) return PyErr_Format(PyExc_TypeError, "The cert is not a capsule object");
|
||||||
|
X509 *ca_cert = PyCapsule_GetPointer(ca_cert_cap, NULL);
|
||||||
|
X509 *cert = PyCapsule_GetPointer(cert_cap, NULL);
|
||||||
|
X509_STORE *store = X509_STORE_new();
|
||||||
|
X509_STORE_add_cert(store, ca_cert);
|
||||||
|
X509_STORE_set_flags(store,
|
||||||
|
X509_V_FLAG_X509_STRICT | X509_V_FLAG_CHECK_SS_SIGNATURE |
|
||||||
|
X509_V_FLAG_POLICY_CHECK | X509_V_FLAG_EXPLICIT_POLICY |
|
||||||
|
X509_V_FLAG_NOTIFY_POLICY);
|
||||||
|
X509_STORE_CTX *vfy_ctx = X509_STORE_CTX_new();
|
||||||
|
X509_STORE_CTX_init(vfy_ctx, store, cert, NULL);
|
||||||
|
// X509_STORE_CTX_set_flags(vfy_ctx, X509_V_FLAG_CRL_CHECK | X509_V_FLAG_CRL_CHECK_ALL);
|
||||||
|
int ok;
|
||||||
|
Py_BEGIN_ALLOW_THREADS
|
||||||
|
ok = X509_verify_cert(vfy_ctx);
|
||||||
|
Py_END_ALLOW_THREADS
|
||||||
|
X509_STORE_CTX_free(vfy_ctx);
|
||||||
|
X509_STORE_free(store);
|
||||||
|
if (!ok) { set_error("Verification failed"); return NULL; }
|
||||||
|
Py_RETURN_NONE;
|
||||||
|
}
|
||||||
|
|
||||||
static PyMethodDef certgen_methods[] = {
|
static PyMethodDef certgen_methods[] = {
|
||||||
{"create_rsa_keypair", create_rsa_keypair, METH_VARARGS,
|
{"create_rsa_keypair", create_rsa_keypair, METH_VARARGS,
|
||||||
"create_rsa_keypair(size)\n\nCreate a RSA keypair of the specified size"
|
"create_rsa_keypair(size)\n\nCreate a RSA keypair of the specified size"
|
||||||
@ -399,6 +431,10 @@ static PyMethodDef certgen_methods[] = {
|
|||||||
"cert_info(cert)\n\nReturn the certificate information (certificate in text format)"
|
"cert_info(cert)\n\nReturn the certificate information (certificate in text format)"
|
||||||
},
|
},
|
||||||
|
|
||||||
|
{"verify_cert", verify_cert, METH_VARARGS,
|
||||||
|
"verify_cert(cacert, cert)\n\nVerift cert against CA cert"
|
||||||
|
},
|
||||||
|
|
||||||
{NULL, NULL, 0, NULL}
|
{NULL, NULL, 0, NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -16,11 +16,13 @@ def create_key_pair(size=2048):
|
|||||||
def create_cert_request(
|
def create_cert_request(
|
||||||
key_pair, common_name,
|
key_pair, common_name,
|
||||||
country='IN', state='Maharashtra', locality='Mumbai', organization=None,
|
country='IN', state='Maharashtra', locality='Mumbai', organization=None,
|
||||||
organizational_unit=None, email_address=None, alt_names=(), basic_constraints=None, digital_key_usage=None
|
organizational_unit=None, email_address=None, alt_names=(), basic_constraints=None,
|
||||||
|
digital_key_usage=None, ext_key_usage=None,
|
||||||
):
|
):
|
||||||
return certgen.create_rsa_cert_req(
|
return certgen.create_rsa_cert_req(
|
||||||
key_pair, tuple(alt_names), common_name,
|
key_pair, tuple(alt_names), common_name,
|
||||||
country, state, locality, organization, organizational_unit, email_address, basic_constraints, digital_key_usage
|
country, state, locality, organization, organizational_unit, email_address,
|
||||||
|
basic_constraints, digital_key_usage, ext_key_usage,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@ -66,12 +68,15 @@ def create_server_cert(
|
|||||||
|
|
||||||
# Create the Certificate Authority
|
# Create the Certificate Authority
|
||||||
cakey = create_key_pair(key_size)
|
cakey = create_key_pair(key_size)
|
||||||
careq = create_cert_request(cakey, ca_name, basic_constraints='critical,CA:TRUE', digital_key_usage='critical,keyCertSign,cRLSign')
|
careq = create_cert_request(
|
||||||
|
cakey, ca_name, basic_constraints='critical,CA:TRUE', digital_key_usage='critical,keyCertSign,cRLSign')
|
||||||
cacert = create_ca_cert(careq, cakey)
|
cacert = create_ca_cert(careq, cakey)
|
||||||
|
|
||||||
# Create the server certificate issued by the newly created CA
|
# Create the server certificate issued by the newly created CA
|
||||||
pkey = create_key_pair(key_size)
|
pkey = create_key_pair(key_size)
|
||||||
req = create_cert_request(pkey, domain_or_ip, country, state, locality, organization, organizational_unit, email_address, alt_names)
|
req = create_cert_request(
|
||||||
|
pkey, domain_or_ip, country, state, locality, organization, organizational_unit, email_address, alt_names,
|
||||||
|
ext_key_usage='critical,serverAuth')
|
||||||
cert = create_cert(req, cacert, cakey, expire=expire)
|
cert = create_cert(req, cacert, cakey, expire=expire)
|
||||||
|
|
||||||
def export(dest, obj, func, *args):
|
def export(dest, obj, func, *args):
|
||||||
@ -92,24 +97,14 @@ def create_server_cert(
|
|||||||
|
|
||||||
|
|
||||||
def develop():
|
def develop():
|
||||||
import subprocess
|
|
||||||
import tempfile
|
|
||||||
cacert, cakey, cert, pkey = create_server_cert('test.me', alt_names=['DNS:moose.cat', 'DNS:huge.bat'])
|
cacert, cakey, cert, pkey = create_server_cert('test.me', alt_names=['DNS:moose.cat', 'DNS:huge.bat'])
|
||||||
print("CA Certificate")
|
print("CA Certificate")
|
||||||
print(cert_info(cacert))
|
print(cert_info(cacert))
|
||||||
print(), print(), print()
|
print(), print(), print()
|
||||||
print('Server Certificate')
|
print('Server Certificate')
|
||||||
print(cert_info(cert))
|
print(cert_info(cert))
|
||||||
with tempfile.NamedTemporaryFile(prefix='CA-', suffix='.pem') as cafile, tempfile.NamedTemporaryFile(prefix='Server-', suffix='.crt') as certfile:
|
certgen.verify_cert(cacert, cacert)
|
||||||
cafile.write(serialize_cert(cacert).encode())
|
certgen.verify_cert(cacert, cert)
|
||||||
certfile.write(serialize_cert(cert).encode())
|
|
||||||
cafile.flush()
|
|
||||||
certfile.flush()
|
|
||||||
# Verify the self signed certificate
|
|
||||||
subprocess.check_call(['openssl', 'verify', '-verbose', '-x509_strict', '-CAfile', cafile.name, cafile.name])
|
|
||||||
# Verify the server certificate signed with the self signed certificate
|
|
||||||
raise SystemExit(subprocess.run([
|
|
||||||
'openssl', 'verify', '-show_chain', '-no_check_time', '-verbose', '-x509_strict', '-CAfile', cafile.name, certfile.name]).returncode)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
Loading…
x
Reference in New Issue
Block a user