Move postcopy migration logic to cmdlib (#1263)
authorCalum Calder <calumcalder@users.noreply.github.com>
Wed, 16 Aug 2017 12:57:28 +0000 (13:57 +0100)
committerMorg <morg@google.com>
Wed, 16 Aug 2017 12:57:27 +0000 (13:57 +0100)
Exposed hypervisor postcopy functions via RPC, and implemented logic to execute them in the MigrateInstance tasklet.

Signed-off-by: Calum Calder <calumcalder@google.com>
Reviewed-by: Federico Morg Pareschi <morg@google.com>

lib/backend.py
lib/cmdlib/instance_migration.py
lib/hypervisor/hv_base.py
lib/hypervisor/hv_kvm/__init__.py
lib/objects.py
lib/rpc_defs.py
lib/server/noded.py
src/Ganeti/Constants.hs
test/py/cmdlib/instance_migration_unittest.py
test/py/ganeti.hypervisor.hv_kvm_unittest.py

index 58f4ebd..3b8db14 100644 (file)
@@ -3164,6 +3164,21 @@ def MigrateInstance(cluster_name, instance, target, live):
   except errors.HypervisorError, err:
     _Fail("Failed to migrate instance: %s", err, exc=True)
 
+def StartPostcopy(instance):
+  """ Switch a migrating instance from precopy to postcopy mode.
+
+  @type instance: L{objects.Instance}
+  @param instance: the instance currently being migrated to
+                   move to postcopy mode.
+  @raise RPCFail: If enabling postcopy fails for some reason.
+
+  """
+  hyper = hypervisor.GetHypervisor(instance.hypervisor)
+
+  try:
+    hyper.StartPostcopy(instance)
+  except Exception, err:  # pylint: disable=W0703
+    _Fail("Failed to enable postcopy mode: %s", err, exc=True)
 
 def FinalizeMigrationSource(instance, success, live):
   """Finalize the instance migration on the source node.
index 357994c..fb4403e 100644 (file)
@@ -284,6 +284,7 @@ class TLMigrateInstance(Tasklet):
   # Constants
   _MIGRATION_POLL_INTERVAL = 1      # seconds
   _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
+  _POSTCOPY_SYNC_COUNT_THRESHOLD = 2 # Precopy passes before enabling postcopy
 
   def __init__(self, lu, instance_uuid, instance_name, cleanup, failover,
                fallback, ignore_consistency, allow_runtime_changes,
@@ -916,6 +917,16 @@ class TLMigrateInstance(Tasklet):
 
     self.feedback_fn("* starting memory transfer")
     last_feedback = time.time()
+
+    cluster_migration_caps = \
+      cluster.hvparams.get("kvm", {}).get(constants.HV_KVM_MIGRATION_CAPS, "")
+    migration_caps = \
+      self.instance.hvparams.get(constants.HV_KVM_MIGRATION_CAPS,
+                                 cluster_migration_caps)
+    # migration_caps is a ':' delimited string, so checking
+    # if 'postcopy-ram' is a substring also covers using
+    # x-postcopy-ram for QEMU 2.5
+    postcopy_enabled = "postcopy-ram" in migration_caps
     while True:
       result = self.rpc.call_instance_get_migration_status(
                  self.source_node_uuid, self.instance)
@@ -932,7 +943,20 @@ class TLMigrateInstance(Tasklet):
         raise errors.OpExecError("Could not migrate instance %s: %s" %
                                  (self.instance.name, msg))
 
-      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
+      if (postcopy_enabled
+          and ms.status == constants.HV_MIGRATION_ACTIVE
+          and int(ms.dirty_sync_count) >= self._POSTCOPY_SYNC_COUNT_THRESHOLD):
+        self.feedback_fn("* finishing memory transfer with postcopy")
+        self.rpc.call_instance_start_postcopy(self.source_node_uuid,
+                                              self.instance)
+
+      if self.instance.hypervisor == 'kvm':
+        migration_active = \
+          ms.status in constants.HV_KVM_MIGRATION_ACTIVE_STATUSES
+      else:
+        migration_active = \
+          ms.status == constants.HV_MIGRATION_ACTIVE
+      if not migration_active:
         self.feedback_fn("* memory transfer complete")
         break
 
index 5663b47..ef7902c 100644 (file)
@@ -537,6 +537,15 @@ class BaseHypervisor(object):
     """
     raise NotImplementedError
 
+  def StartPostcopy(self, instance):
+    """Switch a migration from precopy to postcopy mode.
+
+    @type instance: L{objects.Instance}
+    @param instance: The instance being migrated.
+
+    """
+    raise NotImplementedError
+
   def FinalizeMigrationSource(self, instance, success, live):
     """Finalize the instance migration on the source node.
 
index 9a83bcd..82232e3 100644 (file)
@@ -2487,23 +2487,15 @@ class KVMHypervisor(hv_base.BaseHypervisor):
 
     migration_caps = instance.hvparams[constants.HV_KVM_MIGRATION_CAPS]
     if migration_caps:
-      capabilities = migration_caps.split(_MIGRATION_CAPS_DELIM)
-      postcopy_enabled = ('x-postcopy-ram' in capabilities
-                          or 'postcopy-ram' in capabilities)
-      for c in capabilities:
+      for c in migration_caps.split(_MIGRATION_CAPS_DELIM):
         migrate_command = ("migrate_set_capability %s on" % c)
         self._CallMonitorCommand(instance_name, migrate_command)
-    else:
-      postcopy_enabled = False
 
     migrate_command = "migrate -d tcp:%s:%s" % (target, port)
     self._CallMonitorCommand(instance_name, migrate_command)
 
-    if postcopy_enabled:
-      self._PostcopyAfterPrecopy(instance)
-
-  def _PostcopyAfterPrecopy(self, instance):
-    """Enable postcopying RAM after one precopy pass.
+  def StartPostcopy(self, instance):
+    """Switch a migration from precopy to postcopy mode.
 
     Requires that an instance is currently migrating, and that the
     postcopy-ram (x-postcopy-ram on QEMU version 2.5 and below)
@@ -2514,27 +2506,7 @@ class KVMHypervisor(hv_base.BaseHypervisor):
     @param instance: The instance being migrated.
 
     """
-    precopy_passes = 0
-    while precopy_passes < 2:
-      migration_status = \
-          self._CallMonitorCommand(instance.name, 'info migrate')
-
-      status_match = self._MIGRATION_STATUS_RE.search(migration_status.stdout)
-      if status_match and status_match.group(1) != 'active':
-        logging.debug('Did not attempt postcopy, migration status: %s'
-          % status_match.group(1))
-        break
-      if migration_status.stderr:
-        logging.debug('Error polling for dirty sync count in '
-          'hv_kvm._PostcopyAfterPrecopy(): %s' % migration_status.stderr)
-        break
-
-      passes_match = \
-          self._MIGRATION_PRECOPY_PASSES_RE.search(migration_status.stdout)
-      if passes_match:
-        precopy_passes = int(passes_match.group(1))
-    else:
-      self._CallMonitorCommand(instance.name, 'migrate_start_postcopy')
+    self._CallMonitorCommand(instance.name, 'migrate_start_postcopy')
 
   def FinalizeMigrationSource(self, instance, success, _):
     """Finalize the instance migration on the source node.
@@ -2588,6 +2560,10 @@ class KVMHypervisor(hv_base.BaseHypervisor):
           if match:
             migration_status.transferred_ram = match.group("transferred")
             migration_status.total_ram = match.group("total")
+          sync_count_match = \
+            self._MIGRATION_PRECOPY_PASSES_RE.search(result.stdout)
+          if sync_count_match:
+            migration_status.dirty_sync_count = sync_count_match.group(1)
 
           return migration_status
 
index df0494c..0d6b967 100644 (file)
@@ -2353,6 +2353,7 @@ class MigrationStatus(ConfigObject):
     "status",
     "transferred_ram",
     "total_ram",
+    "dirty_sync_count",
     ]
 
 
index 5bd032e..4a61fb2 100644 (file)
@@ -274,6 +274,10 @@ _INSTANCE_CALLS = [
     ("target", None, "Target node name"),
     ("live", None, "Whether the migration should be done live or not"),
     ], None, None, "Migrate an instance"),
+  ("instance_start_postcopy", SINGLE, None, constants.RPC_TMO_NORMAL, [
+      ("instance", ED_INST_DICT, "Instance object"),
+  ], None, None, "Switch a migrating instance from precopy to "
+                 "postcopy mode."),
   ("instance_finalize_migration_src", SINGLE, None, constants.RPC_TMO_SLOW, [
     ("instance", ED_INST_DICT, "Instance object"),
     ("success", None, "Whether the migration succeeded or not"),
index 477a358..d8277ec 100644 (file)
@@ -710,6 +710,15 @@ class NodeRequestHandler(http.server.HttpServerHandler):
     return backend.MigrateInstance(cluster_name, instance, target, live)
 
   @staticmethod
+  def perspective_instance_start_postcopy(params):
+    """ Switches a migrating instance from precopy to postcopy mode
+
+    """
+    instance, = params
+    instance = objects.Instance.FromDict(instance)
+    return backend.StartPostcopy(instance)
+
+  @staticmethod
   def perspective_instance_finalize_migration_src(params):
     """Finalize the instance migration on the source node.
 
index bee8af3..371825c 100644 (file)
@@ -2023,9 +2023,17 @@ hvMigrationFailedStatuses =
 
 -- | KVM-specific statuses
 --
--- FIXME: this constant seems unnecessary
+hvKvmMigrationPostcopyActive :: String
+hvKvmMigrationPostcopyActive = "postcopy-active"
+
 hvKvmMigrationValidStatuses :: FrozenSet String
-hvKvmMigrationValidStatuses = hvMigrationValidStatuses
+hvKvmMigrationValidStatuses =
+  ConstantUtils.union hvMigrationValidStatuses
+                      (ConstantUtils.mkSet [hvKvmMigrationPostcopyActive])
+
+hvKvmMigrationActiveStatuses :: FrozenSet String
+hvKvmMigrationActiveStatuses = ConstantUtils.mkSet [hvMigrationActive,
+                                                    hvKvmMigrationPostcopyActive]
 
 -- | Node info keys
 hvNodeinfoKeyVersion :: String
index d05ec59..6fd9fa4 100644 (file)
@@ -38,7 +38,9 @@ from ganeti import opcodes
 
 from testsupport import *
 
+from functools import partial
 import testutils
+import mock
 
 
 class TestLUInstanceMigrate(CmdlibTestCase):
@@ -47,12 +49,21 @@ class TestLUInstanceMigrate(CmdlibTestCase):
 
     self.snode = self.cfg.AddNewNode()
 
+    self._ResetRPC()
+
+    self.inst = self.cfg.AddNewInstance(disk_template=constants.DT_DRBD8,
+                                        admin_state=constants.ADMINST_UP,
+                                        secondary_node=self.snode)
+    self.op = opcodes.OpInstanceMigrate(instance_name=self.inst.name)
+
+  def _ResetRPC(self):
     hv_info = ("bootid",
                [{
                  "type": constants.ST_LVM_VG,
                  "storage_free": 10000
                }],
                ({"memory_free": 10000}, ))
+
     self.rpc.call_node_info.return_value = \
       self.RpcResultsBuilder() \
         .AddSuccessfulNode(self.master, hv_info) \
@@ -75,6 +86,9 @@ class TestLUInstanceMigrate(CmdlibTestCase):
     self.rpc.call_instance_get_migration_status.return_value = \
       self.RpcResultsBuilder() \
         .CreateSuccessfulNodeResult(self.master, objects.MigrationStatus())
+    self.rpc.call_instance_start_postcopy.return_value = \
+      self.RpcResultsBuilder() \
+        .CreateSuccessfulNodeResult(self.master, True)
     self.rpc.call_instance_finalize_migration_dst.return_value = \
       self.RpcResultsBuilder() \
         .CreateSuccessfulNodeResult(self.snode, True)
@@ -82,11 +96,6 @@ class TestLUInstanceMigrate(CmdlibTestCase):
       self.RpcResultsBuilder() \
         .CreateSuccessfulNodeResult(self.master, True)
 
-    self.inst = self.cfg.AddNewInstance(disk_template=constants.DT_DRBD8,
-                                        admin_state=constants.ADMINST_UP,
-                                        secondary_node=self.snode)
-    self.op = opcodes.OpInstanceMigrate(instance_name=self.inst.name)
-
   def testPlainDisk(self):
     inst = self.cfg.AddNewInstance(disk_template=constants.DT_PLAIN)
     op = self.CopyOpCode(self.op,
@@ -106,6 +115,64 @@ class TestLUInstanceMigrate(CmdlibTestCase):
     op = self.CopyOpCode(self.op)
     self.ExecOpCode(op)
 
+  def _buildMigrationStatusResponse(self, **kwargs):
+      return self.RpcResultsBuilder().CreateSuccessfulNodeResult(
+          self.master,
+          objects.MigrationStatus(**kwargs)
+      )
+
+  def _execPostcopyMigration(self):
+    self.__status = 'active'
+
+    def change_status(*args, **kwargs):
+      self.__status = 'postcopy-active'
+      return mock.DEFAULT
+
+    def migration_statuses(*args, **kwargs):
+      yield self._buildMigrationStatusResponse(status=self.__status,
+                                               dirty_sync_count='1')
+      for i in range(3):
+        yield self._buildMigrationStatusResponse(status=self.__status,
+                                                 dirty_sync_count='2')
+
+      yield self._buildMigrationStatusResponse(status='completed',
+                                               dirty_sync_count='2')
+
+    self.rpc.call_instance_get_migration_status.side_effect = \
+        migration_statuses()
+    self.rpc.call_instance_start_postcopy.side_effect = change_status
+
+    op = self.CopyOpCode(self.op)
+    self.ExecOpCode(op)
+
+    self._ResetRPC()
+
+  def testPostcopyMigration(self):
+    self.inst.hypervisor = 'kvm'
+    self.inst.hvparams['migration_caps'] = 'postcopy-ram'
+
+    self._execPostcopyMigration()
+
+    self.assertTrue(self.__status == 'postcopy-active',
+                    'Not in postcopy mode after op executed')
+
+    self.inst = self.cfg.AddNewInstance(disk_template=constants.DT_DRBD8,
+                                        admin_state=constants.ADMINST_UP,
+                                        secondary_node=self.snode)
+
+
+  def testPostcopyMigrationWithDefaultHVParams(self):
+    self.inst.hypervisor = 'kvm'
+    self.cluster.hvparams['kvm']['migration_caps'] = 'postcopy-ram'
+
+    self._execPostcopyMigration()
+
+    self.assertTrue(self.__status == 'postcopy-active',
+                    'Not in postcopy mode after op executed')
+
+    self.inst = self.cfg.AddNewInstance(disk_template=constants.DT_DRBD8,
+                                        admin_state=constants.ADMINST_UP,
+                                        secondary_node=self.snode)
 
 class TestLUInstanceFailover(CmdlibTestCase):
   def setUp(self):
index f59bd7a..9d6e648 100755 (executable)
@@ -704,150 +704,5 @@ class TestKvmCpuPinning(testutils.GanetiTestCase):
       self.assertEqual(mock_process.set_cpu_affinity.call_args_list[1],
                        mock.call([4]))
 
-class TestPostcopyAfterPrecopy(testutils.GanetiTestCase):
-  def setUp(self):
-    super(TestPostcopyAfterPrecopy, self).setUp()
-    kvm_class = 'ganeti.hypervisor.hv_kvm.KVMHypervisor'
-    self.MockOut('qmp', mock.patch('ganeti.hypervisor.hv_kvm.QmpConnection'))
-    self.MockOut('run_cmd', mock.patch('ganeti.utils.RunCmd'))
-    self.MockOut('ensure_dirs', mock.patch('ganeti.utils.EnsureDirs'))
-    self.MockOut('write_file', mock.patch('ganeti.utils.WriteFile'))
-    self.params = constants.HVC_DEFAULTS[constants.HT_KVM].copy()
-
-  def _TestPostcopyAfterPrecopy(self, runcmd, postcopy_started_goal):
-    hypervisor = hv_kvm.KVMHypervisor()
-    self.iteration = 0
-    self.postcopy_started = False
-
-    def runcmd_mock(cmd, env=None, output=None, cwd="/", reset_env=False,
-           interactive=False, timeout=None, noclose_fds=None,
-           input_fd=None, postfork_fn=None):
-      res = utils.RunResult(0, None, '', '', cmd, None, None)
-      if not self.postcopy_started and cmd.find('migrate_start_postcopy') != -1:
-        self.postcopy_started = True
-        res.stdout = ('migrate_postcopy_start\n'
-                      '(qemu) ')
-      return runcmd(cmd, res)
-
-    with mock.patch('ganeti.utils.RunCmd', runcmd_mock):
-      instance = mock.MagicMock()
-      instance.name = 'example.instance'
-      hypervisor._PostcopyAfterPrecopy(instance)
-      self.assertEqual(self.postcopy_started, postcopy_started_goal)
-
-  def testNormal(self):
-    def runcmd_normal(cmd, res):
-      res = utils.RunResult(0, None, '', '', cmd, None, None)
-      if cmd.find('info migrate') != -1:
-        self.iteration += 1
-        res.stdout = (
-            'QEMU 2.5.0 monitor - type \'help\' for more information\n'
-            '(qemu) info migrate\n'
-            'capabilities: xbzrle: off rdma-pin-all: off auto-converge: on'
-            'zero-blocks: off compress: off events: off x-postcopy-ram: on \n'
-            'Migration status: active\n'
-            'skipped: 0 pages\n'
-            'dirty sync count: %i\n'
-            '(qemu) ' % self.iteration
-          )
-      return res
-
-    self._TestPostcopyAfterPrecopy(runcmd_normal, True)
-
-  def testEmptyResponses(self):
-    def runcmd_empty_responses(cmd, res):
-      res = utils.RunResult(0, None, '', '', cmd, None, None)
-      if cmd.find('info migrate') != -1:
-        self.iteration += 1
-        if self.iteration < 3:
-          res.stdout = (
-              'QEMU 2.5.0 monitor - type \'help\' for more information\n'
-              '(qemu) info migrate\n'
-              '(qemu) '
-            )
-        else:
-          res.stdout = (
-              'QEMU 2.5.0 monitor - type \'help\' for more information\n'
-              '(qemu) info migrate\n'
-              'capabilities: xbzrle: off rdma-pin-all: off auto-converge: on'
-              'zero-blocks: off compress: off events: off x-postcopy-ram: on \n'
-              'Migration status: active\n'
-              'skipped: 0 pages\n'
-              'dirty sync count: %i\n'
-              '(qemu) ' % self.iteration
-            )
-      return res
-    self._TestPostcopyAfterPrecopy(runcmd_empty_responses, True)
-
-  def testMonitorRemoved(self):
-    def runcmd_monitor_removed(cmd, res):
-      res = utils.RunResult(0, None, '', '', cmd, None, None)
-      if cmd.find('info migrate') != -1:
-        self.iteration += 1
-        if self.iteration < 3:
-          res.stdout = (
-              'QEMU 2.5.0 monitor - type \'help\' for more information\n'
-              '(qemu) info migrate\n'
-              'capabilities: xbzrle: off rdma-pin-all: off auto-converge: on'
-              'zero-blocks: off compress: off events: off x-postcopy-ram: on \n'
-              'Migration status: active\n'
-              'skipped: 0 pages\n'
-              'dirty sync count: %i\n'
-              '(qemu) '
-            )
-        else:
-          res.stderr = ('2017/07/26 15:49:52 socat[105703] E connect(3, AF=1 '
-                        '"/var/run/ganeti/kvm-hypervisor/ctrl/example.instanc'
-                        'e.monitor", 85): No such file or directory')
-      return res
-    self._TestPostcopyAfterPrecopy(runcmd_monitor_removed, False)
-
-  def testMigrationFailed(self):
-    def runcmd_migration_failed(cmd, res):
-      res = utils.RunResult(0, None, '', '', cmd, None, None)
-      if cmd.find('info migrate') != -1:
-        self.iteration += 1
-        if self.iteration < 3:
-          res.stdout = (
-              'QEMU 2.5.0 monitor - type \'help\' for more information\n'
-              '(qemu) info migrate\n'
-              'capabilities: xbzrle: off rdma-pin-all: off auto-converge: on'
-              'zero-blocks: off compress: off events: off x-postcopy-ram: on \n'
-              'Migration status: active\n'
-              'skipped: 0 pages\n'
-              'dirty sync count: %i\n'
-              '(qemu) '
-            )
-        else:
-          res.stdout = (
-              'QEMU 2.5.0 monitor - type \'help\' for more information\n'
-              '(qemu) info migrate\n'
-              'capabilities: xbzrle: off rdma-pin-all: off auto-converge: on'
-              'zero-blocks: off compress: off events: off x-postcopy-ram: on \n'
-              'Migration status: failed\n'
-              'skipped: 0 pages\n'
-              'dirty sync count: %i\n'
-              '(qemu) '
-            )
-      return res
-    self._TestPostcopyAfterPrecopy(runcmd_migration_failed, False)
-
-  def testAlreadyInPostcopy(self):
-    def runcmd_already_in_postcopy(cmd, res):
-      res = utils.RunResult(0, None, '', '', cmd, None, None)
-      if cmd.find('info migrate') != -1:
-        res.stdout = (
-            'QEMU 2.5.0 monitor - type \'help\' for more information\n'
-            '(qemu) info migrate\n'
-            'capabilities: xbzrle: off rdma-pin-all: off auto-converge: on'
-            'zero-blocks: off compress: off events: off x-postcopy-ram: on \n'
-            'Migration status: postcopy-active\n'
-            'skipped: 0 pages\n'
-            'dirty sync count: %i\n'
-            '(qemu) '
-          )
-      return res
-    self._TestPostcopyAfterPrecopy(runcmd_already_in_postcopy, False)
-
 if __name__ == "__main__":
   testutils.GanetiTestProgram()