Dont use deprecated OpenSSL APIs in certgen

This commit is contained in:
Kovid Goyal 2023-10-06 14:58:30 +05:30
parent d463f34b7b
commit 4605c43101
No known key found for this signature in database
GPG Key ID: 06BC317B515ACE7C
2 changed files with 56 additions and 78 deletions

View File

@ -37,36 +37,24 @@ static PyObject* set_error(const char *where) {
} }
static void free_rsa_keypair(PyObject *capsule) { static void free_rsa_keypair(PyObject *capsule) {
RSA *KeyPair = PyCapsule_GetPointer(capsule, NULL); EVP_PKEY *pkey= PyCapsule_GetPointer(capsule, NULL);
if (KeyPair) RSA_free(KeyPair); EVP_PKEY_free(pkey);
} }
static PyObject* create_rsa_keypair(PyObject *self, PyObject *args) { static PyObject*
create_rsa_keypair(PyObject *self, PyObject *args) {
int keysize = 0; int keysize = 0;
RSA *KeyPair = NULL;
BIGNUM *BigNumber = NULL;
PyObject *ans = NULL; PyObject *ans = NULL;
int ret = 0;
if(!PyArg_ParseTuple(args, "i", &keysize)) return NULL; if(!PyArg_ParseTuple(args, "i", &keysize)) return NULL;
if (keysize < 1024) return PyErr_Format(PyExc_ValueError, "The key size %d is less than 1024. 1024 is the minimum.", keysize); if (keysize < 1024) return PyErr_Format(PyExc_ValueError, "The key size %d is less than 1024. 1024 is the minimum.", keysize);
if (RAND_status() != 1) return PyErr_Format(PyExc_RuntimeError, "The OopenSSL PRNG failed to seed itself"); if (RAND_status() != 1) return PyErr_Format(PyExc_RuntimeError, "The OopenSSL PRNG failed to seed itself");
KeyPair = RSA_new(); EVP_PKEY *pkey;
if (!KeyPair) return set_error("RSA_new");
BigNumber = BN_new();
if (!BigNumber) {set_error("BN_new"); goto error;}
if (!BN_set_word(BigNumber, RSA_F4)) { set_error("BN_set_word"); goto error; }
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
ret = RSA_generate_key_ex(KeyPair, keysize, BigNumber, NULL); pkey = EVP_RSA_gen(keysize);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (!ret) { set_error("RSA_generate_key_ex"); goto error; } if (pkey == NULL) return set_error("EVP_RSA_gen");
ans = PyCapsule_New(pkey, NULL, free_rsa_keypair);
ans = PyCapsule_New(KeyPair, NULL, free_rsa_keypair);
if (ans == NULL) { PyErr_NoMemory(); goto error; }
error:
if(BigNumber) BN_free(BigNumber);
if (!ans && KeyPair) RSA_free(KeyPair);
return ans; return ans;
} }
@ -86,21 +74,20 @@ static void free_req(PyObject *capsule) {
if (Cert) X509_REQ_free(Cert); if (Cert) X509_REQ_free(Cert);
} }
static int add_ext(STACK_OF(X509_EXTENSION) *sk, int nid, char *value) { static int add_ext(STACK_OF(X509_EXTENSION) *sk, int nid, char *value, char *item_type) {
X509_EXTENSION *ex = NULL; X509_EXTENSION *ex = NULL;
ex = X509V3_EXT_conf_nid(NULL, NULL, nid, value); ex = X509V3_EXT_conf_nid(NULL, NULL, nid, value);
if (!ex) { set_error("X509V3_EXT_conf_nid"); return 0;} char ebuf[256] = {0};
if (!sk_X509_EXTENSION_push(sk, ex)) { set_error("sk_X509_EXTENSION_push"); return 0; } if (!ex) { snprintf(ebuf, sizeof(ebuf), "%s: %s", item_type, "X509V3_EXT_conf_nid"); set_error(ebuf); return 0;}
if (!sk_X509_EXTENSION_push(sk, ex)) { snprintf(ebuf, sizeof(ebuf), "%s: %s", item_type, "sk_X509_EXTENSION_push"); set_error(ebuf); return 0; }
return 1; return 1;
} }
static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) { static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) {
RSA *KeyPair = NULL;
PyObject *capsule = NULL, *ans = NULL, *t = NULL; PyObject *capsule = NULL, *ans = NULL, *t = NULL;
X509_NAME *Name = NULL; X509_NAME *Name = NULL;
char *common_name = NULL, *country = NULL, *state = NULL, *locality = NULL, *org = NULL, *org_unit = NULL, *email = NULL, *basic_constraints = NULL, buf[1024]; char *common_name = NULL, *country = NULL, *state = NULL, *locality = NULL, *org = NULL, *org_unit = NULL, *email = NULL, *basic_constraints = NULL, buf[1024];
X509_REQ *Cert = NULL; X509_REQ *Cert = NULL;
EVP_PKEY *PrivateKey = NULL;
PyObject *alt_names = NULL; PyObject *alt_names = NULL;
int ok = 0, signature_length = 0; int ok = 0, signature_length = 0;
Py_ssize_t i = 0; Py_ssize_t i = 0;
@ -109,7 +96,7 @@ static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) {
if(!PyArg_ParseTuple(args, "OOszzzzzzz", &capsule, &alt_names, &common_name, &country, &state, &locality, &org, &org_unit, &email, &basic_constraints)) return NULL; if(!PyArg_ParseTuple(args, "OOszzzzzzz", &capsule, &alt_names, &common_name, &country, &state, &locality, &org, &org_unit, &email, &basic_constraints)) return NULL;
if(!PyCapsule_CheckExact(capsule)) return PyErr_Format(PyExc_TypeError, "The key is not a capsule object"); if(!PyCapsule_CheckExact(capsule)) return PyErr_Format(PyExc_TypeError, "The key is not a capsule object");
if(!PySequence_Check(alt_names)) return PyErr_Format(PyExc_TypeError, "alt_names must be a sequence"); if(!PySequence_Check(alt_names)) return PyErr_Format(PyExc_TypeError, "alt_names must be a sequence");
KeyPair = PyCapsule_GetPointer(capsule, NULL); EVP_PKEY *KeyPair = PyCapsule_GetPointer(capsule, NULL);
if (!KeyPair) return PyErr_Format(PyExc_TypeError, "The key capsule is NULL"); if (!KeyPair) return PyErr_Format(PyExc_TypeError, "The key capsule is NULL");
Cert = X509_REQ_new(); Cert = X509_REQ_new();
if (!Cert) return set_error("X509_REQ_new"); if (!Cert) return set_error("X509_REQ_new");
@ -132,31 +119,27 @@ static PyObject* create_rsa_cert_req(PyObject *self, PyObject *args) {
memset(buf, 0, 1024); memset(buf, 0, 1024);
snprintf(buf, 1023, "%s", PyBytes_AS_STRING(t)); snprintf(buf, 1023, "%s", PyBytes_AS_STRING(t));
Py_XDECREF(t); Py_XDECREF(t);
if(!add_ext(exts, NID_subject_alt_name, buf)) goto error; if(!add_ext(exts, NID_subject_alt_name, buf, "alt_names")) goto error;
} }
if (basic_constraints) { if (basic_constraints) {
if(!add_ext(exts, NID_basic_constraints, basic_constraints)) goto error; if(!add_ext(exts, NID_basic_constraints, basic_constraints, "basic_constraints")) goto error;
} }
X509_REQ_add_extensions(Cert, exts); X509_REQ_add_extensions(Cert, exts);
sk_X509_EXTENSION_pop_free(exts, X509_EXTENSION_free); sk_X509_EXTENSION_pop_free(exts, X509_EXTENSION_free);
} }
PrivateKey = EVP_PKEY_new(); if (!X509_REQ_set_pubkey(Cert, KeyPair)) { set_error("X509_REQ_set_pubkey"); goto error; }
if (!PrivateKey) { set_error("EVP_PKEY_new"); goto error; }
if (!EVP_PKEY_set1_RSA(PrivateKey, KeyPair)) { set_error("EVP_PKEY_set1_RSA"); goto error; }
if (!X509_REQ_set_pubkey(Cert, PrivateKey)) { set_error("X509_REQ_set_pubkey"); goto error; }
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
signature_length = X509_REQ_sign(Cert, PrivateKey, EVP_sha256()); signature_length = X509_REQ_sign(Cert, KeyPair, EVP_sha256());
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (signature_length <= 0) { set_error("X509_REQ_sign"); goto error; } if (signature_length <= 0) { set_error("X509_REQ_sign"); goto error; }
ans = PyCapsule_New(Cert, NULL, free_req); ans = PyCapsule_New(Cert, NULL, free_req);
if (!ans) { PyErr_NoMemory(); goto error; } if (!ans) { goto error; }
ok = 1; ok = 1;
error: error:
if (!ok) { if (!ok) {
if (Cert) X509_REQ_free(Cert); if (Cert) X509_REQ_free(Cert);
} }
if (PrivateKey) EVP_PKEY_free(PrivateKey);
return ans; return ans;
} }
@ -174,29 +157,34 @@ static void free_cert(PyObject *capsule) {
* - not After * - not After
* The reason for this function is to allow easier abstraction :-) * The reason for this function is to allow easier abstraction :-)
*/ */
static int certificate_set_serial(X509 *cert) static int
{ certificate_set_serial(X509 *cert) {
BIGNUM *bn = NULL;
int rv = 0;
ASN1_INTEGER *sno = ASN1_INTEGER_new();
if (!sno) {
PyErr_NoMemory(); return 0;
}
bn=BN_new();
if (!bn) {
ASN1_INTEGER_free(sno);
PyErr_NoMemory(); return 0;
}
#define SERIAL_RAND_BITS 128 #define SERIAL_RAND_BITS 128
if (BN_pseudo_rand(bn, SERIAL_RAND_BITS, 0, 0) == 1 && int rv = 0;
(sno = BN_to_ASN1_INTEGER(bn,sno)) != NULL && unsigned char random_bytes[SERIAL_RAND_BITS / 8];
X509_set_serialNumber(cert, sno) == 1) if (RAND_bytes(random_bytes, sizeof(random_bytes)) != 1) {
rv = 1; set_error("RAND_bytes in certificate_set_serial");
else return 0;
set_error("X509_set_serialNumber"); }
BN_free(bn); BIGNUM *bn = NULL; ASN1_INTEGER *sno = NULL;
ASN1_INTEGER_free(sno); sno = ASN1_INTEGER_new();
if (!sno) {
PyErr_NoMemory(); goto err;
}
bn = BN_bin2bn(random_bytes, sizeof(random_bytes), NULL);
if (!bn) {
PyErr_NoMemory(); goto err;
}
if ((sno = BN_to_ASN1_INTEGER(bn,sno)) == NULL) {
set_error("BN_to_ASN1_INTEGER"); goto err;
}
if (X509_set_serialNumber(cert, sno) != 1) {
set_error("X509_set_serialNumber"); goto err ;
}
rv = 1;
err:
if (bn != NULL) BN_free(bn);
if (sno != NULL) ASN1_INTEGER_free(sno);
return rv; return rv;
} }
@ -205,9 +193,8 @@ static PyObject* create_rsa_cert(PyObject *self, PyObject *args) {
PyObject *reqC = NULL, *CA_certC = NULL, *CA_keyC = NULL, *ans = NULL; PyObject *reqC = NULL, *CA_certC = NULL, *CA_keyC = NULL, *ans = NULL;
X509_REQ *req = NULL; X509_REQ *req = NULL;
X509 *CA_cert = NULL, *Cert = NULL; X509 *CA_cert = NULL, *Cert = NULL;
RSA *CA_key = NULL;
X509_NAME *Name = NULL; X509_NAME *Name = NULL;
EVP_PKEY *PubKey = NULL, *PrivateKey = NULL; EVP_PKEY *PubKey = NULL;
int not_before = 0, expire = 1, req_is_for_CA_cert = 0, signature_length = 0, ok = 0, i = 0; int not_before = 0, expire = 1, req_is_for_CA_cert = 0, signature_length = 0, ok = 0, i = 0;
X509V3_CTX ctx; X509V3_CTX ctx;
STACK_OF(X509_EXTENSION) *exts = NULL; STACK_OF(X509_EXTENSION) *exts = NULL;
@ -223,7 +210,7 @@ static PyObject* create_rsa_cert(PyObject *self, PyObject *args) {
CA_cert = PyCapsule_GetPointer(CA_certC, NULL); CA_cert = PyCapsule_GetPointer(CA_certC, NULL);
if (!CA_cert) PyErr_Format(PyExc_TypeError, "The CA_cert capsule is NULL"); if (!CA_cert) PyErr_Format(PyExc_TypeError, "The CA_cert capsule is NULL");
} }
CA_key = PyCapsule_GetPointer(CA_keyC, NULL); EVP_PKEY *CA_key = PyCapsule_GetPointer(CA_keyC, NULL);
if (!CA_key) PyErr_Format(PyExc_TypeError, "The CA_key capsule is NULL"); if (!CA_key) PyErr_Format(PyExc_TypeError, "The CA_key capsule is NULL");
Cert = X509_new(); Cert = X509_new();
@ -259,11 +246,8 @@ static PyObject* create_rsa_cert(PyObject *self, PyObject *args) {
if (!PubKey) { set_error("X509_REQ_get_pubkey"); goto error; } if (!PubKey) { set_error("X509_REQ_get_pubkey"); goto error; }
if (!X509_REQ_verify(req, PubKey)) { set_error("X509_REQ_verify"); goto error; } if (!X509_REQ_verify(req, PubKey)) { set_error("X509_REQ_verify"); goto error; }
if (!X509_set_pubkey(Cert, PubKey)) { set_error("X509_set_pubkey"); goto error; } if (!X509_set_pubkey(Cert, PubKey)) { set_error("X509_set_pubkey"); goto error; }
PrivateKey = EVP_PKEY_new();
if (!PrivateKey) { set_error("EVP_PKEY_new"); goto error; }
if (!EVP_PKEY_set1_RSA(PrivateKey, CA_key)) { set_error("EVP_PKEY_set1_RSA"); goto error; }
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
signature_length = X509_sign(Cert, PrivateKey, EVP_sha256()); signature_length = X509_sign(Cert, CA_key, EVP_sha256());
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (signature_length <= 0) { set_error("X509_sign"); goto error; } if (signature_length <= 0) { set_error("X509_sign"); goto error; }
ans = PyCapsule_New(Cert, NULL, free_cert); ans = PyCapsule_New(Cert, NULL, free_cert);
@ -324,8 +308,7 @@ static PyObject* cert_info(PyObject *self, PyObject *args) {
static PyObject* serialize_rsa_key(PyObject *self, PyObject *args) { static PyObject* serialize_rsa_key(PyObject *self, PyObject *args) {
PyObject *capsule = NULL, *ans = NULL; PyObject *capsule = NULL, *ans = NULL;
char *password = NULL; char *password = NULL;
EVP_PKEY *key = NULL; EVP_PKEY *keypair = NULL;
RSA *keypair = NULL;
BIO *mem = NULL; BIO *mem = NULL;
long sz = 0; long sz = 0;
int ok = 0; int ok = 0;
@ -336,21 +319,16 @@ static PyObject* serialize_rsa_key(PyObject *self, PyObject *args) {
keypair = PyCapsule_GetPointer(capsule, NULL); keypair = PyCapsule_GetPointer(capsule, NULL);
if (!keypair) return PyErr_Format(PyExc_TypeError, "The key capsule is NULL"); if (!keypair) return PyErr_Format(PyExc_TypeError, "The key capsule is NULL");
key = EVP_PKEY_new();
if (!key) { set_error("EVP_PKEY_new"); goto error; }
if (!EVP_PKEY_set1_RSA(key, keypair)) { set_error("EVP_PKEY_set1_RSA"); goto error; }
mem = BIO_new(BIO_s_mem()); mem = BIO_new(BIO_s_mem());
if (!mem) {set_error("BIO_new"); goto error; } if (!mem) {set_error("BIO_new"); goto error; }
if (password && *password) ok = PEM_write_bio_PrivateKey(mem, key, EVP_des_ede3_cbc(), NULL, 0, 0, password); if (password && *password) ok = PEM_write_bio_PrivateKey(mem, keypair, EVP_des_ede3_cbc(), NULL, 0, 0, password);
else ok = PEM_write_bio_PrivateKey(mem, key, NULL, NULL, 0, 0, NULL); else ok = PEM_write_bio_PrivateKey(mem, keypair, NULL, NULL, 0, 0, NULL);
if (!ok) { set_error("PEM_write_bio_PrivateKey"); goto error; } if (!ok) { set_error("PEM_write_bio_PrivateKey"); goto error; }
sz = BIO_get_mem_data(mem, &p); sz = BIO_get_mem_data(mem, &p);
Py_ssize_t psz = sz; Py_ssize_t psz = sz;
ans = Py_BuildValue("s#", p, psz); ans = Py_BuildValue("s#", p, psz);
error: error:
if (mem) BIO_free(mem); if (mem) BIO_free(mem);
if (key) EVP_PKEY_free(key);
return ans; return ans;
} }
@ -387,7 +365,6 @@ static int
exec_module(PyObject *module) { exec_module(PyObject *module) {
OpenSSL_add_all_algorithms(); OpenSSL_add_all_algorithms();
ERR_load_crypto_strings(); ERR_load_crypto_strings();
ERR_load_BIO_strings();
return 0; return 0;
} }
static PyModuleDef_Slot slots[] = { {Py_mod_exec, exec_module}, {0, NULL} }; static PyModuleDef_Slot slots[] = { {Py_mod_exec, exec_module}, {0, NULL} };
@ -398,6 +375,7 @@ static struct PyModuleDef module_def = {
.m_doc = "OpenSSL bindings to easily create certificates/certificate authorities.", .m_doc = "OpenSSL bindings to easily create certificates/certificate authorities.",
.m_methods = certgen_methods, .m_methods = certgen_methods,
.m_slots = slots, .m_slots = slots,
}; };
CALIBRE_MODINIT_FUNC PyInit_certgen(void) { return PyModuleDef_Init(&module_def); } CALIBRE_MODINIT_FUNC PyInit_certgen(void) { return PyModuleDef_Init(&module_def); }

View File

@ -45,7 +45,7 @@ def serialize_key(key_pair, password=None):
def cert_info(cert): def cert_info(cert):
return certgen.cert_info(cert).decode('utf-8') return certgen.cert_info(cert)
def create_server_cert( def create_server_cert(
@ -96,9 +96,9 @@ def create_server_cert(
if __name__ == '__main__': if __name__ == '__main__':
cacert, cakey, cert, pkey = create_server_cert('test.me', alt_names=['1.test.me', '*.all.test.me']) cacert, cakey, cert, pkey = create_server_cert('test.me')
print("CA Certificate") print("CA Certificate")
print(cert_info(cacert).encode('utf-8')) print(cert_info(cacert))
print(), print(), print() print(), print(), print()
print('Server Certificate') print('Server Certificate')
print(cert_info(cert).encode('utf-8')) print(cert_info(cert))