From 43c97767afe7aa1dccefec35946dbc8b9cde73a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A1ndor=20Oroszi?= Date: Fri, 11 Sep 2020 17:17:31 +0200 Subject: [PATCH] Allow using an OpenSSL hashed directory for verification in X509Store (#943) Add X509Store.load_locations() to set a CA bundle file and/or an OpenSSL- style hashed CA/CRL lookup directory, similar to the already existing SSL.Context.load_verify_locations(). Co-authored-by: Sandor Oroszi --- CHANGELOG.rst | 3 + src/OpenSSL/crypto.py | 48 +++++++++++++++ tests/test_crypto.py | 136 +++++++++++++++++++++++++++++++++++++++++- 3 files changed, 186 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index e8c52c232..e5f08d210 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -24,6 +24,9 @@ Deprecations: Changes: ^^^^^^^^ +- Added ``OpenSSL.crypto.X509Store.load_locations`` to set trusted + certificate file bundles and/or directories for verification. + `#943 `_ - Added ``Context.set_keylog_callback`` to log key material. `#910 `_ - Added ``OpenSSL.SSL.Connection.get_verified_chain`` to retrieve the diff --git a/src/OpenSSL/crypto.py b/src/OpenSSL/crypto.py index fee5259ae..880c2b315 100644 --- a/src/OpenSSL/crypto.py +++ b/src/OpenSSL/crypto.py @@ -19,6 +19,7 @@ exception_from_error_queue as _exception_from_error_queue, byte_string as _byte_string, native as _native, + path_string as _path_string, UNSPECIFIED as _UNSPECIFIED, text_to_bytes_and_warn as _text_to_bytes_and_warn, make_assert as _make_assert, @@ -1674,6 +1675,53 @@ def set_time(self, vfy_time): _lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime("%s"))) _openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0) + def load_locations(self, cafile, capath=None): + """ + Let X509Store know where we can find trusted certificates for the + certificate chain. Note that the certificates have to be in PEM + format. + + If *capath* is passed, it must be a directory prepared using the + ``c_rehash`` tool included with OpenSSL. Either, but not both, of + *cafile* or *capath* may be ``None``. + + .. note:: + + Both *cafile* and *capath* may be set simultaneously. + + Call this method multiple times to add more than one location. + For example, CA certificates, and certificate revocation list bundles + may be passed in *cafile* in subsequent calls to this method. + + .. versionadded:: 20.0 + + :param cafile: In which file we can find the certificates (``bytes`` or + ``unicode``). + :param capath: In which directory we can find the certificates + (``bytes`` or ``unicode``). + + :return: ``None`` if the locations were set successfully. + + :raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None`` + or the locations could not be set for any reason. + + """ + if cafile is None: + cafile = _ffi.NULL + else: + cafile = _path_string(cafile) + + if capath is None: + capath = _ffi.NULL + else: + capath = _path_string(capath) + + load_result = _lib.X509_STORE_load_locations( + self._store, cafile, capath + ) + if not load_result: + _raise_current_error() + class X509StoreContextError(Exception): """ diff --git a/tests/test_crypto.py b/tests/test_crypto.py index ac4e72951..820a4b259 100644 --- a/tests/test_crypto.py +++ b/tests/test_crypto.py @@ -10,6 +10,7 @@ import base64 from subprocess import PIPE, Popen from datetime import datetime, timedelta +import sys import pytest @@ -46,7 +47,14 @@ get_elliptic_curves, ) -from .util import EqualityTestsMixin, is_consistent_type, WARNING_TYPE_EXPECTED +from OpenSSL._util import ffi as _ffi, lib as _lib + +from .util import ( + EqualityTestsMixin, + is_consistent_type, + WARNING_TYPE_EXPECTED, + NON_ASCII, +) def normalize_privatekey_pem(pem): @@ -2228,6 +2236,68 @@ def test_add_cert_accepts_duplicate(self): store.add_cert(cert) store.add_cert(cert) + @pytest.mark.parametrize( + "cafile, capath, call_cafile, call_capath", + [ + ( + "/cafile" + NON_ASCII, + None, + b"/cafile" + NON_ASCII.encode(sys.getfilesystemencoding()), + _ffi.NULL, + ), + ( + b"/cafile" + NON_ASCII.encode("utf-8"), + None, + b"/cafile" + NON_ASCII.encode("utf-8"), + _ffi.NULL, + ), + ( + None, + "/capath" + NON_ASCII, + _ffi.NULL, + b"/capath" + NON_ASCII.encode(sys.getfilesystemencoding()), + ), + ( + None, + b"/capath" + NON_ASCII.encode("utf-8"), + _ffi.NULL, + b"/capath" + NON_ASCII.encode("utf-8"), + ), + ], + ) + def test_load_locations_parameters( + self, cafile, capath, call_cafile, call_capath, monkeypatch + ): + class LibMock(object): + def load_locations(self, store, cafile, capath): + self.cafile = cafile + self.capath = capath + return 1 + + lib_mock = LibMock() + monkeypatch.setattr( + _lib, "X509_STORE_load_locations", lib_mock.load_locations + ) + + store = X509Store() + store.load_locations(cafile=cafile, capath=capath) + + assert call_cafile == lib_mock.cafile + assert call_capath == lib_mock.capath + + def test_load_locations_fails_when_all_args_are_none(self): + store = X509Store() + with pytest.raises(Error): + store.load_locations(None, None) + + def test_load_locations_raises_error_on_failure(self, tmpdir): + invalid_ca_file = tmpdir.join("invalid.pem") + invalid_ca_file.write("This is not a certificate") + + store = X509Store() + with pytest.raises(Error): + store.load_locations(cafile=str(invalid_ca_file)) + class TestPKCS12(object): """ @@ -3884,6 +3954,70 @@ def test_get_verified_chain_invalid_chain_no_root(self): assert exc.value.args[0][2] == "unable to get issuer certificate" assert exc.value.certificate.get_subject().CN == "intermediate" + @pytest.fixture + def root_ca_file(self, tmpdir): + return self._create_ca_file(tmpdir, "root_ca_hash_dir", self.root_cert) + + @pytest.fixture + def intermediate_ca_file(self, tmpdir): + return self._create_ca_file( + tmpdir, "intermediate_ca_hash_dir", self.intermediate_cert + ) + + @staticmethod + def _create_ca_file(base_path, hash_directory, cacert): + ca_hash = "{:08x}.0".format(cacert.subject_name_hash()) + cafile = base_path.join(hash_directory, ca_hash) + cafile.write_binary( + dump_certificate(FILETYPE_PEM, cacert), ensure=True + ) + return cafile + + def test_verify_with_ca_file_location(self, root_ca_file): + store = X509Store() + store.load_locations(str(root_ca_file)) + + store_ctx = X509StoreContext(store, self.intermediate_cert) + store_ctx.verify_certificate() + + def test_verify_with_ca_path_location(self, root_ca_file): + store = X509Store() + store.load_locations(None, str(root_ca_file.dirname)) + + store_ctx = X509StoreContext(store, self.intermediate_cert) + store_ctx.verify_certificate() + + def test_verify_with_cafile_and_capath( + self, root_ca_file, intermediate_ca_file + ): + store = X509Store() + store.load_locations( + cafile=str(root_ca_file), capath=str(intermediate_ca_file.dirname) + ) + + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + store_ctx.verify_certificate() + + def test_verify_with_multiple_ca_files( + self, root_ca_file, intermediate_ca_file + ): + store = X509Store() + store.load_locations(str(root_ca_file)) + store.load_locations(str(intermediate_ca_file)) + + store_ctx = X509StoreContext(store, self.intermediate_server_cert) + store_ctx.verify_certificate() + + def test_verify_failure_with_empty_ca_directory(self, tmpdir): + store = X509Store() + store.load_locations(None, str(tmpdir)) + + store_ctx = X509StoreContext(store, self.intermediate_cert) + with pytest.raises(X509StoreContextError) as exc: + store_ctx.verify_certificate() + + assert exc.value.args[0][2] == "unable to get local issuer certificate" + class TestSignVerify(object): """