Skip to content

Commit

Permalink
mgr: enable verification of TLS certs without files
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Seidensal <[email protected]>
  • Loading branch information
p-se committed Mar 5, 2020
1 parent e468c99 commit d1b390d
Showing 1 changed file with 42 additions and 6 deletions.
48 changes: 42 additions & 6 deletions src/pybind/mgr/mgr_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,18 @@ def create_self_signed_cert(organisation='Ceph', common_name='mgr') -> Tuple[str
return cert.decode('utf-8'), pkey.decode('utf-8')


def verify_cacrt_content(crt):
# type: (str) -> None
from OpenSSL import crypto
try:
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
if x509.has_expired():
logger.warning('Certificate has expired: {}'.format(crt))
except (ValueError, crypto.Error) as e:
raise ServerConfigException(
'Invalid certificate: {}'.format(str(e)))


def verify_cacrt(cert_fname):
# type: (str) -> None
"""Basic validation of a ca cert"""
Expand All @@ -164,18 +176,42 @@ def verify_cacrt(cert_fname):
if not os.path.isfile(cert_fname):
raise ServerConfigException("Certificate {} does not exist".format(cert_fname))

from OpenSSL import crypto
try:
with open(cert_fname) as f:
x509 = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
if x509.has_expired():
logger.warning(
'Certificate {} has expired'.format(cert_fname))
except (ValueError, crypto.Error) as e:
verify_cacrt_content(f.read())
except ValueError as e:
raise ServerConfigException(
'Invalid certificate {}: {}'.format(cert_fname, str(e)))


def verify_tls(crt, key):
# type: (str, str) -> None
verify_cacrt_content(crt)

from OpenSSL import crypto, SSL
try:
_key = crypto.load_privatekey(crypto.FILETYPE_PEM, key)
_key.check()
except (ValueError, crypto.Error) as e:
raise ServerConfigException(
'Invalid private key: {}'.format(str(e)))
try:
_crt = crypto.load_certificate(crypto.FILETYPE_PEM, crt)
except ValueError as e:
raise ServerConfigException(
'Invalid certificate key: {}'.format(str(e))
)

try:
context = SSL.Context(SSL.TLSv1_METHOD)
context.use_certificate(_crt)
context.use_privatekey(_key)
context.check_privatekey()
except crypto.Error as e:
logger.warning(
'Private key and certificate do not match up: {}'.format(str(e)))


def verify_tls_files(cert_fname, pkey_fname):
# type: (str, str) -> None
"""Basic checks for TLS certificate and key files
Expand Down

0 comments on commit d1b390d

Please sign in to comment.