def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
"""Verify the existance and validity of the client SSL certificate.
+ Also, verify that the client certificate is not self-signed. Self-
+ signed client certificates stem from Ganeti versions 2.12.0 - 2.12.4
+ and should be replaced by client certificates signed by the server
+ certificate. Hence we output a warning when we encounter a self-signed
+ one.
+
"""
create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
if not os.path.exists(cert_file):
(errcode, msg) = utils.VerifyCertificate(cert_file)
if errcode is not None:
return (errcode, msg)
- else:
- # if everything is fine, we return the digest to be compared to the config
- return (None, utils.GetCertificateDigest(cert_filename=cert_file))
+
+ (errcode, msg) = utils.IsCertificateSelfSigned(cert_file)
+ if errcode is not None:
+ return (errcode, msg)
+
+ # if everything is fine, we return the digest to be compared to the config
+ return (None, utils.GetCertificateDigest(cert_filename=cert_file))
def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
(errcode, msg) = \
x509.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
- constants.SSL_CERT_EXPIRATION_ERROR)
+ constants.SSL_CERT_EXPIRATION_ERROR)
if msg:
fnamemsg = "While verifying %s: %s" % (filename, msg)
return (constants.CV_ERROR, fnamemsg)
raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
+
+
+def IsCertificateSelfSigned(cert_filename):
+ """Checks whether the certificate issuer is the same as the owner.
+
+ Note that this does not actually verify the signature, it simply
+ compares the certificates common name and the issuer's common
+ name. This is sufficient, because now that Ganeti started creating
+ non-self-signed client-certificates, it uses their hostnames
+ as common names and thus they are distinguishable by common name
+ from the server certificates.
+
+ @type cert_filename: string
+ @param cert_filename: filename of the certificate to examine
+
+ """
+ try:
+ cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ io.ReadFile(cert_filename))
+ except Exception, err: # pylint: disable=W0703
+ return (constants.CV_ERROR,
+ "Failed to load X509 certificate %s: %s" % (cert_filename, err))
+
+ if cert.get_subject().CN == cert.get_issuer().CN:
+ msg = "The certificate '%s' is self-signed. Please run 'gnt-cluster" \
+ " renew-crypto --new-node-certificates' to get a properly signed" \
+ " certificate." % cert_filename
+ return (constants.CV_WARNING, msg)
+
+ return (None, None)
verif_cert.return_value = (None, None)
cert_file = testutils.TestDataFilename("cert2.pem")
(errcode, digest) = backend._VerifyClientCertificate(cert_file=cert_file)
- self.assertEqual(None, errcode)
+ self.assertEqual(constants.CV_WARNING, errcode)
self.assertTrue(isinstance(digest, str))
@testutils.patch_object(utils, "VerifyCertificate")