(re.escape(constants.X509_CERT_SIGNATURE_HEADER),
HEX_CHAR_RE, HEX_CHAR_RE),
re.S | re.I)
+X509_CERT_SIGN_DIGEST = "SHA1"
# Certificate verification results
(CERT_WARNING,
return (key_pem, cert_pem)
+def GenerateSignedX509Cert(common_name, validity, serial_no,
+ signing_cert_pem):
+ """Generates a signed (but not self-signed) X509 certificate.
+
+ @type common_name: string
+ @param common_name: commonName value, should be hostname of the machine
+ @type validity: int
+ @param validity: Validity for certificate in seconds
+ @type signing_cert_pem: X509 key
+ @param signing_cert_pem: PEM-encoded private key of the signing certificate
+ @return: a tuple of strings containing the PEM-encoded private key and
+ certificate
+
+ """
+ # Create key pair with private and public key.
+ key_pair = OpenSSL.crypto.PKey()
+ key_pair.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
+
+ # Create certificate sigining request.
+ req = OpenSSL.crypto.X509Req()
+ req.get_subject().CN = common_name
+ req.set_pubkey(key_pair)
+ req.sign(key_pair, X509_CERT_SIGN_DIGEST)
+
+ # Load the certificates used for signing.
+ signing_key = OpenSSL.crypto.load_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, signing_cert_pem)
+ signing_cert = OpenSSL.crypto.load_certificate(
+ OpenSSL.crypto.FILETYPE_PEM, signing_cert_pem)
+
+ # Create a certificate and sign it.
+ cert = OpenSSL.crypto.X509()
+ cert.set_subject(req.get_subject())
+ cert.set_serial_number(serial_no)
+ cert.gmtime_adj_notBefore(0)
+ cert.gmtime_adj_notAfter(validity)
+ cert.set_issuer(signing_cert.get_subject())
+ cert.set_pubkey(req.get_pubkey())
+ cert.sign(signing_key, X509_CERT_SIGN_DIGEST)
+
+ # Encode the key and certificate in PEM format.
+ key_pem = OpenSSL.crypto.dump_privatekey(
+ OpenSSL.crypto.FILETYPE_PEM, key_pair)
+ cert_pem = OpenSSL.crypto.dump_certificate(
+ OpenSSL.crypto.FILETYPE_PEM, cert)
+
+ return (key_pem, cert_pem)
+
+
+def GenerateSignedSslCert(filename_cert, serial_no,
+ filename_signing_cert,
+ common_name=constants.X509_CERT_CN,
+ validity=constants.X509_CERT_DEFAULT_VALIDITY,
+ uid=-1, gid=-1):
+ signing_cert_pem = utils_io.ReadFile(filename_signing_cert)
+ (key_pem, cert_pem) = GenerateSignedX509Cert(
+ common_name, validity * 24 * 60 * 60, serial_no, signing_cert_pem)
+
+ utils_io.WriteFile(filename_cert, mode=0440, data=key_pem + cert_pem,
+ uid=uid, gid=gid)
+ return (key_pem, cert_pem)
+
+
def ExtractX509Certificate(pem):
"""Extracts the certificate from a PEM-formatted string.
self.assertEqual(errcode, utils.CERT_ERROR)
-class TestGenerateSelfSignedX509Cert(unittest.TestCase):
+class TestGenerateX509Certs(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp()
self.assert_(self._checkRsaPrivateKey(cert1))
self.assert_(self._checkCertificate(cert1))
+ def _checkKeyMatchesCert(self, key, cert):
+ ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
+ ctx.use_privatekey(key)
+ ctx.use_certificate(cert)
+ try:
+ ctx.check_privatekey()
+ except OpenSSL.SSL.Error:
+ return False
+ else:
+ return True
+
+ def testSignedSslCertificate(self):
+ server_cert_filename = os.path.join(self.tmpdir, "server.pem")
+ utils.GenerateSelfSignedSslCert(server_cert_filename, 123456)
+
+ client_hostname = "myhost.example.com"
+ client_cert_filename = os.path.join(self.tmpdir, "client.pem")
+ utils.GenerateSignedSslCert(client_cert_filename, 666,
+ server_cert_filename, common_name=client_hostname)
+
+ client_cert_pem = utils.ReadFile(client_cert_filename)
+
+ self._checkRsaPrivateKey(client_cert_pem)
+ self._checkCertificate(client_cert_pem)
+
+ priv_key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
+ client_cert_pem)
+ client_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+ client_cert_pem)
+
+ self.assertTrue(self._checkKeyMatchesCert(priv_key, client_cert))
+ self.assertEqual(client_cert.get_issuer().CN, "ganeti.example.com")
+ self.assertEqual(client_cert.get_subject().CN, client_hostname)
+
class TestCheckNodeCertificate(testutils.GanetiTestCase):
def setUp(self):