Since I am here, add code to set Subject Alternative names for certificates

This commit is contained in:
Kovid Goyal 2015-05-26 21:21:35 +05:30
parent 7750fdcf0e
commit 0cacdf26e6
2 changed files with 56 additions and 10 deletions

View File

@ -14,6 +14,7 @@
#undef X509_NAME #undef X509_NAME
#endif #endif
#include <openssl/x509.h> #include <openssl/x509.h>
#include <openssl/x509v3.h>
#include <openssl/ssl.h> #include <openssl/ssl.h>
#include <openssl/rsa.h> #include <openssl/rsa.h>
#include <openssl/evp.h> #include <openssl/evp.h>
@ -84,17 +85,29 @@ static void free_req(PyObject *capsule) {
if (Cert) X509_REQ_free(Cert); if (Cert) X509_REQ_free(Cert);
} }
static int add_ext(STACK_OF(X509_EXTENSION) *sk, int nid, char *value) {
X509_EXTENSION *ex = NULL;
ex = X509V3_EXT_conf_nid(NULL, NULL, nid, value);
if (!ex) { set_error("X509V3_EXT_conf_nid"); return 0;}
if (!sk_X509_EXTENSION_push(sk, ex)) { set_error("sk_X509_EXTENSION_push"); return 0; }
return 1;
}
static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) { static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) {
RSA *KeyPair = NULL; RSA *KeyPair = NULL;
PyObject *capsule = NULL, *ans = NULL; PyObject *capsule = NULL, *ans = NULL, *t = NULL;
X509_NAME *Name = NULL; X509_NAME *Name = NULL;
char *common_name = NULL, *country = NULL, *state = NULL, *locality = NULL, *org = NULL, *org_unit = NULL, *email = NULL; char *common_name = NULL, *country = NULL, *state = NULL, *locality = NULL, *org = NULL, *org_unit = NULL, *email = NULL, buf[1024];
X509_REQ *Cert = NULL; X509_REQ *Cert = NULL;
EVP_PKEY *PrivateKey = NULL; EVP_PKEY *PrivateKey = NULL;
PyObject *alt_names = NULL;
int ok = 0, signature_length = 0; int ok = 0, signature_length = 0;
Py_ssize_t i = 0;
STACK_OF(X509_EXTENSION) *exts = NULL;
if(!PyArg_ParseTuple(args, "Oszzzzzz", &capsule, &common_name, &country, &state, &locality, &org, &org_unit, &email)) return NULL; if(!PyArg_ParseTuple(args, "OOszzzzzz", &capsule, &alt_names, &common_name, &country, &state, &locality, &org, &org_unit, &email)) return NULL;
if(!PyCapsule_CheckExact(capsule)) return PyErr_Format(PyExc_TypeError, "The key is not a capsule object"); if(!PyCapsule_CheckExact(capsule)) return PyErr_Format(PyExc_TypeError, "The key is not a capsule object");
if(!PySequence_Check(alt_names)) return PyErr_Format(PyExc_TypeError, "alt_names must be a sequence");
KeyPair = PyCapsule_GetPointer(capsule, NULL); KeyPair = PyCapsule_GetPointer(capsule, NULL);
if (!KeyPair) return PyErr_Format(PyExc_TypeError, "The key capsule is NULL"); if (!KeyPair) return PyErr_Format(PyExc_TypeError, "The key capsule is NULL");
Cert = X509_REQ_new(); Cert = X509_REQ_new();
@ -110,6 +123,20 @@ static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) {
if (!add_entry(Name, "emailAddress", email)) goto error; if (!add_entry(Name, "emailAddress", email)) goto error;
if (!add_entry(Name, "CN", common_name)) goto error; if (!add_entry(Name, "CN", common_name)) goto error;
if (PySequence_Length(alt_names) > 0) {
exts = sk_X509_EXTENSION_new_null();
if (!exts) { set_error("sk_X509_EXTENSION_new_null"); goto error; }
for (i = 0; i < PySequence_Length(alt_names); i++) {
t = PySequence_ITEM(alt_names, i);
memset(buf, 0, 1024);
snprintf(buf, 1023, "DNS:%s", PyBytes_AS_STRING(t));
Py_XDECREF(t);
if(!add_ext(exts, NID_subject_alt_name, buf)) goto error;
}
X509_REQ_add_extensions(Cert, exts);
sk_X509_EXTENSION_pop_free(exts, X509_EXTENSION_free);
}
PrivateKey = EVP_PKEY_new(); PrivateKey = EVP_PKEY_new();
if (!PrivateKey) { set_error("EVP_PKEY_new"); goto error; } if (!PrivateKey) { set_error("EVP_PKEY_new"); goto error; }
if (!EVP_PKEY_set1_RSA(PrivateKey, KeyPair)) { set_error("EVP_PKEY_set1_RSA"); goto error; } if (!EVP_PKEY_set1_RSA(PrivateKey, KeyPair)) { set_error("EVP_PKEY_set1_RSA"); goto error; }
@ -177,7 +204,9 @@ static PyObject* create_rsa_cert(PyObject *self, PyObject *args) {
RSA *CA_key = NULL; RSA *CA_key = NULL;
X509_NAME *Name = NULL; X509_NAME *Name = NULL;
EVP_PKEY *PubKey = NULL, *PrivateKey = NULL; EVP_PKEY *PubKey = NULL, *PrivateKey = NULL;
int not_before = 0, expire = 1, req_is_for_CA_cert = 0, signature_length = 0, ok = 0; int not_before = 0, expire = 1, req_is_for_CA_cert = 0, signature_length = 0, ok = 0, i = 0;
X509V3_CTX ctx;
STACK_OF(X509_EXTENSION) *exts = NULL;
if(!PyArg_ParseTuple(args, "OOO|ii", &reqC, &CA_certC, &CA_keyC, &not_before, &expire)) return NULL; if(!PyArg_ParseTuple(args, "OOO|ii", &reqC, &CA_certC, &CA_keyC, &not_before, &expire)) return NULL;
if(!PyCapsule_CheckExact(reqC)) return PyErr_Format(PyExc_TypeError, "The req is not a capsule object"); if(!PyCapsule_CheckExact(reqC)) return PyErr_Format(PyExc_TypeError, "The req is not a capsule object");
@ -214,6 +243,16 @@ static PyObject* create_rsa_cert(PyObject *self, PyObject *args) {
if (!Name) { set_error("X509_REQ_get_subject_name(CA_cert)"); goto error; } if (!Name) { set_error("X509_REQ_get_subject_name(CA_cert)"); goto error; }
if (!X509_set_issuer_name(Cert, Name)) { set_error("X509_set_issuer_name"); goto error; } if (!X509_set_issuer_name(Cert, Name)) { set_error("X509_set_issuer_name"); goto error; }
if (!req_is_for_CA_cert) {
exts = X509_REQ_get_extensions(req);
if (exts) {
X509V3_set_ctx(&ctx, CA_cert, Cert, NULL, NULL, 0);
for (i = 0; i < sk_X509_EXTENSION_num(exts); i++) {
if(!X509_add_ext(Cert, sk_X509_EXTENSION_value(exts, i), -1)) { set_error("X509_add_ext"); goto error; }
}
}
}
PubKey=X509_REQ_get_pubkey(req); PubKey=X509_REQ_get_pubkey(req);
if (!PubKey) { set_error("X509_REQ_get_pubkey"); goto error; } if (!PubKey) { set_error("X509_REQ_get_pubkey"); goto error; }
if (!X509_REQ_verify(req, PubKey)) { set_error("X509_REQ_verify"); goto error; } if (!X509_REQ_verify(req, PubKey)) { set_error("X509_REQ_verify"); goto error; }
@ -316,7 +355,7 @@ static PyMethodDef certgen_methods[] = {
}, },
{"create_rsa_cert_req", create_rsa_cert_req, METH_VARARGS, {"create_rsa_cert_req", create_rsa_cert_req, METH_VARARGS,
"create_rsa_cert_req(keypair, common_name, country, state, locality, org, org_unit, email_address)\n\nCreate a certificate signing request." "create_rsa_cert_req(keypair, alt_names, common_name, country, state, locality, org, org_unit, email_address)\n\nCreate a certificate signing request."
}, },
{"create_rsa_cert", create_rsa_cert, METH_VARARGS, {"create_rsa_cert", create_rsa_cert, METH_VARARGS,

View File

@ -17,15 +17,15 @@ def create_key_pair(size=2048):
def create_cert_request( def create_cert_request(
key_pair, common_name, key_pair, common_name,
country='IN', state='Maharashtra', locality='Mumbai', organization=None, country='IN', state='Maharashtra', locality='Mumbai', organization=None,
organizational_unit=None, email_address=None organizational_unit=None, email_address=None, alt_names=()
): ):
def enc(x): def enc(x):
if isinstance(x, type('')): if isinstance(x, type('')):
x = x.encode('ascii') x = x.encode('ascii')
return x or None return x or None
return certgen.create_rsa_cert_req( return certgen.create_rsa_cert_req(
key_pair, *map(enc, ( key_pair, tuple(bytes(enc(x)) for x in alt_names if x),
common_name, country, state, locality, organization, organizational_unit, email_address)) *map(enc, (common_name, country, state, locality, organization, organizational_unit, email_address))
) )
def create_cert(req, ca_cert, ca_keypair, expire=365, not_before=0): def create_cert(req, ca_cert, ca_keypair, expire=365, not_before=0):
@ -47,7 +47,7 @@ def create_server_cert(
domain, ca_cert_file=None, server_cert_file=None, server_key_file=None, domain, ca_cert_file=None, server_cert_file=None, server_key_file=None,
expire=365, ca_key_file=None, ca_name='Dummy Certificate Authority', key_size=2048, expire=365, ca_key_file=None, ca_name='Dummy Certificate Authority', key_size=2048,
country='IN', state='Maharashtra', locality='Mumbai', organization=None, country='IN', state='Maharashtra', locality='Mumbai', organization=None,
organizational_unit=None, email_address=None, encrypt_key_with_password=None, organizational_unit=None, email_address=None, alt_names=(), encrypt_key_with_password=None,
): ):
# Create the Certificate Authority # Create the Certificate Authority
cakey = create_key_pair(key_size) cakey = create_key_pair(key_size)
@ -56,7 +56,7 @@ def create_server_cert(
# Create the server certificate issued by the newly created CA # Create the server certificate issued by the newly created CA
pkey = create_key_pair(key_size) pkey = create_key_pair(key_size)
req = create_cert_request(pkey, domain, country, state, locality, organization, organizational_unit, email_address) req = create_cert_request(pkey, domain, country, state, locality, organization, organizational_unit, email_address, alt_names)
cert = create_cert(req, cacert, cakey, expire=expire) cert = create_cert(req, cacert, cakey, expire=expire)
def export(dest, obj, func, *args): def export(dest, obj, func, *args):
@ -71,3 +71,10 @@ def create_server_cert(
export(server_cert_file, cert, serialize_cert) export(server_cert_file, cert, serialize_cert)
export(server_key_file, pkey, serialize_key, encrypt_key_with_password) export(server_key_file, pkey, serialize_key, encrypt_key_with_password)
export(ca_key_file, cakey, serialize_key, encrypt_key_with_password) export(ca_key_file, cakey, serialize_key, encrypt_key_with_password)
return cacert, cakey, cert, pkey
if __name__ == '__main__':
cacert, cakey, cert, pkey = create_server_cert('test.me', alt_names=['1.test.me', '*.all.test.me'])
print (cert_info(cert).encode('utf-8'))