x509 function for creating signed certs
authorHelga Velroyen <helgav@google.com>
Fri, 5 Jun 2015 13:35:00 +0000 (15:35 +0200)
committerHelga Velroyen <helgav@google.com>
Mon, 6 Jul 2015 10:45:42 +0000 (12:45 +0200)
So far, all our SSL certficates were self-signed. As from
this patch series on client certificates will be signed by
the cluster certificate, we need a utility function for
creation of not self-signed certificates.

Signed-off-by: Helga Velroyen <helgav@google.com>
Reviewed-by: Klaus Aehlig <aehlig@google.com>

lib/utils/x509.py
test/py/ganeti.utils.x509_unittest.py

index 6ab62c4..63ded07 100644 (file)
@@ -54,6 +54,7 @@ X509_SIGNATURE = re.compile(r"^%s:\s*(?P<salt>%s+)/(?P<sign>%s+)$" %
                             (re.escape(constants.X509_CERT_SIGNATURE_HEADER),
                              HEX_CHAR_RE, HEX_CHAR_RE),
                             re.S | re.I)
+X509_CERT_SIGN_DIGEST = "SHA1"
 
 # Certificate verification results
 (CERT_WARNING,
@@ -326,6 +327,69 @@ def GenerateSelfSignedSslCert(filename, serial_no,
   return (key_pem, cert_pem)
 
 
+def GenerateSignedX509Cert(common_name, validity, serial_no,
+                           signing_cert_pem):
+  """Generates a signed (but not self-signed) X509 certificate.
+
+  @type common_name: string
+  @param common_name: commonName value, should be hostname of the machine
+  @type validity: int
+  @param validity: Validity for certificate in seconds
+  @type signing_cert_pem: X509 key
+  @param signing_cert_pem: PEM-encoded private key of the signing certificate
+  @return: a tuple of strings containing the PEM-encoded private key and
+           certificate
+
+  """
+  # Create key pair with private and public key.
+  key_pair = OpenSSL.crypto.PKey()
+  key_pair.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
+
+  # Create certificate sigining request.
+  req = OpenSSL.crypto.X509Req()
+  req.get_subject().CN = common_name
+  req.set_pubkey(key_pair)
+  req.sign(key_pair, X509_CERT_SIGN_DIGEST)
+
+  # Load the certificates used for signing.
+  signing_key = OpenSSL.crypto.load_privatekey(
+      OpenSSL.crypto.FILETYPE_PEM, signing_cert_pem)
+  signing_cert = OpenSSL.crypto.load_certificate(
+      OpenSSL.crypto.FILETYPE_PEM, signing_cert_pem)
+
+  # Create a certificate and sign it.
+  cert = OpenSSL.crypto.X509()
+  cert.set_subject(req.get_subject())
+  cert.set_serial_number(serial_no)
+  cert.gmtime_adj_notBefore(0)
+  cert.gmtime_adj_notAfter(validity)
+  cert.set_issuer(signing_cert.get_subject())
+  cert.set_pubkey(req.get_pubkey())
+  cert.sign(signing_key, X509_CERT_SIGN_DIGEST)
+
+  # Encode the key and certificate in PEM format.
+  key_pem = OpenSSL.crypto.dump_privatekey(
+      OpenSSL.crypto.FILETYPE_PEM, key_pair)
+  cert_pem = OpenSSL.crypto.dump_certificate(
+      OpenSSL.crypto.FILETYPE_PEM, cert)
+
+  return (key_pem, cert_pem)
+
+
+def GenerateSignedSslCert(filename_cert, serial_no,
+                          filename_signing_cert,
+                          common_name=constants.X509_CERT_CN,
+                          validity=constants.X509_CERT_DEFAULT_VALIDITY,
+                          uid=-1, gid=-1):
+  signing_cert_pem = utils_io.ReadFile(filename_signing_cert)
+  (key_pem, cert_pem) = GenerateSignedX509Cert(
+      common_name, validity * 24 * 60 * 60, serial_no, signing_cert_pem)
+
+  utils_io.WriteFile(filename_cert, mode=0440, data=key_pem + cert_pem,
+                     uid=uid, gid=gid)
+  return (key_pem, cert_pem)
+
+
 def ExtractX509Certificate(pem):
   """Extracts the certificate from a PEM-formatted string.
 
index 99ecd76..01ad894 100755 (executable)
@@ -245,7 +245,7 @@ class TestVerifyCertificateInner(unittest.TestCase):
     self.assertEqual(errcode, utils.CERT_ERROR)
 
 
-class TestGenerateSelfSignedX509Cert(unittest.TestCase):
+class TestGenerateX509Certs(unittest.TestCase):
   def setUp(self):
     self.tmpdir = tempfile.mkdtemp()
 
@@ -294,6 +294,40 @@ class TestGenerateSelfSignedX509Cert(unittest.TestCase):
     self.assert_(self._checkRsaPrivateKey(cert1))
     self.assert_(self._checkCertificate(cert1))
 
+  def _checkKeyMatchesCert(self, key, cert):
+    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
+    ctx.use_privatekey(key)
+    ctx.use_certificate(cert)
+    try:
+      ctx.check_privatekey()
+    except OpenSSL.SSL.Error:
+      return False
+    else:
+      return True
+
+  def testSignedSslCertificate(self):
+    server_cert_filename = os.path.join(self.tmpdir, "server.pem")
+    utils.GenerateSelfSignedSslCert(server_cert_filename, 123456)
+
+    client_hostname = "myhost.example.com"
+    client_cert_filename = os.path.join(self.tmpdir, "client.pem")
+    utils.GenerateSignedSslCert(client_cert_filename, 666,
+        server_cert_filename, common_name=client_hostname)
+
+    client_cert_pem = utils.ReadFile(client_cert_filename)
+
+    self._checkRsaPrivateKey(client_cert_pem)
+    self._checkCertificate(client_cert_pem)
+
+    priv_key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
+                                              client_cert_pem)
+    client_cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           client_cert_pem)
+
+    self.assertTrue(self._checkKeyMatchesCert(priv_key, client_cert))
+    self.assertEqual(client_cert.get_issuer().CN, "ganeti.example.com")
+    self.assertEqual(client_cert.get_subject().CN, client_hostname)
+
 
 class TestCheckNodeCertificate(testutils.GanetiTestCase):
   def setUp(self):