Unit test for backend.RenewCrypto
authorHelga Velroyen <helgav@google.com>
Tue, 19 Jan 2016 14:21:17 +0000 (15:21 +0100)
committerHelga Velroyen <helgav@google.com>
Fri, 22 Jan 2016 09:39:16 +0000 (10:39 +0100)
This patch adds a unit test for the successful execution
of backend.RenewCrypto. It mostly reuses infrastructure
from the unit tests for adding and removing SSH keys.

Signed-off-by: Helga Velroyen <helgav@google.com>
Reviewed-by: Klaus Aehlig <aehlig@google.com>

test/py/ganeti.backend_unittest.py

index a30ec67..ab72d36 100755 (executable)
@@ -1918,6 +1918,80 @@ class TestAddRemoveGenerateNodeSshKey(testutils.GanetiTestCase):
     self.assertTrue([error_msg for (node, error_msg) in error_msgs
                      if node == node_name])
 
+  def _MockReadRemoteSshPubKey(self, pub_key_file, node, cluster_name, port,
+                               ask_key, strict_host_check):
+    return self._ssh_file_manager.GetKeyOfNode(self._master_node)
+
+
+  def _MockReadLocalSshPubKeys(self, key_types, suffix=""):
+    return [self._ssh_file_manager.GetKeyOfNode(self._master_node)]
+
+  def _setUpRenewCrypto(self):
+    """Preparations only needed for the renew-crypto unittests."""
+    self.tmpdir = tempfile.mkdtemp()
+    self._dsa_keyfile = os.path.join(self.tmpdir, "id_dsa.pub")
+    self._rsa_keyfile = os.path.join(self.tmpdir, "id_rsa.pub")
+
+    self._ssh_get_all_user_files_patcher = testutils \
+        .patch_object(ssh, "GetAllUserFiles")
+    self._ssh_get_all_user_files_mock = \
+        self._ssh_get_all_user_files_patcher.start()
+    self._ssh_get_all_user_files_mock.return_value = (None,
+        {constants.SSHK_DSA: (None, self._dsa_keyfile),
+         constants.SSHK_RSA: (None, self._rsa_keyfile)})
+
+    self._ssh_read_remote_ssh_pub_key_patcher = testutils \
+        .patch_object(ssh, "ReadRemoteSshPubKey")
+    self._ssh_read_remote_ssh_pub_key_mock = \
+        self._ssh_read_remote_ssh_pub_key_patcher.start()
+    self._ssh_read_remote_ssh_pub_key_mock.side_effect = \
+        self._MockReadRemoteSshPubKey
+
+    self._ssh_read_local_ssh_pub_keys_patcher = testutils \
+        .patch_object(ssh, "ReadLocalSshPubKeys")
+    self._ssh_read_local_ssh_pub_keys_mock = \
+        self._ssh_read_local_ssh_pub_keys_patcher.start()
+    self._ssh_read_local_ssh_pub_keys_mock.side_effect = \
+        self._MockReadLocalSshPubKeys
+
+    self._ssh_replace_ssh_keys_patcher = testutils \
+        .patch_object(ssh, "ReplaceSshKeys")
+    self._ssh_replace_ssh_keys_mock = \
+        self._ssh_replace_ssh_keys_patcher.start()
+
+  def _tearDownRenewCrypto(self):
+    self._ssh_get_all_user_files_patcher.stop()
+    self._ssh_read_remote_ssh_pub_key_patcher.stop()
+    self._ssh_read_local_ssh_pub_keys_patcher.stop()
+    self._ssh_replace_ssh_keys_patcher.stop()
+
+  def testRenewCrypto(self):
+    self._setUpRenewCrypto()
+
+    node_uuids = self._ssh_file_manager.GetAllNodeUuids()
+    node_names = self._ssh_file_manager.GetAllNodeNames()
+
+    old_ssh_file_manager = copy.deepcopy(self._ssh_file_manager)
+
+    backend.RenewSshKeys(node_uuids, node_names,
+                         self._master_candidate_uuids,
+                         self._potential_master_candidates,
+                         constants.SSHK_DSA, constants.SSHK_DSA,
+                         constants.SSH_DEFAULT_KEY_BITS,
+                         ganeti_pub_keys_file=self._pub_key_file,
+                         ssconf_store=self._ssconf_mock,
+                         noded_cert_file=self.noded_cert_file,
+                         run_cmd_fn=self._run_cmd_mock)
+
+    self._tearDownRenewCrypto()
+
+    self.assertEqual(set(old_ssh_file_manager.GetAllNodeNames()),
+                     set(self._ssh_file_manager.GetAllNodeNames()))
+
+    for node_name in self._ssh_file_manager.GetAllNodeNames():
+      self.assertNotEqual(self._ssh_file_manager.GetKeyOfNode(node_name),
+                          old_ssh_file_manager.GetKeyOfNode(node_name))
+
 
 class TestRemoveSshKeyFromPublicKeyFile(testutils.GanetiTestCase):