Unit test for LURenewCrypto's valid case
authorHelga Velroyen <helgav@google.com>
Wed, 4 Mar 2015 17:41:45 +0000 (18:41 +0100)
committerHelga Velroyen <helgav@google.com>
Wed, 18 Mar 2015 10:19:46 +0000 (11:19 +0100)
This adds a unit test which tests a successful run of
LURenewCrypto. While writing this, some options for
improvement became apparent and are fixed in this patch
as well.

Signed-off-by: Helga Velroyen <helgav@google.com>
Reviewed-by: Petr Pudlak <pudlak@google.com>

lib/cmdlib/cluster.py
test/py/cmdlib/cluster_unittest.py

index 24972fa..6f0103e 100644 (file)
@@ -129,12 +129,14 @@ class LUClusterRenewCrypto(NoHooksLU):
     except IOError:
       logging.info("No old certificate available.")
 
-    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
-                                                feedback_fn)
+    # Technically it should not be necessary to set the cert
+    # paths. However, due to a bug in the mock library, we
+    # have to do this to be able to test the function properly.
+    _UpdateMasterClientCert(
+        self, master_uuid, cluster, feedback_fn,
+        client_cert=pathutils.NODED_CLIENT_CERT_FILE,
+        client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP)
 
-    utils.AddNodeToCandidateCerts(master_uuid,
-                                  new_master_digest,
-                                  cluster.candidate_certs)
     nodes = self.cfg.GetAllNodesInfo()
     for (node_uuid, node_info) in nodes.items():
       if node_info.offline:
index dc3a946..e3ea305 100644 (file)
@@ -37,6 +37,8 @@ import OpenSSL
 import copy
 import unittest
 import operator
+import shutil
+import os
 
 from ganeti.cmdlib import cluster
 from ganeti import constants
@@ -2264,5 +2266,65 @@ class TestLUClusterVerifyDisks(CmdlibTestCase):
     self.assertEqual(1, len(result["jobs"]))
 
 
+class TestLUClusterRenewCrypto(CmdlibTestCase):
+
+  def setUp(self):
+    super(TestLUClusterRenewCrypto, self).setUp()
+    self._node_cert = self._CreateTempFile()
+    shutil.copy(testutils.TestDataFilename("cert1.pem"), self._node_cert)
+    self._client_node_cert = self._CreateTempFile()
+    shutil.copy(testutils.TestDataFilename("cert2.pem"), self._client_node_cert)
+    self._client_node_cert_tmp = self._CreateTempFile()
+
+  def tearDown(self):
+    super(TestLUClusterRenewCrypto, self).tearDown()
+
+  def _GetFakeDigest(self, uuid):
+    """Creates a fake SSL digest depending on the UUID of a node.
+
+    @type uuid: string
+    @param uuid: node UUID
+    @returns: a string impersonating a SSL digest
+
+    """
+    return "FA:KE:%s:%s:%s:%s" % (uuid[0:2], uuid[2:4], uuid[4:6], uuid[6:8])
+  @patchPathutils("cluster")
+  def testSuccessfulCase(self, pathutils):
+
+    # patch pathutils to point to temporary files
+    pathutils.NODED_CERT_FILE = self._node_cert 
+    pathutils.NODED_CLIENT_CERT_FILE = self._client_node_cert 
+    pathutils.NODED_CLIENT_CERT_FILE_TMP = \
+        self._client_node_cert_tmp
+
+    # create a few non-master, online nodes
+    num_nodes = 3
+    for _ in range(num_nodes):
+      self.cfg.AddNewNode()
+
+    # make sure the RPC calls are successful for all nodes
+    self.rpc.call_node_crypto_tokens = \
+      lambda node_uuid, _: self.RpcResultsBuilder() \
+        .CreateSuccessfulNodeResult(node_uuid,
+          [(constants.CRYPTO_TYPE_SSL_DIGEST, self._GetFakeDigest(node_uuid))])
+
+    op = opcodes.OpClusterRenewCrypto()
+    self.ExecOpCode(op)
+
+    # Check if the correct certificates exist and don't exist on the master
+    self.assertTrue(os.path.exists(pathutils.NODED_CERT_FILE))
+    self.assertTrue(os.path.exists(pathutils.NODED_CLIENT_CERT_FILE))
+    self.assertFalse(os.path.exists(pathutils.NODED_CLIENT_CERT_FILE_TMP))
+
+    # Check if we have the correct digests in the configuration
+    cluster = self.cfg.GetClusterInfo()
+    self.assertEqual(num_nodes + 1, len(cluster.candidate_certs))
+    nodes = self.cfg.GetAllNodesInfo()
+    for (node_uuid, _) in nodes.items():
+      expected_digest = self._GetFakeDigest(node_uuid)
+      self.assertEqual(expected_digest, cluster.candidate_certs[node_uuid])
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()