cmdlib: Extract migration-related functionality
Author:     Thomas Thrainer <thomasth@google.com>
AuthorDate: Tue, 14 May 2013 11:38:23 +0000 (13:38 +0200)
Commit:     Thomas Thrainer <thomasth@google.com>
CommitDate: Fri, 17 May 2013 09:32:11 +0000 (11:32 +0200)
Split instance.py further by extracting migration-related logical units
and functions into instance_migration.py.
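
The moved logical units keep their names, so only import paths change.
As a rough sketch (mirroring the hunks below; the helpers shared with
instance.py now live in instance_utils.py):

  from ganeti.cmdlib.instance_migration import LUInstanceFailover, \
    LUInstanceMigrate
  from ganeti.cmdlib.instance_utils import _CheckNodeFreeMemory, \
    _CheckInstanceBridgesExist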

Signed-off-by: Thomas Thrainer <thomasth@google.com>
Reviewed-by: Bernardo Dal Seno <bdalseno@google.com>

Makefile.am
lib/cmdlib/__init__.py
lib/cmdlib/instance.py
lib/cmdlib/instance_migration.py [new file with mode: 0644]
lib/cmdlib/instance_utils.py

diff --git a/Makefile.am b/Makefile.am
index 64306df..9e0a6f4 100644
@@ -316,6 +316,7 @@ cmdlib_PYTHON = \
        lib/cmdlib/node.py \
        lib/cmdlib/instance.py \
        lib/cmdlib/instance_storage.py \
+       lib/cmdlib/instance_migration.py \
        lib/cmdlib/instance_utils.py \
        lib/cmdlib/backup.py \
        lib/cmdlib/query.py \
diff --git a/lib/cmdlib/__init__.py b/lib/cmdlib/__init__.py
index b248063..50353cc 100644
@@ -79,8 +79,6 @@ from ganeti.cmdlib.instance import \
   LUInstanceReinstall, \
   LUInstanceReboot, \
   LUInstanceConsole, \
-  LUInstanceFailover, \
-  LUInstanceMigrate, \
   LUInstanceMultiAlloc, \
   LUInstanceSetParams, \
   LUInstanceChangeGroup
@@ -90,6 +88,9 @@ from ganeti.cmdlib.instance_storage import \
   LUInstanceReplaceDisks, \
   LUInstanceActivateDisks, \
   LUInstanceDeactivateDisks
+from ganeti.cmdlib.instance_migration import \
+  LUInstanceFailover, \
+  LUInstanceMigrate
 from ganeti.cmdlib.backup import \
   LUBackupQuery, \
   LUBackupPrepare, \
diff --git a/lib/cmdlib/instance.py b/lib/cmdlib/instance.py
index 9edf524..6f51c50 100644
@@ -27,7 +27,6 @@ import itertools
 import logging
 import operator
 import os
-import time
 
 from ganeti import compat
 from ganeti import constants
@@ -47,7 +46,7 @@ from ganeti import utils
 from ganeti import query
 
 from ganeti.cmdlib.base import NoHooksLU, LogicalUnit, _QueryBase, \
-  ResultWithJobs, Tasklet
+  ResultWithJobs
 
 from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
   INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, _CheckNodeOnline, \
@@ -58,16 +57,17 @@ from ganeti.cmdlib.common import INSTANCE_ONLINE, INSTANCE_DOWN, \
   _GetUpdatedParams, _ExpandInstanceName, _ComputeIPolicySpecViolation, \
   _CheckInstanceState, _ExpandNodeName
 from ganeti.cmdlib.instance_storage import _CreateDisks, \
-  _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, _CheckDiskConsistency, \
+  _CheckNodesFreeDiskPerVG, _WipeDisks, _WaitForSync, \
   _IsExclusiveStorageEnabledNodeName, _CreateSingleBlockDev, _ComputeDisks, \
   _CheckRADOSFreeSpace, _ComputeDiskSizePerVG, _GenerateDiskTemplate, \
   _CreateBlockDev, _StartInstanceDisks, _ShutdownInstanceDisks, \
-  _AssembleInstanceDisks, _ExpandCheckDisks
+  _AssembleInstanceDisks
 from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
   _GetClusterDomainSecret, _BuildInstanceHookEnv, _NICListToTuple, \
   _NICToTuple, _CheckNodeNotDrained, _RemoveInstance, _CopyLockList, \
   _ReleaseLocks, _CheckNodeVmCapable, _CheckTargetNodeIPolicy, \
-  _GetInstanceInfoText, _RemoveDisks
+  _GetInstanceInfoText, _RemoveDisks, _CheckNodeFreeMemory, \
+  _CheckInstanceBridgesExist, _CheckNicsBridgesExist
 
 import ganeti.masterd.instance
 
@@ -338,62 +338,6 @@ def _CheckNodeHasOS(lu, node, os_name, force_variant):
     _CheckOSVariant(result.payload, os_name)
 
 
-def _CheckNicsBridgesExist(lu, target_nics, target_node):
-  """Check that the brigdes needed by a list of nics exist.
-
-  """
-  cluster = lu.cfg.GetClusterInfo()
-  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
-  brlist = [params[constants.NIC_LINK] for params in paramslist
-            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
-  if brlist:
-    result = lu.rpc.call_bridges_exist(target_node, brlist)
-    result.Raise("Error checking bridges on destination node '%s'" %
-                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
-
-
-def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
-  """Checks if a node has enough free memory.
-
-  This function checks if a given node has the needed amount of free
-  memory. In case the node has less memory or we cannot get the
-  information from the node, this function raises an OpPrereqError
-  exception.
-
-  @type lu: C{LogicalUnit}
-  @param lu: a logical unit from which we get configuration data
-  @type node: C{str}
-  @param node: the node to check
-  @type reason: C{str}
-  @param reason: string to use in the error message
-  @type requested: C{int}
-  @param requested: the amount of memory in MiB to check for
-  @type hypervisor_name: C{str}
-  @param hypervisor_name: the hypervisor to ask for memory stats
-  @rtype: integer
-  @return: node current free memory
-  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
-      we cannot check the node
-
-  """
-  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
-  nodeinfo[node].Raise("Can't get data from node %s" % node,
-                       prereq=True, ecode=errors.ECODE_ENVIRON)
-  (_, _, (hv_info, )) = nodeinfo[node].payload
-
-  free_mem = hv_info.get("memory_free", None)
-  if not isinstance(free_mem, int):
-    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
-                               " was '%s'" % (node, free_mem),
-                               errors.ECODE_ENVIRON)
-  if requested > free_mem:
-    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
-                               " needed %s MiB, available %s MiB" %
-                               (node, reason, requested, free_mem),
-                               errors.ECODE_NORES)
-  return free_mem
-
-
 class LUInstanceCreate(LogicalUnit):
   """Create an instance.
 
@@ -1680,15 +1624,6 @@ class LUInstanceRemove(LogicalUnit):
     _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
 
 
-def _CheckInstanceBridgesExist(lu, instance, node=None):
-  """Check that the brigdes needed by an instance exist.
-
-  """
-  if node is None:
-    node = instance.primary_node
-  _CheckNicsBridgesExist(lu, instance.nics, node)
-
-
 class LUInstanceMove(LogicalUnit):
   """Move an instance by data-copying.
 
@@ -2743,189 +2678,6 @@ class LUInstanceConsole(NoHooksLU):
     return _GetInstanceConsole(self.cfg.GetClusterInfo(), instance)
 
 
-def _DeclareLocksForMigration(lu, level):
-  """Declares locks for L{TLMigrateInstance}.
-
-  @type lu: L{LogicalUnit}
-  @param level: Lock level
-
-  """
-  if level == locking.LEVEL_NODE_ALLOC:
-    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
-
-    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
-
-    # Node locks are already declared here rather than at LEVEL_NODE as we need
-    # the instance object anyway to declare the node allocation lock.
-    if instance.disk_template in constants.DTS_EXT_MIRROR:
-      if lu.op.target_node is None:
-        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
-        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
-      else:
-        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
-                                               lu.op.target_node]
-      del lu.recalculate_locks[locking.LEVEL_NODE]
-    else:
-      lu._LockInstancesNodes() # pylint: disable=W0212
-
-  elif level == locking.LEVEL_NODE:
-    # Node locks are declared together with the node allocation lock
-    assert (lu.needed_locks[locking.LEVEL_NODE] or
-            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
-
-  elif level == locking.LEVEL_NODE_RES:
-    # Copy node locks
-    lu.needed_locks[locking.LEVEL_NODE_RES] = \
-      _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
-
-
-def _ExpandNamesForMigration(lu):
-  """Expands names for use with L{TLMigrateInstance}.
-
-  @type lu: L{LogicalUnit}
-
-  """
-  if lu.op.target_node is not None:
-    lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
-
-  lu.needed_locks[locking.LEVEL_NODE] = []
-  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
-
-  lu.needed_locks[locking.LEVEL_NODE_RES] = []
-  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
-
-  # The node allocation lock is actually only needed for externally replicated
-  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
-  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
-
-
-class LUInstanceFailover(LogicalUnit):
-  """Failover an instance.
-
-  """
-  HPATH = "instance-failover"
-  HTYPE = constants.HTYPE_INSTANCE
-  REQ_BGL = False
-
-  def CheckArguments(self):
-    """Check the arguments.
-
-    """
-    self.iallocator = getattr(self.op, "iallocator", None)
-    self.target_node = getattr(self.op, "target_node", None)
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-    _ExpandNamesForMigration(self)
-
-    self._migrater = \
-      TLMigrateInstance(self, self.op.instance_name, False, True, False,
-                        self.op.ignore_consistency, True,
-                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
-
-    self.tasklets = [self._migrater]
-
-  def DeclareLocks(self, level):
-    _DeclareLocksForMigration(self, level)
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This runs on master, primary and secondary nodes of the instance.
-
-    """
-    instance = self._migrater.instance
-    source_node = instance.primary_node
-    target_node = self.op.target_node
-    env = {
-      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
-      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
-      "OLD_PRIMARY": source_node,
-      "NEW_PRIMARY": target_node,
-      }
-
-    if instance.disk_template in constants.DTS_INT_MIRROR:
-      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
-      env["NEW_SECONDARY"] = source_node
-    else:
-      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
-
-    env.update(_BuildInstanceHookEnvByObject(self, instance))
-
-    return env
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    instance = self._migrater.instance
-    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
-    return (nl, nl + [instance.primary_node])
-
-
-class LUInstanceMigrate(LogicalUnit):
-  """Migrate an instance.
-
-  This is migration without shutting down, compared to the failover,
-  which is done with shutdown.
-
-  """
-  HPATH = "instance-migrate"
-  HTYPE = constants.HTYPE_INSTANCE
-  REQ_BGL = False
-
-  def ExpandNames(self):
-    self._ExpandAndLockInstance()
-    _ExpandNamesForMigration(self)
-
-    self._migrater = \
-      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
-                        False, self.op.allow_failover, False,
-                        self.op.allow_runtime_changes,
-                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
-                        self.op.ignore_ipolicy)
-
-    self.tasklets = [self._migrater]
-
-  def DeclareLocks(self, level):
-    _DeclareLocksForMigration(self, level)
-
-  def BuildHooksEnv(self):
-    """Build hooks env.
-
-    This runs on master, primary and secondary nodes of the instance.
-
-    """
-    instance = self._migrater.instance
-    source_node = instance.primary_node
-    target_node = self.op.target_node
-    env = _BuildInstanceHookEnvByObject(self, instance)
-    env.update({
-      "MIGRATE_LIVE": self._migrater.live,
-      "MIGRATE_CLEANUP": self.op.cleanup,
-      "OLD_PRIMARY": source_node,
-      "NEW_PRIMARY": target_node,
-      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
-      })
-
-    if instance.disk_template in constants.DTS_INT_MIRROR:
-      env["OLD_SECONDARY"] = target_node
-      env["NEW_SECONDARY"] = source_node
-    else:
-      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
-
-    return env
-
-  def BuildHooksNodes(self):
-    """Build hooks nodes.
-
-    """
-    instance = self._migrater.instance
-    snodes = list(instance.secondary_nodes)
-    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
-    return (nl, nl)
-
-
 class LUInstanceMultiAlloc(NoHooksLU):
   """Allocates multiple instances at the same time.
 
@@ -4592,707 +4344,3 @@ class LUInstanceChangeGroup(LogicalUnit):
                  " instance '%s'", len(jobs), self.op.instance_name)
 
     return ResultWithJobs(jobs)
-
-
-class TLMigrateInstance(Tasklet):
-  """Tasklet class for instance migration.
-
-  @type live: boolean
-  @ivar live: whether the migration will be done live or non-live;
-      this variable is initalized only after CheckPrereq has run
-  @type cleanup: boolean
-  @ivar cleanup: Wheater we cleanup from a failed migration
-  @type iallocator: string
-  @ivar iallocator: The iallocator used to determine target_node
-  @type target_node: string
-  @ivar target_node: If given, the target_node to reallocate the instance to
-  @type failover: boolean
-  @ivar failover: Whether operation results in failover or migration
-  @type fallback: boolean
-  @ivar fallback: Whether fallback to failover is allowed if migration not
-                  possible
-  @type ignore_consistency: boolean
-  @ivar ignore_consistency: Wheter we should ignore consistency between source
-                            and target node
-  @type shutdown_timeout: int
-  @ivar shutdown_timeout: In case of failover timeout of the shutdown
-  @type ignore_ipolicy: bool
-  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
-
-  """
-
-  # Constants
-  _MIGRATION_POLL_INTERVAL = 1      # seconds
-  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
-
-  def __init__(self, lu, instance_name, cleanup, failover, fallback,
-               ignore_consistency, allow_runtime_changes, shutdown_timeout,
-               ignore_ipolicy):
-    """Initializes this class.
-
-    """
-    Tasklet.__init__(self, lu)
-
-    # Parameters
-    self.instance_name = instance_name
-    self.cleanup = cleanup
-    self.live = False # will be overridden later
-    self.failover = failover
-    self.fallback = fallback
-    self.ignore_consistency = ignore_consistency
-    self.shutdown_timeout = shutdown_timeout
-    self.ignore_ipolicy = ignore_ipolicy
-    self.allow_runtime_changes = allow_runtime_changes
-
-  def CheckPrereq(self):
-    """Check prerequisites.
-
-    This checks that the instance is in the cluster.
-
-    """
-    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
-    instance = self.cfg.GetInstanceInfo(instance_name)
-    assert instance is not None
-    self.instance = instance
-    cluster = self.cfg.GetClusterInfo()
-
-    if (not self.cleanup and
-        not instance.admin_state == constants.ADMINST_UP and
-        not self.failover and self.fallback):
-      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
-                      " switching to failover")
-      self.failover = True
-
-    if instance.disk_template not in constants.DTS_MIRRORED:
-      if self.failover:
-        text = "failovers"
-      else:
-        text = "migrations"
-      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
-                                 " %s" % (instance.disk_template, text),
-                                 errors.ECODE_STATE)
-
-    if instance.disk_template in constants.DTS_EXT_MIRROR:
-      _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
-
-      if self.lu.op.iallocator:
-        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
-        self._RunAllocator()
-      else:
-        # We set set self.target_node as it is required by
-        # BuildHooksEnv
-        self.target_node = self.lu.op.target_node
-
-      # Check that the target node is correct in terms of instance policy
-      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
-      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
-      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
-                                                              group_info)
-      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
-                              ignore=self.ignore_ipolicy)
-
-      # self.target_node is already populated, either directly or by the
-      # iallocator run
-      target_node = self.target_node
-      if self.target_node == instance.primary_node:
-        raise errors.OpPrereqError("Cannot migrate instance %s"
-                                   " to its primary (%s)" %
-                                   (instance.name, instance.primary_node),
-                                   errors.ECODE_STATE)
-
-      if len(self.lu.tasklets) == 1:
-        # It is safe to release locks only when we're the only tasklet
-        # in the LU
-        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
-                      keep=[instance.primary_node, self.target_node])
-        _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
-
-    else:
-      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
-
-      secondary_nodes = instance.secondary_nodes
-      if not secondary_nodes:
-        raise errors.ConfigurationError("No secondary node but using"
-                                        " %s disk template" %
-                                        instance.disk_template)
-      target_node = secondary_nodes[0]
-      if self.lu.op.iallocator or (self.lu.op.target_node and
-                                   self.lu.op.target_node != target_node):
-        if self.failover:
-          text = "failed over"
-        else:
-          text = "migrated"
-        raise errors.OpPrereqError("Instances with disk template %s cannot"
-                                   " be %s to arbitrary nodes"
-                                   " (neither an iallocator nor a target"
-                                   " node can be passed)" %
-                                   (instance.disk_template, text),
-                                   errors.ECODE_INVAL)
-      nodeinfo = self.cfg.GetNodeInfo(target_node)
-      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
-      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
-                                                              group_info)
-      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
-                              ignore=self.ignore_ipolicy)
-
-    i_be = cluster.FillBE(instance)
-
-    # check memory requirements on the secondary node
-    if (not self.cleanup and
-         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
-      self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
-                                               "migrating instance %s" %
-                                               instance.name,
-                                               i_be[constants.BE_MINMEM],
-                                               instance.hypervisor)
-    else:
-      self.lu.LogInfo("Not checking memory on the secondary node as"
-                      " instance will not be started")
-
-    # check if failover must be forced instead of migration
-    if (not self.cleanup and not self.failover and
-        i_be[constants.BE_ALWAYS_FAILOVER]):
-      self.lu.LogInfo("Instance configured to always failover; fallback"
-                      " to failover")
-      self.failover = True
-
-    # check bridge existance
-    _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
-
-    if not self.cleanup:
-      _CheckNodeNotDrained(self.lu, target_node)
-      if not self.failover:
-        result = self.rpc.call_instance_migratable(instance.primary_node,
-                                                   instance)
-        if result.fail_msg and self.fallback:
-          self.lu.LogInfo("Can't migrate, instance offline, fallback to"
-                          " failover")
-          self.failover = True
-        else:
-          result.Raise("Can't migrate, please use failover",
-                       prereq=True, ecode=errors.ECODE_STATE)
-
-    assert not (self.failover and self.cleanup)
-
-    if not self.failover:
-      if self.lu.op.live is not None and self.lu.op.mode is not None:
-        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
-                                   " parameters are accepted",
-                                   errors.ECODE_INVAL)
-      if self.lu.op.live is not None:
-        if self.lu.op.live:
-          self.lu.op.mode = constants.HT_MIGRATION_LIVE
-        else:
-          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
-        # reset the 'live' parameter to None so that repeated
-        # invocations of CheckPrereq do not raise an exception
-        self.lu.op.live = None
-      elif self.lu.op.mode is None:
-        # read the default value from the hypervisor
-        i_hv = cluster.FillHV(self.instance, skip_globals=False)
-        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
-
-      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
-    else:
-      # Failover is never live
-      self.live = False
-
-    if not (self.failover or self.cleanup):
-      remote_info = self.rpc.call_instance_info(instance.primary_node,
-                                                instance.name,
-                                                instance.hypervisor)
-      remote_info.Raise("Error checking instance on node %s" %
-                        instance.primary_node)
-      instance_running = bool(remote_info.payload)
-      if instance_running:
-        self.current_mem = int(remote_info.payload["memory"])
-
-  def _RunAllocator(self):
-    """Run the allocator based on input opcode.
-
-    """
-    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
-
-    # FIXME: add a self.ignore_ipolicy option
-    req = iallocator.IAReqRelocate(name=self.instance_name,
-                                   relocate_from=[self.instance.primary_node])
-    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
-
-    ial.Run(self.lu.op.iallocator)
-
-    if not ial.success:
-      raise errors.OpPrereqError("Can't compute nodes using"
-                                 " iallocator '%s': %s" %
-                                 (self.lu.op.iallocator, ial.info),
-                                 errors.ECODE_NORES)
-    self.target_node = ial.result[0]
-    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
-                    self.instance_name, self.lu.op.iallocator,
-                    utils.CommaJoin(ial.result))
-
-  def _WaitUntilSync(self):
-    """Poll with custom rpc for disk sync.
-
-    This uses our own step-based rpc call.
-
-    """
-    self.feedback_fn("* wait until resync is done")
-    all_done = False
-    while not all_done:
-      all_done = True
-      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
-                                            self.nodes_ip,
-                                            (self.instance.disks,
-                                             self.instance))
-      min_percent = 100
-      for node, nres in result.items():
-        nres.Raise("Cannot resync disks on node %s" % node)
-        node_done, node_percent = nres.payload
-        all_done = all_done and node_done
-        if node_percent is not None:
-          min_percent = min(min_percent, node_percent)
-      if not all_done:
-        if min_percent < 100:
-          self.feedback_fn("   - progress: %.1f%%" % min_percent)
-        time.sleep(2)
-
-  def _EnsureSecondary(self, node):
-    """Demote a node to secondary.
-
-    """
-    self.feedback_fn("* switching node %s to secondary mode" % node)
-
-    for dev in self.instance.disks:
-      self.cfg.SetDiskID(dev, node)
-
-    result = self.rpc.call_blockdev_close(node, self.instance.name,
-                                          self.instance.disks)
-    result.Raise("Cannot change disk to secondary on node %s" % node)
-
-  def _GoStandalone(self):
-    """Disconnect from the network.
-
-    """
-    self.feedback_fn("* changing into standalone mode")
-    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
-                                               self.instance.disks)
-    for node, nres in result.items():
-      nres.Raise("Cannot disconnect disks node %s" % node)
-
-  def _GoReconnect(self, multimaster):
-    """Reconnect to the network.
-
-    """
-    if multimaster:
-      msg = "dual-master"
-    else:
-      msg = "single-master"
-    self.feedback_fn("* changing disks into %s mode" % msg)
-    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
-                                           (self.instance.disks, self.instance),
-                                           self.instance.name, multimaster)
-    for node, nres in result.items():
-      nres.Raise("Cannot change disks config on node %s" % node)
-
-  def _ExecCleanup(self):
-    """Try to cleanup after a failed migration.
-
-    The cleanup is done by:
-      - check that the instance is running only on one node
-        (and update the config if needed)
-      - change disks on its secondary node to secondary
-      - wait until disks are fully synchronized
-      - disconnect from the network
-      - change disks into single-master mode
-      - wait again until disks are fully synchronized
-
-    """
-    instance = self.instance
-    target_node = self.target_node
-    source_node = self.source_node
-
-    # check running on only one node
-    self.feedback_fn("* checking where the instance actually runs"
-                     " (if this hangs, the hypervisor might be in"
-                     " a bad state)")
-    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
-    for node, result in ins_l.items():
-      result.Raise("Can't contact node %s" % node)
-
-    runningon_source = instance.name in ins_l[source_node].payload
-    runningon_target = instance.name in ins_l[target_node].payload
-
-    if runningon_source and runningon_target:
-      raise errors.OpExecError("Instance seems to be running on two nodes,"
-                               " or the hypervisor is confused; you will have"
-                               " to ensure manually that it runs only on one"
-                               " and restart this operation")
-
-    if not (runningon_source or runningon_target):
-      raise errors.OpExecError("Instance does not seem to be running at all;"
-                               " in this case it's safer to repair by"
-                               " running 'gnt-instance stop' to ensure disk"
-                               " shutdown, and then restarting it")
-
-    if runningon_target:
-      # the migration has actually succeeded, we need to update the config
-      self.feedback_fn("* instance running on secondary node (%s),"
-                       " updating config" % target_node)
-      instance.primary_node = target_node
-      self.cfg.Update(instance, self.feedback_fn)
-      demoted_node = source_node
-    else:
-      self.feedback_fn("* instance confirmed to be running on its"
-                       " primary node (%s)" % source_node)
-      demoted_node = target_node
-
-    if instance.disk_template in constants.DTS_INT_MIRROR:
-      self._EnsureSecondary(demoted_node)
-      try:
-        self._WaitUntilSync()
-      except errors.OpExecError:
-        # we ignore here errors, since if the device is standalone, it
-        # won't be able to sync
-        pass
-      self._GoStandalone()
-      self._GoReconnect(False)
-      self._WaitUntilSync()
-
-    self.feedback_fn("* done")
-
-  def _RevertDiskStatus(self):
-    """Try to revert the disk status after a failed migration.
-
-    """
-    target_node = self.target_node
-    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
-      return
-
-    try:
-      self._EnsureSecondary(target_node)
-      self._GoStandalone()
-      self._GoReconnect(False)
-      self._WaitUntilSync()
-    except errors.OpExecError, err:
-      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
-                         " please try to recover the instance manually;"
-                         " error '%s'" % str(err))
-
-  def _AbortMigration(self):
-    """Call the hypervisor code to abort a started migration.
-
-    """
-    instance = self.instance
-    target_node = self.target_node
-    source_node = self.source_node
-    migration_info = self.migration_info
-
-    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
-                                                                 instance,
-                                                                 migration_info,
-                                                                 False)
-    abort_msg = abort_result.fail_msg
-    if abort_msg:
-      logging.error("Aborting migration failed on target node %s: %s",
-                    target_node, abort_msg)
-      # Don't raise an exception here, as we stil have to try to revert the
-      # disk status, even if this step failed.
-
-    abort_result = self.rpc.call_instance_finalize_migration_src(
-      source_node, instance, False, self.live)
-    abort_msg = abort_result.fail_msg
-    if abort_msg:
-      logging.error("Aborting migration failed on source node %s: %s",
-                    source_node, abort_msg)
-
-  def _ExecMigration(self):
-    """Migrate an instance.
-
-    The migrate is done by:
-      - change the disks into dual-master mode
-      - wait until disks are fully synchronized again
-      - migrate the instance
-      - change disks on the new secondary node (the old primary) to secondary
-      - wait until disks are fully synchronized
-      - change disks into single-master mode
-
-    """
-    instance = self.instance
-    target_node = self.target_node
-    source_node = self.source_node
-
-    # Check for hypervisor version mismatch and warn the user.
-    nodeinfo = self.rpc.call_node_info([source_node, target_node],
-                                       None, [self.instance.hypervisor], False)
-    for ninfo in nodeinfo.values():
-      ninfo.Raise("Unable to retrieve node information from node '%s'" %
-                  ninfo.node)
-    (_, _, (src_info, )) = nodeinfo[source_node].payload
-    (_, _, (dst_info, )) = nodeinfo[target_node].payload
-
-    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
-        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
-      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
-      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
-      if src_version != dst_version:
-        self.feedback_fn("* warning: hypervisor version mismatch between"
-                         " source (%s) and target (%s) node" %
-                         (src_version, dst_version))
-
-    self.feedback_fn("* checking disk consistency between source and target")
-    for (idx, dev) in enumerate(instance.disks):
-      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
-        raise errors.OpExecError("Disk %s is degraded or not fully"
-                                 " synchronized on target node,"
-                                 " aborting migration" % idx)
-
-    if self.current_mem > self.tgt_free_mem:
-      if not self.allow_runtime_changes:
-        raise errors.OpExecError("Memory ballooning not allowed and not enough"
-                                 " free memory to fit instance %s on target"
-                                 " node %s (have %dMB, need %dMB)" %
-                                 (instance.name, target_node,
-                                  self.tgt_free_mem, self.current_mem))
-      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
-      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
-                                                     instance,
-                                                     self.tgt_free_mem)
-      rpcres.Raise("Cannot modify instance runtime memory")
-
-    # First get the migration information from the remote node
-    result = self.rpc.call_migration_info(source_node, instance)
-    msg = result.fail_msg
-    if msg:
-      log_err = ("Failed fetching source migration information from %s: %s" %
-                 (source_node, msg))
-      logging.error(log_err)
-      raise errors.OpExecError(log_err)
-
-    self.migration_info = migration_info = result.payload
-
-    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
-      # Then switch the disks to master/master mode
-      self._EnsureSecondary(target_node)
-      self._GoStandalone()
-      self._GoReconnect(True)
-      self._WaitUntilSync()
-
-    self.feedback_fn("* preparing %s to accept the instance" % target_node)
-    result = self.rpc.call_accept_instance(target_node,
-                                           instance,
-                                           migration_info,
-                                           self.nodes_ip[target_node])
-
-    msg = result.fail_msg
-    if msg:
-      logging.error("Instance pre-migration failed, trying to revert"
-                    " disk status: %s", msg)
-      self.feedback_fn("Pre-migration failed, aborting")
-      self._AbortMigration()
-      self._RevertDiskStatus()
-      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
-                               (instance.name, msg))
-
-    self.feedback_fn("* migrating instance to %s" % target_node)
-    result = self.rpc.call_instance_migrate(source_node, instance,
-                                            self.nodes_ip[target_node],
-                                            self.live)
-    msg = result.fail_msg
-    if msg:
-      logging.error("Instance migration failed, trying to revert"
-                    " disk status: %s", msg)
-      self.feedback_fn("Migration failed, aborting")
-      self._AbortMigration()
-      self._RevertDiskStatus()
-      raise errors.OpExecError("Could not migrate instance %s: %s" %
-                               (instance.name, msg))
-
-    self.feedback_fn("* starting memory transfer")
-    last_feedback = time.time()
-    while True:
-      result = self.rpc.call_instance_get_migration_status(source_node,
-                                                           instance)
-      msg = result.fail_msg
-      ms = result.payload   # MigrationStatus instance
-      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
-        logging.error("Instance migration failed, trying to revert"
-                      " disk status: %s", msg)
-        self.feedback_fn("Migration failed, aborting")
-        self._AbortMigration()
-        self._RevertDiskStatus()
-        if not msg:
-          msg = "hypervisor returned failure"
-        raise errors.OpExecError("Could not migrate instance %s: %s" %
-                                 (instance.name, msg))
-
-      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
-        self.feedback_fn("* memory transfer complete")
-        break
-
-      if (utils.TimeoutExpired(last_feedback,
-                               self._MIGRATION_FEEDBACK_INTERVAL) and
-          ms.transferred_ram is not None):
-        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
-        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
-        last_feedback = time.time()
-
-      time.sleep(self._MIGRATION_POLL_INTERVAL)
-
-    result = self.rpc.call_instance_finalize_migration_src(source_node,
-                                                           instance,
-                                                           True,
-                                                           self.live)
-    msg = result.fail_msg
-    if msg:
-      logging.error("Instance migration succeeded, but finalization failed"
-                    " on the source node: %s", msg)
-      raise errors.OpExecError("Could not finalize instance migration: %s" %
-                               msg)
-
-    instance.primary_node = target_node
-
-    # distribute new instance config to the other nodes
-    self.cfg.Update(instance, self.feedback_fn)
-
-    result = self.rpc.call_instance_finalize_migration_dst(target_node,
-                                                           instance,
-                                                           migration_info,
-                                                           True)
-    msg = result.fail_msg
-    if msg:
-      logging.error("Instance migration succeeded, but finalization failed"
-                    " on the target node: %s", msg)
-      raise errors.OpExecError("Could not finalize instance migration: %s" %
-                               msg)
-
-    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
-      self._EnsureSecondary(source_node)
-      self._WaitUntilSync()
-      self._GoStandalone()
-      self._GoReconnect(False)
-      self._WaitUntilSync()
-
-    # If the instance's disk template is `rbd' or `ext' and there was a
-    # successful migration, unmap the device from the source node.
-    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
-      disks = _ExpandCheckDisks(instance, instance.disks)
-      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
-      for disk in disks:
-        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
-        msg = result.fail_msg
-        if msg:
-          logging.error("Migration was successful, but couldn't unmap the"
-                        " block device %s on source node %s: %s",
-                        disk.iv_name, source_node, msg)
-          logging.error("You need to unmap the device %s manually on %s",
-                        disk.iv_name, source_node)
-
-    self.feedback_fn("* done")
-
-  def _ExecFailover(self):
-    """Failover an instance.
-
-    The failover is done by shutting it down on its present node and
-    starting it on the secondary.
-
-    """
-    instance = self.instance
-    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
-
-    source_node = instance.primary_node
-    target_node = self.target_node
-
-    if instance.admin_state == constants.ADMINST_UP:
-      self.feedback_fn("* checking disk consistency between source and target")
-      for (idx, dev) in enumerate(instance.disks):
-        # for drbd, these are drbd over lvm
-        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
-                                     False):
-          if primary_node.offline:
-            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
-                             " target node %s" %
-                             (primary_node.name, idx, target_node))
-          elif not self.ignore_consistency:
-            raise errors.OpExecError("Disk %s is degraded on target node,"
-                                     " aborting failover" % idx)
-    else:
-      self.feedback_fn("* not checking disk consistency as instance is not"
-                       " running")
-
-    self.feedback_fn("* shutting down instance on source node")
-    logging.info("Shutting down instance %s on node %s",
-                 instance.name, source_node)
-
-    result = self.rpc.call_instance_shutdown(source_node, instance,
-                                             self.shutdown_timeout,
-                                             self.lu.op.reason)
-    msg = result.fail_msg
-    if msg:
-      if self.ignore_consistency or primary_node.offline:
-        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
-                           " proceeding anyway; please make sure node"
-                           " %s is down; error details: %s",
-                           instance.name, source_node, source_node, msg)
-      else:
-        raise errors.OpExecError("Could not shutdown instance %s on"
-                                 " node %s: %s" %
-                                 (instance.name, source_node, msg))
-
-    self.feedback_fn("* deactivating the instance's disks on source node")
-    if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
-      raise errors.OpExecError("Can't shut down the instance's disks")
-
-    instance.primary_node = target_node
-    # distribute new instance config to the other nodes
-    self.cfg.Update(instance, self.feedback_fn)
-
-    # Only start the instance if it's marked as up
-    if instance.admin_state == constants.ADMINST_UP:
-      self.feedback_fn("* activating the instance's disks on target node %s" %
-                       target_node)
-      logging.info("Starting instance %s on node %s",
-                   instance.name, target_node)
-
-      disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
-                                           ignore_secondaries=True)
-      if not disks_ok:
-        _ShutdownInstanceDisks(self.lu, instance)
-        raise errors.OpExecError("Can't activate the instance's disks")
-
-      self.feedback_fn("* starting the instance on the target node %s" %
-                       target_node)
-      result = self.rpc.call_instance_start(target_node, (instance, None, None),
-                                            False, self.lu.op.reason)
-      msg = result.fail_msg
-      if msg:
-        _ShutdownInstanceDisks(self.lu, instance)
-        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
-                                 (instance.name, target_node, msg))
-
-  def Exec(self, feedback_fn):
-    """Perform the migration.
-
-    """
-    self.feedback_fn = feedback_fn
-    self.source_node = self.instance.primary_node
-
-    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
-    if self.instance.disk_template in constants.DTS_INT_MIRROR:
-      self.target_node = self.instance.secondary_nodes[0]
-      # Otherwise self.target_node has been populated either
-      # directly, or through an iallocator.
-
-    self.all_nodes = [self.source_node, self.target_node]
-    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
-                         in self.cfg.GetMultiNodeInfo(self.all_nodes))
-
-    if self.failover:
-      feedback_fn("Failover instance %s" % self.instance.name)
-      self._ExecFailover()
-    else:
-      feedback_fn("Migrating instance %s" % self.instance.name)
-
-      if self.cleanup:
-        return self._ExecCleanup()
-      else:
-        return self._ExecMigration()
diff --git a/lib/cmdlib/instance_migration.py b/lib/cmdlib/instance_migration.py
new file mode 100644
index 0000000..d3360e8
--- /dev/null
@@ -0,0 +1,928 @@
+#
+#
+
+# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful, but
+# WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+# General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA.
+
+
+"""Logical units dealing with instance migration and failover."""
+
+import logging
+import time
+
+from ganeti import constants
+from ganeti import errors
+from ganeti import locking
+from ganeti.masterd import iallocator
+from ganeti import utils
+from ganeti.cmdlib.base import LogicalUnit, Tasklet
+from ganeti.cmdlib.common import _ExpandInstanceName, \
+  _CheckIAllocatorOrNode, _ExpandNodeName
+from ganeti.cmdlib.instance_storage import _CheckDiskConsistency, \
+  _ExpandCheckDisks, _ShutdownInstanceDisks, _AssembleInstanceDisks
+from ganeti.cmdlib.instance_utils import _BuildInstanceHookEnvByObject, \
+  _CheckTargetNodeIPolicy, _ReleaseLocks, _CheckNodeNotDrained, \
+  _CopyLockList, _CheckNodeFreeMemory, _CheckInstanceBridgesExist
+
+import ganeti.masterd.instance
+
+
+def _ExpandNamesForMigration(lu):
+  """Expands names for use with L{TLMigrateInstance}.
+
+  @type lu: L{LogicalUnit}
+
+  """
+  if lu.op.target_node is not None:
+    lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
+
+  lu.needed_locks[locking.LEVEL_NODE] = []
+  lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
+
+  lu.needed_locks[locking.LEVEL_NODE_RES] = []
+  lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
+
+  # The node allocation lock is actually only needed for externally replicated
+  # instances (e.g. sharedfile or RBD) and if an iallocator is used.
+  lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
+
+
+def _DeclareLocksForMigration(lu, level):
+  """Declares locks for L{TLMigrateInstance}.
+
+  @type lu: L{LogicalUnit}
+  @param level: Lock level
+
+  """
+  if level == locking.LEVEL_NODE_ALLOC:
+    assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
+
+    instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
+
+    # Node locks are already declared here rather than at LEVEL_NODE as we need
+    # the instance object anyway to declare the node allocation lock.
+    if instance.disk_template in constants.DTS_EXT_MIRROR:
+      if lu.op.target_node is None:
+        lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
+        lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
+      else:
+        lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
+                                               lu.op.target_node]
+      del lu.recalculate_locks[locking.LEVEL_NODE]
+    else:
+      lu._LockInstancesNodes() # pylint: disable=W0212
+
+  elif level == locking.LEVEL_NODE:
+    # Node locks are declared together with the node allocation lock
+    assert (lu.needed_locks[locking.LEVEL_NODE] or
+            lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
+
+  elif level == locking.LEVEL_NODE_RES:
+    # Copy node locks
+    lu.needed_locks[locking.LEVEL_NODE_RES] = \
+      _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
+
+
+class LUInstanceFailover(LogicalUnit):
+  """Failover an instance.
+
+  """
+  HPATH = "instance-failover"
+  HTYPE = constants.HTYPE_INSTANCE
+  REQ_BGL = False
+
+  def CheckArguments(self):
+    """Check the arguments.
+
+    """
+    self.iallocator = getattr(self.op, "iallocator", None)
+    self.target_node = getattr(self.op, "target_node", None)
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    _ExpandNamesForMigration(self)
+
+    self._migrater = \
+      TLMigrateInstance(self, self.op.instance_name, False, True, False,
+                        self.op.ignore_consistency, True,
+                        self.op.shutdown_timeout, self.op.ignore_ipolicy)
+
+    self.tasklets = [self._migrater]
+
+  def DeclareLocks(self, level):
+    _DeclareLocksForMigration(self, level)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    instance = self._migrater.instance
+    source_node = instance.primary_node
+    target_node = self.op.target_node
+    env = {
+      "IGNORE_CONSISTENCY": self.op.ignore_consistency,
+      "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
+      "OLD_PRIMARY": source_node,
+      "NEW_PRIMARY": target_node,
+      }
+
+    if instance.disk_template in constants.DTS_INT_MIRROR:
+      env["OLD_SECONDARY"] = instance.secondary_nodes[0]
+      env["NEW_SECONDARY"] = source_node
+    else:
+      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
+
+    env.update(_BuildInstanceHookEnvByObject(self, instance))
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    instance = self._migrater.instance
+    nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
+    return (nl, nl + [instance.primary_node])
+
+
+class LUInstanceMigrate(LogicalUnit):
+  """Migrate an instance.
+
+  This is migration without shutting down, compared to the failover,
+  which is done with shutdown.
+
+  """
+  HPATH = "instance-migrate"
+  HTYPE = constants.HTYPE_INSTANCE
+  REQ_BGL = False
+
+  def ExpandNames(self):
+    self._ExpandAndLockInstance()
+    _ExpandNamesForMigration(self)
+
+    self._migrater = \
+      TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
+                        False, self.op.allow_failover, False,
+                        self.op.allow_runtime_changes,
+                        constants.DEFAULT_SHUTDOWN_TIMEOUT,
+                        self.op.ignore_ipolicy)
+
+    self.tasklets = [self._migrater]
+
+  def DeclareLocks(self, level):
+    _DeclareLocksForMigration(self, level)
+
+  def BuildHooksEnv(self):
+    """Build hooks env.
+
+    This runs on master, primary and secondary nodes of the instance.
+
+    """
+    instance = self._migrater.instance
+    source_node = instance.primary_node
+    target_node = self.op.target_node
+    env = _BuildInstanceHookEnvByObject(self, instance)
+    env.update({
+      "MIGRATE_LIVE": self._migrater.live,
+      "MIGRATE_CLEANUP": self.op.cleanup,
+      "OLD_PRIMARY": source_node,
+      "NEW_PRIMARY": target_node,
+      "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
+      })
+
+    if instance.disk_template in constants.DTS_INT_MIRROR:
+      env["OLD_SECONDARY"] = target_node
+      env["NEW_SECONDARY"] = source_node
+    else:
+      env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
+
+    return env
+
+  def BuildHooksNodes(self):
+    """Build hooks nodes.
+
+    """
+    instance = self._migrater.instance
+    snodes = list(instance.secondary_nodes)
+    nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
+    return (nl, nl)
+
+
+class TLMigrateInstance(Tasklet):
+  """Tasklet class for instance migration.
+
+  @type live: boolean
+  @ivar live: whether the migration will be done live or non-live;
+      this variable is initialized only after CheckPrereq has run
+  @type cleanup: boolean
+  @ivar cleanup: Whether we clean up from a failed migration
+  @type iallocator: string
+  @ivar iallocator: The iallocator used to determine target_node
+  @type target_node: string
+  @ivar target_node: If given, the target_node to reallocate the instance to
+  @type failover: boolean
+  @ivar failover: Whether operation results in failover or migration
+  @type fallback: boolean
+  @ivar fallback: Whether fallback to failover is allowed if migration not
+                  possible
+  @type ignore_consistency: boolean
+  @ivar ignore_consistency: Whether we should ignore consistency between source
+                            and target node
+  @type shutdown_timeout: int
+  @ivar shutdown_timeout: In case of failover timeout of the shutdown
+  @type ignore_ipolicy: bool
+  @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
+
+  """
+
+  # Constants
+  _MIGRATION_POLL_INTERVAL = 1      # seconds
+  _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
+
+  def __init__(self, lu, instance_name, cleanup, failover, fallback,
+               ignore_consistency, allow_runtime_changes, shutdown_timeout,
+               ignore_ipolicy):
+    """Initializes this class.
+
+    """
+    Tasklet.__init__(self, lu)
+
+    # Parameters
+    self.instance_name = instance_name
+    self.cleanup = cleanup
+    self.live = False # will be overridden later
+    self.failover = failover
+    self.fallback = fallback
+    self.ignore_consistency = ignore_consistency
+    self.shutdown_timeout = shutdown_timeout
+    self.ignore_ipolicy = ignore_ipolicy
+    self.allow_runtime_changes = allow_runtime_changes
+
+  def CheckPrereq(self):
+    """Check prerequisites.
+
+    This checks that the instance is in the cluster.
+
+    """
+    instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
+    instance = self.cfg.GetInstanceInfo(instance_name)
+    assert instance is not None
+    self.instance = instance
+    cluster = self.cfg.GetClusterInfo()
+
+    if (not self.cleanup and
+        not instance.admin_state == constants.ADMINST_UP and
+        not self.failover and self.fallback):
+      self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
+                      " switching to failover")
+      self.failover = True
+
+    if instance.disk_template not in constants.DTS_MIRRORED:
+      if self.failover:
+        text = "failovers"
+      else:
+        text = "migrations"
+      raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
+                                 " %s" % (instance.disk_template, text),
+                                 errors.ECODE_STATE)
+
+    if instance.disk_template in constants.DTS_EXT_MIRROR:
+      _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
+
+      if self.lu.op.iallocator:
+        assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
+        self._RunAllocator()
+      else:
+        # We set self.target_node as it is required by
+        # BuildHooksEnv
+        self.target_node = self.lu.op.target_node
+
+      # Check that the target node is correct in terms of instance policy
+      nodeinfo = self.cfg.GetNodeInfo(self.target_node)
+      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
+      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+                                                              group_info)
+      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
+                              ignore=self.ignore_ipolicy)
+
+      # self.target_node is already populated, either directly or by the
+      # iallocator run
+      target_node = self.target_node
+      if self.target_node == instance.primary_node:
+        raise errors.OpPrereqError("Cannot migrate instance %s"
+                                   " to its primary (%s)" %
+                                   (instance.name, instance.primary_node),
+                                   errors.ECODE_STATE)
+
+      if len(self.lu.tasklets) == 1:
+        # It is safe to release locks only when we're the only tasklet
+        # in the LU
+        _ReleaseLocks(self.lu, locking.LEVEL_NODE,
+                      keep=[instance.primary_node, self.target_node])
+        _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
+
+    else:
+      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
+
+      secondary_nodes = instance.secondary_nodes
+      if not secondary_nodes:
+        raise errors.ConfigurationError("No secondary node but using"
+                                        " %s disk template" %
+                                        instance.disk_template)
+      target_node = secondary_nodes[0]
+      if self.lu.op.iallocator or (self.lu.op.target_node and
+                                   self.lu.op.target_node != target_node):
+        if self.failover:
+          text = "failed over"
+        else:
+          text = "migrated"
+        raise errors.OpPrereqError("Instances with disk template %s cannot"
+                                   " be %s to arbitrary nodes"
+                                   " (neither an iallocator nor a target"
+                                   " node can be passed)" %
+                                   (instance.disk_template, text),
+                                   errors.ECODE_INVAL)
+      nodeinfo = self.cfg.GetNodeInfo(target_node)
+      group_info = self.cfg.GetNodeGroup(nodeinfo.group)
+      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
+                                                              group_info)
+      _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
+                              ignore=self.ignore_ipolicy)
+
+    i_be = cluster.FillBE(instance)
+
+    # check memory requirements on the secondary node
+    if (not self.cleanup and
+         (not self.failover or instance.admin_state == constants.ADMINST_UP)):
+      self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
+                                               "migrating instance %s" %
+                                               instance.name,
+                                               i_be[constants.BE_MINMEM],
+                                               instance.hypervisor)
+    else:
+      self.lu.LogInfo("Not checking memory on the secondary node as"
+                      " instance will not be started")
+
+    # check if failover must be forced instead of migration
+    if (not self.cleanup and not self.failover and
+        i_be[constants.BE_ALWAYS_FAILOVER]):
+      self.lu.LogInfo("Instance is configured to always fail over; falling"
+                      " back to failover")
+      self.failover = True
+
+    # check bridge existence
+    _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
+
+    if not self.cleanup:
+      _CheckNodeNotDrained(self.lu, target_node)
+      if not self.failover:
+        result = self.rpc.call_instance_migratable(instance.primary_node,
+                                                   instance)
+        if result.fail_msg and self.fallback:
+          self.lu.LogInfo("Can't migrate, instance offline, falling back"
+                          " to failover")
+          self.failover = True
+        else:
+          result.Raise("Can't migrate, please use failover",
+                       prereq=True, ecode=errors.ECODE_STATE)
+
+    assert not (self.failover and self.cleanup)
+
+    if not self.failover:
+      if self.lu.op.live is not None and self.lu.op.mode is not None:
+        raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
+                                   " parameters is accepted",
+                                   errors.ECODE_INVAL)
+      if self.lu.op.live is not None:
+        if self.lu.op.live:
+          self.lu.op.mode = constants.HT_MIGRATION_LIVE
+        else:
+          self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
+        # reset the 'live' parameter to None so that repeated
+        # invocations of CheckPrereq do not raise an exception
+        self.lu.op.live = None
+      elif self.lu.op.mode is None:
+        # read the default value from the hypervisor
+        i_hv = cluster.FillHV(self.instance, skip_globals=False)
+        self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
+
+      self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
+    else:
+      # Failover is never live
+      self.live = False
+
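+    # Remember the instance's current memory usage so that Exec can balloon
+    # it down if the target node is short on free memory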
+    if not (self.failover or self.cleanup):
+      remote_info = self.rpc.call_instance_info(instance.primary_node,
+                                                instance.name,
+                                                instance.hypervisor)
+      remote_info.Raise("Error checking instance on node %s" %
+                        instance.primary_node)
+      instance_running = bool(remote_info.payload)
+      if instance_running:
+        self.current_mem = int(remote_info.payload["memory"])
+
+  def _RunAllocator(self):
+    """Run the allocator based on input opcode.
+
+    """
+    assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
+
+    # FIXME: add a self.ignore_ipolicy option
+    req = iallocator.IAReqRelocate(name=self.instance_name,
+                                   relocate_from=[self.instance.primary_node])
+    ial = iallocator.IAllocator(self.cfg, self.rpc, req)
+
+    ial.Run(self.lu.op.iallocator)
+
+    if not ial.success:
+      raise errors.OpPrereqError("Can't compute nodes using"
+                                 " iallocator '%s': %s" %
+                                 (self.lu.op.iallocator, ial.info),
+                                 errors.ECODE_NORES)
+    self.target_node = ial.result[0]
+    self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
+                    self.instance_name, self.lu.op.iallocator,
+                    utils.CommaJoin(ial.result))
+
+  def _WaitUntilSync(self):
+    """Poll with custom rpc for disk sync.
+
+    This uses our own step-based rpc call.
+
+    """
+    self.feedback_fn("* wait until resync is done")
+    all_done = False
+    while not all_done:
+      all_done = True
+      result = self.rpc.call_drbd_wait_sync(self.all_nodes,
+                                            self.nodes_ip,
+                                            (self.instance.disks,
+                                             self.instance))
+      min_percent = 100
+      for node, nres in result.items():
+        nres.Raise("Cannot resync disks on node %s" % node)
+        node_done, node_percent = nres.payload
+        all_done = all_done and node_done
+        if node_percent is not None:
+          min_percent = min(min_percent, node_percent)
+      if not all_done:
+        if min_percent < 100:
+          self.feedback_fn("   - progress: %.1f%%" % min_percent)
+        time.sleep(2)
+
+  def _EnsureSecondary(self, node):
+    """Demote a node to secondary.
+
+    """
+    self.feedback_fn("* switching node %s to secondary mode" % node)
+
+    for dev in self.instance.disks:
+      self.cfg.SetDiskID(dev, node)
+
+    result = self.rpc.call_blockdev_close(node, self.instance.name,
+                                          self.instance.disks)
+    result.Raise("Cannot change disk to secondary on node %s" % node)
+
+  def _GoStandalone(self):
+    """Disconnect from the network.
+
+    """
+    self.feedback_fn("* changing into standalone mode")
+    result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
+                                               self.instance.disks)
+    for node, nres in result.items():
+      nres.Raise("Cannot disconnect disks on node %s" % node)
+
+  def _GoReconnect(self, multimaster):
+    """Reconnect to the network.
+
+    """
+    if multimaster:
+      msg = "dual-master"
+    else:
+      msg = "single-master"
+    self.feedback_fn("* changing disks into %s mode" % msg)
+    result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
+                                           (self.instance.disks, self.instance),
+                                           self.instance.name, multimaster)
+    for node, nres in result.items():
+      nres.Raise("Cannot change disks config on node %s" % node)
+
+  def _ExecCleanup(self):
+    """Try to clean up after a failed migration.
+
+    The cleanup is done by:
+      - check that the instance is running only on one node
+        (and update the config if needed)
+      - change disks on its secondary node to secondary
+      - wait until disks are fully synchronized
+      - disconnect from the network
+      - change disks into single-master mode
+      - wait again until disks are fully synchronized
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    # check running on only one node
+    self.feedback_fn("* checking where the instance actually runs"
+                     " (if this hangs, the hypervisor might be in"
+                     " a bad state)")
+    ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
+    for node, result in ins_l.items():
+      result.Raise("Can't contact node %s" % node)
+
+    runningon_source = instance.name in ins_l[source_node].payload
+    runningon_target = instance.name in ins_l[target_node].payload
+
+    if runningon_source and runningon_target:
+      raise errors.OpExecError("Instance seems to be running on two nodes,"
+                               " or the hypervisor is confused; you will have"
+                               " to ensure manually that it runs only on one"
+                               " and restart this operation")
+
+    if not (runningon_source or runningon_target):
+      raise errors.OpExecError("Instance does not seem to be running at all;"
+                               " in this case it's safer to repair by"
+                               " running 'gnt-instance stop' to ensure disk"
+                               " shutdown, and then restarting it")
+
+    if runningon_target:
+      # the migration has actually succeeded, we need to update the config
+      self.feedback_fn("* instance running on secondary node (%s),"
+                       " updating config" % target_node)
+      instance.primary_node = target_node
+      self.cfg.Update(instance, self.feedback_fn)
+      demoted_node = source_node
+    else:
+      self.feedback_fn("* instance confirmed to be running on its"
+                       " primary node (%s)" % source_node)
+      demoted_node = target_node
+
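+    # For internally mirrored (DRBD) disks, demote the node the instance is
+    # not running on and resync the disks in single-master mode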
+    if instance.disk_template in constants.DTS_INT_MIRROR:
+      self._EnsureSecondary(demoted_node)
+      try:
+        self._WaitUntilSync()
+      except errors.OpExecError:
+        # we ignore errors here, since if the device is standalone, it
+        # won't be able to sync
+        pass
+      self._GoStandalone()
+      self._GoReconnect(False)
+      self._WaitUntilSync()
+
+    self.feedback_fn("* done")
+
+  def _RevertDiskStatus(self):
+    """Try to revert the disk status after a failed migration.
+
+    """
+    target_node = self.target_node
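+    # Externally mirrored disk templates have no DRBD state to revert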
+    if self.instance.disk_template in constants.DTS_EXT_MIRROR:
+      return
+
+    try:
+      self._EnsureSecondary(target_node)
+      self._GoStandalone()
+      self._GoReconnect(False)
+      self._WaitUntilSync()
+    except errors.OpExecError, err:
+      self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
+                         " please try to recover the instance manually;"
+                         " error '%s'" % str(err))
+
+  def _AbortMigration(self):
+    """Call the hypervisor code to abort a started migration.
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+    migration_info = self.migration_info
+
+    abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
+                                                                 instance,
+                                                                 migration_info,
+                                                                 False)
+    abort_msg = abort_result.fail_msg
+    if abort_msg:
+      logging.error("Aborting migration failed on target node %s: %s",
+                    target_node, abort_msg)
+      # Don't raise an exception here, as we still have to try to revert the
+      # disk status, even if this step failed.
+
+    abort_result = self.rpc.call_instance_finalize_migration_src(
+      source_node, instance, False, self.live)
+    abort_msg = abort_result.fail_msg
+    if abort_msg:
+      logging.error("Aborting migration failed on source node %s: %s",
+                    source_node, abort_msg)
+
+  def _ExecMigration(self):
+    """Migrate an instance.
+
+    The migrate is done by:
+      - change the disks into dual-master mode
+      - wait until disks are fully synchronized again
+      - migrate the instance
+      - change disks on the new secondary node (the old primary) to secondary
+      - wait until disks are fully synchronized
+      - change disks into single-master mode
+
+    """
+    instance = self.instance
+    target_node = self.target_node
+    source_node = self.source_node
+
+    # Check for hypervisor version mismatch and warn the user.
+    nodeinfo = self.rpc.call_node_info([source_node, target_node],
+                                       None, [self.instance.hypervisor], False)
+    for ninfo in nodeinfo.values():
+      ninfo.Raise("Unable to retrieve node information from node '%s'" %
+                  ninfo.node)
+    (_, _, (src_info, )) = nodeinfo[source_node].payload
+    (_, _, (dst_info, )) = nodeinfo[target_node].payload
+
+    if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
+        (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
+      src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
+      dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
+      if src_version != dst_version:
+        self.feedback_fn("* warning: hypervisor version mismatch between"
+                         " source (%s) and target (%s) node" %
+                         (src_version, dst_version))
+
+    self.feedback_fn("* checking disk consistency between source and target")
+    for (idx, dev) in enumerate(instance.disks):
+      if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
+        raise errors.OpExecError("Disk %s is degraded or not fully"
+                                 " synchronized on target node,"
+                                 " aborting migration" % idx)
+
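+    # If the instance uses more memory than the target node has free, try to
+    # balloon it down (only if runtime memory changes are allowed)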
+    if self.current_mem > self.tgt_free_mem:
+      if not self.allow_runtime_changes:
+        raise errors.OpExecError("Memory ballooning not allowed and not enough"
+                                 " free memory to fit instance %s on target"
+                                 " node %s (have %dMB, need %dMB)" %
+                                 (instance.name, target_node,
+                                  self.tgt_free_mem, self.current_mem))
+      self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
+      rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
+                                                     instance,
+                                                     self.tgt_free_mem)
+      rpcres.Raise("Cannot modify instance runtime memory")
+
+    # First get the migration information from the remote node
+    result = self.rpc.call_migration_info(source_node, instance)
+    msg = result.fail_msg
+    if msg:
+      log_err = ("Failed fetching source migration information from %s: %s" %
+                 (source_node, msg))
+      logging.error(log_err)
+      raise errors.OpExecError(log_err)
+
+    self.migration_info = migration_info = result.payload
+
+    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
+      # Then switch the disks to master/master mode
+      self._EnsureSecondary(target_node)
+      self._GoStandalone()
+      self._GoReconnect(True)
+      self._WaitUntilSync()
+
+    self.feedback_fn("* preparing %s to accept the instance" % target_node)
+    result = self.rpc.call_accept_instance(target_node,
+                                           instance,
+                                           migration_info,
+                                           self.nodes_ip[target_node])
+
+    msg = result.fail_msg
+    if msg:
+      logging.error("Instance pre-migration failed, trying to revert"
+                    " disk status: %s", msg)
+      self.feedback_fn("Pre-migration failed, aborting")
+      self._AbortMigration()
+      self._RevertDiskStatus()
+      raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
+                               (instance.name, msg))
+
+    self.feedback_fn("* migrating instance to %s" % target_node)
+    result = self.rpc.call_instance_migrate(source_node, instance,
+                                            self.nodes_ip[target_node],
+                                            self.live)
+    msg = result.fail_msg
+    if msg:
+      logging.error("Instance migration failed, trying to revert"
+                    " disk status: %s", msg)
+      self.feedback_fn("Migration failed, aborting")
+      self._AbortMigration()
+      self._RevertDiskStatus()
+      raise errors.OpExecError("Could not migrate instance %s: %s" %
+                               (instance.name, msg))
+
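+    # Poll the source node until the memory transfer finishes or fails,
+    # reporting progress at regular intervals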
+    self.feedback_fn("* starting memory transfer")
+    last_feedback = time.time()
+    while True:
+      result = self.rpc.call_instance_get_migration_status(source_node,
+                                                           instance)
+      msg = result.fail_msg
+      ms = result.payload   # MigrationStatus instance
+      if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
+        logging.error("Instance migration failed, trying to revert"
+                      " disk status: %s", msg)
+        self.feedback_fn("Migration failed, aborting")
+        self._AbortMigration()
+        self._RevertDiskStatus()
+        if not msg:
+          msg = "hypervisor returned failure"
+        raise errors.OpExecError("Could not migrate instance %s: %s" %
+                                 (instance.name, msg))
+
+      if result.payload.status != constants.HV_MIGRATION_ACTIVE:
+        self.feedback_fn("* memory transfer complete")
+        break
+
+      if (utils.TimeoutExpired(last_feedback,
+                               self._MIGRATION_FEEDBACK_INTERVAL) and
+          ms.transferred_ram is not None):
+        mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
+        self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)
+        last_feedback = time.time()
+
+      time.sleep(self._MIGRATION_POLL_INTERVAL)
+
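+    # Memory transfer complete; finalize the migration on the source node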
+    result = self.rpc.call_instance_finalize_migration_src(source_node,
+                                                           instance,
+                                                           True,
+                                                           self.live)
+    msg = result.fail_msg
+    if msg:
+      logging.error("Instance migration succeeded, but finalization failed"
+                    " on the source node: %s", msg)
+      raise errors.OpExecError("Could not finalize instance migration: %s" %
+                               msg)
+
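+    # The instance now runs on the target node; record it as the new primary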
+    instance.primary_node = target_node
+
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance, self.feedback_fn)
+
+    result = self.rpc.call_instance_finalize_migration_dst(target_node,
+                                                           instance,
+                                                           migration_info,
+                                                           True)
+    msg = result.fail_msg
+    if msg:
+      logging.error("Instance migration succeeded, but finalization failed"
+                    " on the target node: %s", msg)
+      raise errors.OpExecError("Could not finalize instance migration: %s" %
+                               msg)
+
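+    # For DRBD-based disks, demote the old primary and switch the disks back
+    # to single-master mode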
+    if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
+      self._EnsureSecondary(source_node)
+      self._WaitUntilSync()
+      self._GoStandalone()
+      self._GoReconnect(False)
+      self._WaitUntilSync()
+
+    # If the instance's disk template is `rbd' or `ext' and there was a
+    # successful migration, unmap the device from the source node.
+    if self.instance.disk_template in (constants.DT_RBD, constants.DT_EXT):
+      disks = _ExpandCheckDisks(instance, instance.disks)
+      self.feedback_fn("* unmapping instance's disks from %s" % source_node)
+      for disk in disks:
+        result = self.rpc.call_blockdev_shutdown(source_node, (disk, instance))
+        msg = result.fail_msg
+        if msg:
+          logging.error("Migration was successful, but couldn't unmap the"
+                        " block device %s on source node %s: %s",
+                        disk.iv_name, source_node, msg)
+          logging.error("You need to unmap the device %s manually on %s",
+                        disk.iv_name, source_node)
+
+    self.feedback_fn("* done")
+
+  def _ExecFailover(self):
+    """Failover an instance.
+
+    The failover is done by shutting the instance down on its present node
+    and starting it on the target node.
+
+    """
+    instance = self.instance
+    primary_node = self.cfg.GetNodeInfo(instance.primary_node)
+
+    source_node = instance.primary_node
+    target_node = self.target_node
+
+    if instance.admin_state == constants.ADMINST_UP:
+      self.feedback_fn("* checking disk consistency between source and target")
+      for (idx, dev) in enumerate(instance.disks):
+        # for drbd, these are drbd over lvm
+        if not _CheckDiskConsistency(self.lu, instance, dev, target_node,
+                                     False):
+          if primary_node.offline:
+            self.feedback_fn("Node %s is offline, ignoring degraded disk %s on"
+                             " target node %s" %
+                             (primary_node.name, idx, target_node))
+          elif not self.ignore_consistency:
+            raise errors.OpExecError("Disk %s is degraded on target node,"
+                                     " aborting failover" % idx)
+    else:
+      self.feedback_fn("* not checking disk consistency as instance is not"
+                       " running")
+
+    self.feedback_fn("* shutting down instance on source node")
+    logging.info("Shutting down instance %s on node %s",
+                 instance.name, source_node)
+
+    result = self.rpc.call_instance_shutdown(source_node, instance,
+                                             self.shutdown_timeout,
+                                             self.lu.op.reason)
+    msg = result.fail_msg
+    if msg:
+      if self.ignore_consistency or primary_node.offline:
+        self.lu.LogWarning("Could not shutdown instance %s on node %s,"
+                           " proceeding anyway; please make sure node"
+                           " %s is down; error details: %s",
+                           instance.name, source_node, source_node, msg)
+      else:
+        raise errors.OpExecError("Could not shutdown instance %s on"
+                                 " node %s: %s" %
+                                 (instance.name, source_node, msg))
+
+    self.feedback_fn("* deactivating the instance's disks on source node")
+    if not _ShutdownInstanceDisks(self.lu, instance, ignore_primary=True):
+      raise errors.OpExecError("Can't shut down the instance's disks")
+
+    instance.primary_node = target_node
+    # distribute new instance config to the other nodes
+    self.cfg.Update(instance, self.feedback_fn)
+
+    # Only start the instance if it's marked as up
+    if instance.admin_state == constants.ADMINST_UP:
+      self.feedback_fn("* activating the instance's disks on target node %s" %
+                       target_node)
+      logging.info("Starting instance %s on node %s",
+                   instance.name, target_node)
+
+      disks_ok, _ = _AssembleInstanceDisks(self.lu, instance,
+                                           ignore_secondaries=True)
+      if not disks_ok:
+        _ShutdownInstanceDisks(self.lu, instance)
+        raise errors.OpExecError("Can't activate the instance's disks")
+
+      self.feedback_fn("* starting the instance on the target node %s" %
+                       target_node)
+      result = self.rpc.call_instance_start(target_node, (instance, None, None),
+                                            False, self.lu.op.reason)
+      msg = result.fail_msg
+      if msg:
+        _ShutdownInstanceDisks(self.lu, instance)
+        raise errors.OpExecError("Could not start instance %s on node %s: %s" %
+                                 (instance.name, target_node, msg))
+
+  def Exec(self, feedback_fn):
+    """Perform the migration.
+
+    """
+    self.feedback_fn = feedback_fn
+    self.source_node = self.instance.primary_node
+
+    # FIXME: if we implement migrate-to-any in DRBD, this needs fixing
+    if self.instance.disk_template in constants.DTS_INT_MIRROR:
+      self.target_node = self.instance.secondary_nodes[0]
+      # Otherwise self.target_node has been populated either
+      # directly, or through an iallocator.
+
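+    # Collect the involved nodes and their secondary IPs for the DRBD and
+    # migration RPC calls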
+    self.all_nodes = [self.source_node, self.target_node]
+    self.nodes_ip = dict((name, node.secondary_ip) for (name, node)
+                         in self.cfg.GetMultiNodeInfo(self.all_nodes))
+
+    if self.failover:
+      feedback_fn("Failover instance %s" % self.instance.name)
+      self._ExecFailover()
+    else:
+      feedback_fn("Migrating instance %s" % self.instance.name)
+
+      if self.cleanup:
+        return self._ExecCleanup()
+      else:
+        return self._ExecMigration()
index bb760ff..0d849b1 100644 (file)
@@ -448,3 +448,68 @@ def _GetInstanceInfoText(instance):
 
   """
   return "originstname+%s" % instance.name
+
+
+def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
+  """Checks if a node has enough free memory.
+
+  This function checks if a given node has the needed amount of free
+  memory. In case the node has less memory or we cannot get the
+  information from the node, this function raises an OpPrereqError
+  exception.
+
+  @type lu: C{LogicalUnit}
+  @param lu: a logical unit from which we get configuration data
+  @type node: C{str}
+  @param node: the node to check
+  @type reason: C{str}
+  @param reason: string to use in the error message
+  @type requested: C{int}
+  @param requested: the amount of memory in MiB to check for
+  @type hypervisor_name: C{str}
+  @param hypervisor_name: the hypervisor to ask for memory stats
+  @rtype: integer
+  @return: node current free memory
+  @raise errors.OpPrereqError: if the node doesn't have enough memory, or
+      we cannot check the node
+
+  """
+  nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
+  nodeinfo[node].Raise("Can't get data from node %s" % node,
+                       prereq=True, ecode=errors.ECODE_ENVIRON)
+  (_, _, (hv_info, )) = nodeinfo[node].payload
+
+  free_mem = hv_info.get("memory_free", None)
+  if not isinstance(free_mem, int):
+    raise errors.OpPrereqError("Can't compute free memory on node %s, result"
+                               " was '%s'" % (node, free_mem),
+                               errors.ECODE_ENVIRON)
+  if requested > free_mem:
+    raise errors.OpPrereqError("Not enough memory on node %s for %s:"
+                               " needed %s MiB, available %s MiB" %
+                               (node, reason, requested, free_mem),
+                               errors.ECODE_NORES)
+  return free_mem
+
+
+def _CheckInstanceBridgesExist(lu, instance, node=None):
+  """Check that the bridges needed by an instance exist.
+
+  """
+  if node is None:
+    node = instance.primary_node
+  _CheckNicsBridgesExist(lu, instance.nics, node)
+
+
+def _CheckNicsBridgesExist(lu, target_nics, target_node):
+  """Check that the bridges needed by a list of nics exist.
+
+  """
+  cluster = lu.cfg.GetClusterInfo()
+  paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
+  brlist = [params[constants.NIC_LINK] for params in paramslist
+            if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
+  if brlist:
+    result = lu.rpc.call_bridges_exist(target_node, brlist)
+    result.Raise("Error checking bridges on destination node '%s'" %
+                 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)