Skip to content

Commit

Permalink
Allow using an OpenSSL hashed directory for verification in X509Store (
Browse files Browse the repository at this point in the history
…#943)

Add X509Store.load_locations() to set a CA bundle file and/or an OpenSSL-
style hashed CA/CRL lookup directory, similar to the already existing
SSL.Context.load_verify_locations().

Co-authored-by: Sandor Oroszi <sandor.oroszi@balabit.com>
  • Loading branch information
orosam and Sandor Oroszi authored Sep 11, 2020
1 parent 0488214 commit 43c9776
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 1 deletion.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ Deprecations:
Changes:
^^^^^^^^

- Added ``OpenSSL.crypto.X509Store.load_locations`` to set trusted
certificate file bundles and/or directories for verification.
`#943 </~https://github.com/pyca/pyopenssl/pull/943>`_
- Added ``Context.set_keylog_callback`` to log key material.
`#910 </~https://github.com/pyca/pyopenssl/pull/910>`_
- Added ``OpenSSL.SSL.Connection.get_verified_chain`` to retrieve the
Expand Down
48 changes: 48 additions & 0 deletions src/OpenSSL/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
exception_from_error_queue as _exception_from_error_queue,
byte_string as _byte_string,
native as _native,
path_string as _path_string,
UNSPECIFIED as _UNSPECIFIED,
text_to_bytes_and_warn as _text_to_bytes_and_warn,
make_assert as _make_assert,
Expand Down Expand Up @@ -1674,6 +1675,53 @@ def set_time(self, vfy_time):
_lib.X509_VERIFY_PARAM_set_time(param, int(vfy_time.strftime("%s")))
_openssl_assert(_lib.X509_STORE_set1_param(self._store, param) != 0)

def load_locations(self, cafile, capath=None):
"""
Let X509Store know where we can find trusted certificates for the
certificate chain. Note that the certificates have to be in PEM
format.
If *capath* is passed, it must be a directory prepared using the
``c_rehash`` tool included with OpenSSL. Either, but not both, of
*cafile* or *capath* may be ``None``.
.. note::
Both *cafile* and *capath* may be set simultaneously.
Call this method multiple times to add more than one location.
For example, CA certificates, and certificate revocation list bundles
may be passed in *cafile* in subsequent calls to this method.
.. versionadded:: 20.0
:param cafile: In which file we can find the certificates (``bytes`` or
``unicode``).
:param capath: In which directory we can find the certificates
(``bytes`` or ``unicode``).
:return: ``None`` if the locations were set successfully.
:raises OpenSSL.crypto.Error: If both *cafile* and *capath* is ``None``
or the locations could not be set for any reason.
"""
if cafile is None:
cafile = _ffi.NULL
else:
cafile = _path_string(cafile)

if capath is None:
capath = _ffi.NULL
else:
capath = _path_string(capath)

load_result = _lib.X509_STORE_load_locations(
self._store, cafile, capath
)
if not load_result:
_raise_current_error()


class X509StoreContextError(Exception):
"""
Expand Down
136 changes: 135 additions & 1 deletion tests/test_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import base64
from subprocess import PIPE, Popen
from datetime import datetime, timedelta
import sys

import pytest

Expand Down Expand Up @@ -46,7 +47,14 @@
get_elliptic_curves,
)

from .util import EqualityTestsMixin, is_consistent_type, WARNING_TYPE_EXPECTED
from OpenSSL._util import ffi as _ffi, lib as _lib

from .util import (
EqualityTestsMixin,
is_consistent_type,
WARNING_TYPE_EXPECTED,
NON_ASCII,
)


def normalize_privatekey_pem(pem):
Expand Down Expand Up @@ -2228,6 +2236,68 @@ def test_add_cert_accepts_duplicate(self):
store.add_cert(cert)
store.add_cert(cert)

@pytest.mark.parametrize(
"cafile, capath, call_cafile, call_capath",
[
(
"/cafile" + NON_ASCII,
None,
b"/cafile" + NON_ASCII.encode(sys.getfilesystemencoding()),
_ffi.NULL,
),
(
b"/cafile" + NON_ASCII.encode("utf-8"),
None,
b"/cafile" + NON_ASCII.encode("utf-8"),
_ffi.NULL,
),
(
None,
"/capath" + NON_ASCII,
_ffi.NULL,
b"/capath" + NON_ASCII.encode(sys.getfilesystemencoding()),
),
(
None,
b"/capath" + NON_ASCII.encode("utf-8"),
_ffi.NULL,
b"/capath" + NON_ASCII.encode("utf-8"),
),
],
)
def test_load_locations_parameters(
self, cafile, capath, call_cafile, call_capath, monkeypatch
):
class LibMock(object):
def load_locations(self, store, cafile, capath):
self.cafile = cafile
self.capath = capath
return 1

lib_mock = LibMock()
monkeypatch.setattr(
_lib, "X509_STORE_load_locations", lib_mock.load_locations
)

store = X509Store()
store.load_locations(cafile=cafile, capath=capath)

assert call_cafile == lib_mock.cafile
assert call_capath == lib_mock.capath

def test_load_locations_fails_when_all_args_are_none(self):
store = X509Store()
with pytest.raises(Error):
store.load_locations(None, None)

def test_load_locations_raises_error_on_failure(self, tmpdir):
invalid_ca_file = tmpdir.join("invalid.pem")
invalid_ca_file.write("This is not a certificate")

store = X509Store()
with pytest.raises(Error):
store.load_locations(cafile=str(invalid_ca_file))


class TestPKCS12(object):
"""
Expand Down Expand Up @@ -3884,6 +3954,70 @@ def test_get_verified_chain_invalid_chain_no_root(self):
assert exc.value.args[0][2] == "unable to get issuer certificate"
assert exc.value.certificate.get_subject().CN == "intermediate"

@pytest.fixture
def root_ca_file(self, tmpdir):
return self._create_ca_file(tmpdir, "root_ca_hash_dir", self.root_cert)

@pytest.fixture
def intermediate_ca_file(self, tmpdir):
return self._create_ca_file(
tmpdir, "intermediate_ca_hash_dir", self.intermediate_cert
)

@staticmethod
def _create_ca_file(base_path, hash_directory, cacert):
ca_hash = "{:08x}.0".format(cacert.subject_name_hash())
cafile = base_path.join(hash_directory, ca_hash)
cafile.write_binary(
dump_certificate(FILETYPE_PEM, cacert), ensure=True
)
return cafile

def test_verify_with_ca_file_location(self, root_ca_file):
store = X509Store()
store.load_locations(str(root_ca_file))

store_ctx = X509StoreContext(store, self.intermediate_cert)
store_ctx.verify_certificate()

def test_verify_with_ca_path_location(self, root_ca_file):
store = X509Store()
store.load_locations(None, str(root_ca_file.dirname))

store_ctx = X509StoreContext(store, self.intermediate_cert)
store_ctx.verify_certificate()

def test_verify_with_cafile_and_capath(
self, root_ca_file, intermediate_ca_file
):
store = X509Store()
store.load_locations(
cafile=str(root_ca_file), capath=str(intermediate_ca_file.dirname)
)

store_ctx = X509StoreContext(store, self.intermediate_server_cert)
store_ctx.verify_certificate()

def test_verify_with_multiple_ca_files(
self, root_ca_file, intermediate_ca_file
):
store = X509Store()
store.load_locations(str(root_ca_file))
store.load_locations(str(intermediate_ca_file))

store_ctx = X509StoreContext(store, self.intermediate_server_cert)
store_ctx.verify_certificate()

def test_verify_failure_with_empty_ca_directory(self, tmpdir):
store = X509Store()
store.load_locations(None, str(tmpdir))

store_ctx = X509StoreContext(store, self.intermediate_cert)
with pytest.raises(X509StoreContextError) as exc:
store_ctx.verify_certificate()

assert exc.value.args[0][2] == "unable to get local issuer certificate"


class TestSignVerify(object):
"""
Expand Down

0 comments on commit 43c9776

Please sign in to comment.