Improve error message for replace-disks
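
When replace-disks fails to find one of the instance's disks on a node,
check whether the instance's disks are activated at all and, if they are
not, append a hint to the error message suggesting to run activate-disks
before retrying. The resulting message then looks roughly like this
(disk index and node name are illustrative):

  Can't find disk/0 on node node2.example.com: disk not found
  Disks seem to be not properly activated. Try running activate-disks on
  the instance before using replace-disks.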

diff --git a/lib/cmdlib/instance_storage.py b/lib/cmdlib/instance_storage.py
index bfdee08..b1fea8b 100644
--- a/lib/cmdlib/instance_storage.py
+++ b/lib/cmdlib/instance_storage.py
@@ -38,12 +38,12 @@ from ganeti import opcodes
 from ganeti import rpc
 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
-  _AnnotateDiskParams, _CheckIAllocatorOrNode, _ExpandNodeName, \
-  _CheckNodeOnline, _CheckInstanceNodeGroups, _CheckInstanceState, \
-  _IsExclusiveStorageEnabledNode, _FindFaultyInstanceDisks
-from ganeti.cmdlib.instance_utils import _GetInstanceInfoText, \
-  _CopyLockList, _ReleaseLocks, _CheckNodeVmCapable, \
-  _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _CheckTargetNodeIPolicy
+  AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
+  CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
+  IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
+from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
+  CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
+  BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
 
 import ganeti.masterd.instance
 
@@ -65,8 +65,8 @@ _DISK_TEMPLATE_DEVICE_TYPE = {
   }
 
 
-def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
-                          excl_stor):
+def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+                         excl_stor):
   """Create a single block device on a given node.
 
   This will not recurse over children of the device, so they must be
@@ -146,8 +146,8 @@ def _CreateBlockDevInner(lu, node, instance, device, force_create,
     if not force_create:
       return created_devices
 
-    _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
-                          excl_stor)
+    CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+                         excl_stor)
     # The device has been completely created, so there is no point in keeping
     # its subdevices in the list. We just add the device itself instead.
     created_devices = [(node, device)]
@@ -160,7 +160,7 @@ def _CreateBlockDevInner(lu, node, instance, device, force_create,
     raise errors.DeviceCreationError(str(e), created_devices)
 
 
-def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
+def IsExclusiveStorageEnabledNodeName(cfg, nodename):
   """Whether exclusive_storage is in effect for the given node.
 
   @type cfg: L{config.ConfigWriter}
@@ -176,7 +176,7 @@ def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
   if ni is None:
     raise errors.OpPrereqError("Invalid node name %s" % nodename,
                                errors.ECODE_NOENT)
-  return _IsExclusiveStorageEnabledNode(cfg, ni)
+  return IsExclusiveStorageEnabledNode(cfg, ni)
 
 
 def _CreateBlockDev(lu, node, instance, device, force_create, info,
@@ -186,13 +186,32 @@ def _CreateBlockDev(lu, node, instance, device, force_create, info,
   This method annotates the root device first.
 
   """
-  (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
-  excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
+  (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
+  excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
   return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
                               force_open, excl_stor)
 
 
-def _CreateDisks(lu, instance, to_skip=None, target_node=None):
+def _UndoCreateDisks(lu, disks_created):
+  """Undo the work performed by L{CreateDisks}.
+
+  This function is called in case of an error to undo the work of
+  L{CreateDisks}.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @param disks_created: the result returned by L{CreateDisks}
+
+  """
+  for (node, disk) in disks_created:
+    lu.cfg.SetDiskID(disk, node)
+    result = lu.rpc.call_blockdev_remove(node, disk)
+    if result.fail_msg:
+      logging.warning("Failed to remove newly-created disk %s on node %s:"
+                      " %s", disk, node, result.fail_msg)
+
+
+def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
   """Create all disks for an instance.
 
   This abstracts away some work from AddInstance.
@@ -205,11 +224,15 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
   @param to_skip: list of indices to skip
   @type target_node: string
   @param target_node: if passed, overrides the target node for creation
-  @rtype: boolean
-  @return: the success of the creation
+  @type disks: list of L{objects.Disk}
+  @param disks: the disks to create; if not specified, all the disks of the
+      instance are created
+  @return: information about the created disks, to be used to call
+      L{_UndoCreateDisks}
+  @raise errors.OpPrereqError: in case of error
 
   """
-  info = _GetInstanceInfoText(instance)
+  info = GetInstanceInfoText(instance)
   if target_node is None:
     pnode = instance.primary_node
     all_nodes = instance.all_nodes
@@ -217,6 +240,9 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
     pnode = target_node
     all_nodes = [pnode]
 
+  if disks is None:
+    disks = instance.disks
+
   if instance.disk_template in constants.DTS_FILEBASED:
     file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
     result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
@@ -225,35 +251,25 @@ def _CreateDisks(lu, instance, to_skip=None, target_node=None):
                  " node %s" % (file_storage_dir, pnode))
 
   disks_created = []
-  # Note: this needs to be kept in sync with adding of disks in
-  # LUInstanceSetParams
-  for idx, device in enumerate(instance.disks):
+  for idx, device in enumerate(disks):
     if to_skip and idx in to_skip:
       continue
     logging.info("Creating disk %s for instance '%s'", idx, instance.name)
-    #HARDCODE
     for node in all_nodes:
       f_create = node == pnode
       try:
         _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
         disks_created.append((node, device))
-      except errors.OpExecError:
-        logging.warning("Creating disk %s for instance '%s' failed",
-                        idx, instance.name)
       except errors.DeviceCreationError, e:
         logging.warning("Creating disk %s for instance '%s' failed",
                         idx, instance.name)
         disks_created.extend(e.created_devices)
-        for (node, disk) in disks_created:
-          lu.cfg.SetDiskID(disk, node)
-          result = lu.rpc.call_blockdev_remove(node, disk)
-          if result.fail_msg:
-            logging.warning("Failed to remove newly-created disk %s on node %s:"
-                            " %s", device, node, result.fail_msg)
+        _UndoCreateDisks(lu, disks_created)
         raise errors.OpExecError(e.message)
+  return disks_created
 
 
-def _ComputeDiskSizePerVG(disk_template, disks):
+def ComputeDiskSizePerVG(disk_template, disks):
   """Compute disk size requirements in the volume group
 
   """
@@ -285,7 +301,7 @@ def _ComputeDiskSizePerVG(disk_template, disks):
   return req_size_dict[disk_template]
 
 
-def _ComputeDisks(op, default_vg):
+def ComputeDisks(op, default_vg):
   """Computes the instance disks.
 
   @param op: The instance opcode
@@ -349,7 +365,7 @@ def _ComputeDisks(op, default_vg):
   return disks
 
 
-def _CheckRADOSFreeSpace():
+def CheckRADOSFreeSpace():
   """Compute disk size requirements inside the RADOS cluster.
 
   """
@@ -385,7 +401,7 @@ def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
   return drbd_dev
 
 
-def _GenerateDiskTemplate(
+def GenerateDiskTemplate(
   lu, template_name, instance_name, primary_node, secondary_nodes,
   disk_info, file_storage_dir, file_driver, base_index,
   feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
@@ -591,7 +607,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     # We don't want _CheckIAllocatorOrNode selecting the default iallocator
     # when neither iallocator nor nodes are specified
     if self.op.iallocator or self.op.nodes:
-      _CheckIAllocatorOrNode(self, "iallocator", "nodes")
+      CheckIAllocatorOrNode(self, "iallocator", "nodes")
 
     for (idx, params) in self.op.disks:
       utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
@@ -607,7 +623,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
 
     if self.op.nodes:
-      self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+      self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
       self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
     else:
       self.needed_locks[locking.LEVEL_NODE] = []
@@ -652,7 +668,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -660,7 +676,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     This runs on master, primary and secondary nodes of the instance.
 
     """
-    return _BuildInstanceHookEnvByObject(self, self.instance)
+    return BuildInstanceHookEnvByObject(self, self.instance)
 
   def BuildHooksNodes(self):
     """Build hooks nodes.
@@ -693,7 +709,7 @@ class LUInstanceRecreateDisks(LogicalUnit):
     else:
       primary_node = instance.primary_node
     if not self.op.iallocator:
-      _CheckNodeOnline(self, primary_node)
+      CheckNodeOnline(self, primary_node)
 
     if instance.disk_template == constants.DT_DISKLESS:
       raise errors.OpPrereqError("Instance '%s' has no disks" %
@@ -704,15 +720,15 @@ class LUInstanceRecreateDisks(LogicalUnit):
     if owned_groups:
       # Node group locks are acquired only for the primary node (and only
       # when the allocator is used)
-      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
-                               primary_only=True)
+      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+                              primary_only=True)
 
     # if we replace nodes *and* the old primary is offline, we don't
     # check the instance state
     old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
     if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
-      _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
-                          msg="cannot recreate disks")
+      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
+                         msg="cannot recreate disks")
 
     if self.op.disks:
       self.disks = dict(self.op.disks)
@@ -735,9 +751,9 @@ class LUInstanceRecreateDisks(LogicalUnit):
     if self.op.iallocator:
       self._RunAllocator()
       # Release unneeded node and node resource locks
-      _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
-      _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
-      _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
+      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
 
     assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
 
@@ -800,7 +816,14 @@ class LUInstanceRecreateDisks(LogicalUnit):
     # All touched nodes must be locked
     mylocks = self.owned_locks(locking.LEVEL_NODE)
     assert mylocks.issuperset(frozenset(instance.all_nodes))
-    _CreateDisks(self, instance, to_skip=to_skip)
+    new_disks = CreateDisks(self, instance, to_skip=to_skip)
+
+    # TODO: Release node locks before wiping, or explain why it's not possible
+    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
+      wipedisks = [(idx, disk, 0)
+                   for (idx, disk) in enumerate(instance.disks)
+                   if idx not in to_skip]
+      WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
 
 
 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
@@ -842,7 +865,7 @@ def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
                                  errors.ECODE_NORES)
 
 
-def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
+def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
   """Checks if nodes have enough free disk space in all the VGs.
 
   This function checks if all given nodes have the needed amount of
@@ -895,7 +918,7 @@ def _CalcEta(time_taken, written, total_size):
   return (total_size - written) * avg_time
 
 
-def _WipeDisks(lu, instance, disks=None):
+def WipeDisks(lu, instance, disks=None):
   """Wipes instance disks.
 
   @type lu: L{LogicalUnit}
@@ -990,7 +1013,29 @@ def _WipeDisks(lu, instance, disks=None):
                         " failed", idx, instance.name)
 
 
-def _ExpandCheckDisks(instance, disks):
+def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
+  """Wrapper for L{WipeDisks} that handles errors.
+
+  @type lu: L{LogicalUnit}
+  @param lu: the logical unit on whose behalf we execute
+  @type instance: L{objects.Instance}
+  @param instance: the instance whose disks we should wipe
+  @param disks: see L{WipeDisks}
+  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
+      case of error
+  @raise errors.OpExecError: in case of failure
+
+  """
+  try:
+    WipeDisks(lu, instance, disks=disks)
+  except errors.OpExecError:
+    logging.warning("Wiping disks for instance '%s' failed",
+                    instance.name)
+    _UndoCreateDisks(lu, cleanup)
+    raise
+
+
+def ExpandCheckDisks(instance, disks):
   """Return the instance disks selected by the disks list
 
   @type disks: list of L{objects.Disk} or None
@@ -1004,18 +1049,19 @@ def _ExpandCheckDisks(instance, disks):
   else:
     if not set(disks).issubset(instance.disks):
       raise errors.ProgrammerError("Can only act on disks belonging to the"
-                                   " target instance")
+                                   " target instance: expected a subset of %r,"
+                                   " got %r" % (instance.disks, disks))
     return disks
 
 
-def _WaitForSync(lu, instance, disks=None, oneshot=False):
+def WaitForSync(lu, instance, disks=None, oneshot=False):
   """Sleep and poll for an instance's disk to sync.
 
   """
   if not instance.disks or disks is not None and not disks:
     return True
 
-  disks = _ExpandCheckDisks(instance, disks)
+  disks = ExpandCheckDisks(instance, disks)
 
   if not oneshot:
     lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
@@ -1084,7 +1130,7 @@ def _WaitForSync(lu, instance, disks=None, oneshot=False):
   return not cumul_degraded
 
 
-def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
+def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
   """Shutdown block devices of an instance.
 
   This does the shutdown on all nodes of the instance.
@@ -1093,8 +1139,9 @@ def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
   ignored.
 
   """
+  lu.cfg.MarkInstanceDisksInactive(instance.name)
   all_result = True
-  disks = _ExpandCheckDisks(instance, disks)
+  disks = ExpandCheckDisks(instance, disks)
 
   for disk in disks:
     for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
@@ -1117,11 +1164,11 @@ def _SafeShutdownInstanceDisks(lu, instance, disks=None):
   _ShutdownInstanceDisks.
 
   """
-  _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
-  _ShutdownInstanceDisks(lu, instance, disks=disks)
+  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
+  ShutdownInstanceDisks(lu, instance, disks=disks)
 
 
-def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
-                           ignore_size=False):
+def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
+                          ignore_size=False):
   """Prepare the block devices for an instance.
 
@@ -1148,7 +1195,7 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   device_info = []
   disks_ok = True
   iname = instance.name
-  disks = _ExpandCheckDisks(instance, disks)
+  disks = ExpandCheckDisks(instance, disks)
 
   # With the two passes mechanism we try to reduce the window of
   # opportunity for the race condition of switching DRBD to primary
@@ -1159,6 +1206,10 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   # into any other network-connected state (Connected, SyncTarget,
   # SyncSource, etc.)
 
+  # mark instance disks as active before doing actual work, so watcher does
+  # not try to shut them down erroneously
+  lu.cfg.MarkInstanceDisksActive(iname)
+
   # 1st pass, assemble on all nodes in secondary mode
   for idx, inst_disk in enumerate(disks):
     for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
@@ -1210,17 +1261,20 @@ def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
   for disk in disks:
     lu.cfg.SetDiskID(disk, instance.primary_node)
 
+  if not disks_ok:
+    lu.cfg.MarkInstanceDisksInactive(iname)
+
   return disks_ok, device_info
 
 
-def _StartInstanceDisks(lu, instance, force):
+def StartInstanceDisks(lu, instance, force):
   """Start the disks of an instance.
 
   """
-  disks_ok, _ = _AssembleInstanceDisks(lu, instance,
-                                       ignore_secondaries=force)
+  disks_ok, _ = AssembleInstanceDisks(lu, instance,
+                                      ignore_secondaries=force)
   if not disks_ok:
-    _ShutdownInstanceDisks(lu, instance)
+    ShutdownInstanceDisks(lu, instance)
     if force is not None and not force:
       lu.LogWarning("",
                     hint=("If the message above refers to a secondary node,"
@@ -1249,7 +1303,7 @@ class LUInstanceGrowDisk(LogicalUnit):
     elif level == locking.LEVEL_NODE_RES:
       # Copy node locks
       self.needed_locks[locking.LEVEL_NODE_RES] = \
-        _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+        CopyLockList(self.needed_locks[locking.LEVEL_NODE])
 
   def BuildHooksEnv(self):
     """Build hooks env.
@@ -1262,7 +1316,7 @@ class LUInstanceGrowDisk(LogicalUnit):
       "AMOUNT": self.op.amount,
       "ABSOLUTE": self.op.absolute,
       }
-    env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+    env.update(BuildInstanceHookEnvByObject(self, self.instance))
     return env
 
   def BuildHooksNodes(self):
@@ -1283,7 +1337,7 @@ class LUInstanceGrowDisk(LogicalUnit):
       "Cannot retrieve locked instance %s" % self.op.instance_name
     nodenames = list(instance.all_nodes)
     for node in nodenames:
-      _CheckNodeOnline(self, node)
+      CheckNodeOnline(self, node)
 
     self.instance = instance
 
@@ -1318,14 +1372,14 @@ class LUInstanceGrowDisk(LogicalUnit):
       # TODO: check the free disk space for file, when that feature will be
       # supported
       nodes = map(self.cfg.GetNodeInfo, nodenames)
-      es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
+      es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
                         nodes)
       if es_nodes:
         # With exclusive storage we need to something smarter than just looking
         # at free space; for now, let's simply abort the operation.
         raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
                                    " is enabled", errors.ECODE_STATE)
-      _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
+      CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
 
   def Exec(self, feedback_fn):
     """Execute disk grow.
@@ -1340,7 +1394,7 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
 
-    disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
+    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block device to grow")
 
@@ -1358,6 +1412,7 @@ class LUInstanceGrowDisk(LogicalUnit):
 
     if wipe_disks:
       # Get disk size from primary node for wiping
+      self.cfg.SetDiskID(disk, instance.primary_node)
       result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
       result.Raise("Failed to retrieve disk size from node '%s'" %
                    instance.primary_node)
@@ -1395,7 +1450,7 @@ class LUInstanceGrowDisk(LogicalUnit):
     self.cfg.Update(instance, feedback_fn)
 
     # Changes have been recorded, release node lock
-    _ReleaseLocks(self, locking.LEVEL_NODE)
+    ReleaseLocks(self, locking.LEVEL_NODE)
 
     # Downgrade lock while waiting for sync
     self.glm.downgrade(locking.LEVEL_INSTANCE)
@@ -1406,17 +1461,17 @@ class LUInstanceGrowDisk(LogicalUnit):
       assert instance.disks[self.op.disk] == disk
 
       # Wipe newly added disk space
-      _WipeDisks(self, instance,
-                 disks=[(self.op.disk, disk, old_disk_size)])
+      WipeDisks(self, instance,
+                disks=[(self.op.disk, disk, old_disk_size)])
 
     if self.op.wait_for_sync:
-      disk_abort = not _WaitForSync(self, instance, disks=[disk])
+      disk_abort = not WaitForSync(self, instance, disks=[disk])
       if disk_abort:
         self.LogWarning("Disk syncing has not returned a good status; check"
                         " the instance")
-      if instance.admin_state != constants.ADMINST_UP:
+      if not instance.disks_active:
         _SafeShutdownInstanceDisks(self, instance, disks=[disk])
-    elif instance.admin_state != constants.ADMINST_UP:
+    elif not instance.disks_active:
       self.LogWarning("Not shutting down the disk even if the instance is"
                       " not supposed to be running because no wait for"
                       " sync mode was requested")
@@ -1445,7 +1500,7 @@ class LUInstanceReplaceDisks(LogicalUnit):
                                    " iallocator script must be used or the"
                                    " new node given", errors.ECODE_INVAL)
       else:
-        _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+        CheckIAllocatorOrNode(self, "iallocator", "remote_node")
 
     elif remote_node is not None or ialloc is not None:
       # Not replacing the secondary
@@ -1464,7 +1519,7 @@ class LUInstanceReplaceDisks(LogicalUnit):
       "Conflicting options"
 
     if self.op.remote_node is not None:
-      self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+      self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
 
       # Warning: do not remove the locking of the new secondary here
       # unless DRBD8.AddChildren is changed to work in parallel;
@@ -1535,7 +1590,7 @@ class LUInstanceReplaceDisks(LogicalUnit):
       "NEW_SECONDARY": self.op.remote_node,
       "OLD_SECONDARY": instance.secondary_nodes[0],
       }
-    env.update(_BuildInstanceHookEnvByObject(self, instance))
+    env.update(BuildInstanceHookEnvByObject(self, instance))
     return env
 
   def BuildHooksNodes(self):
@@ -1561,7 +1616,7 @@ class LUInstanceReplaceDisks(LogicalUnit):
     # Verify if node group locks are still correct
     owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
     if owned_groups:
-      _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
+      CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
 
     return LogicalUnit.CheckPrereq(self)
 
@@ -1590,20 +1645,21 @@ class LUInstanceActivateDisks(NoHooksLU):
     self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
     assert self.instance is not None, \
       "Cannot retrieve locked instance %s" % self.op.instance_name
-    _CheckNodeOnline(self, self.instance.primary_node)
+    CheckNodeOnline(self, self.instance.primary_node)
 
   def Exec(self, feedback_fn):
     """Activate the disks.
 
     """
     disks_ok, disks_info = \
-              _AssembleInstanceDisks(self, self.instance,
-                                     ignore_size=self.op.ignore_size)
+              AssembleInstanceDisks(self, self.instance,
+                                    ignore_size=self.op.ignore_size)
     if not disks_ok:
       raise errors.OpExecError("Cannot activate block devices")
 
     if self.op.wait_for_sync:
-      if not _WaitForSync(self, self.instance):
+      if not WaitForSync(self, self.instance):
+        self.cfg.MarkInstanceDisksInactive(self.instance.name)
         raise errors.OpExecError("Some disks of the instance are degraded!")
 
     return disks_info
@@ -1640,7 +1696,7 @@ class LUInstanceDeactivateDisks(NoHooksLU):
     """
     instance = self.instance
     if self.op.force:
-      _ShutdownInstanceDisks(self, instance)
+      ShutdownInstanceDisks(self, instance)
     else:
       _SafeShutdownInstanceDisks(self, instance)
 
@@ -1683,11 +1739,11 @@ def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
   return result
 
 
-def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
   """Wrapper around L{_CheckDiskConsistencyInner}.
 
   """
-  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
   return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
                                     ldisk=ldisk)
 
@@ -1702,7 +1758,7 @@ def _BlockdevFind(lu, node, dev, instance):
   @returns The result of the rpc call
 
   """
-  (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+  (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
   return lu.rpc.call_blockdev_find(node, disk)
 
 
@@ -1773,11 +1829,11 @@ class TLReplaceDisks(Tasklet):
     return remote_node_name
 
   def _FindFaultyDisks(self, node_name):
-    """Wrapper for L{_FindFaultyInstanceDisks}.
+    """Wrapper for L{FindFaultyInstanceDisks}.
 
     """
-    return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
-                                    node_name, True)
+    return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
+                                   node_name, True)
 
   def _CheckDisksActivated(self, instance):
     """Checks if the instance disks are activated.
@@ -1901,8 +1957,8 @@ class TLReplaceDisks(Tasklet):
         self.target_node = secondary_node
         check_nodes = [self.new_node, self.other_node]
 
-        _CheckNodeNotDrained(self.lu, remote_node)
-        _CheckNodeVmCapable(self.lu, remote_node)
+        CheckNodeNotDrained(self.lu, remote_node)
+        CheckNodeVmCapable(self.lu, remote_node)
 
         old_node_info = self.cfg.GetNodeInfo(secondary_node)
         assert old_node_info is not None
@@ -1928,11 +1984,11 @@ class TLReplaceDisks(Tasklet):
       cluster = self.cfg.GetClusterInfo()
       ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                               new_group_info)
-      _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
-                              self.cfg, ignore=self.ignore_ipolicy)
+      CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
+                             self.cfg, ignore=self.ignore_ipolicy)
 
     for node in check_nodes:
-      _CheckNodeOnline(self.lu, node)
+      CheckNodeOnline(self.lu, node)
 
     touched_nodes = frozenset(node_name for node_name in [self.new_node,
                                                           self.other_node,
@@ -1940,12 +1996,12 @@ class TLReplaceDisks(Tasklet):
                               if node_name is not None)
 
     # Release unneeded node and node resource locks
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
+    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
+    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
 
     # Release any owned node group
-    _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
 
     # Check whether disks are valid
     for disk_idx in self.disks:
@@ -1989,11 +2045,11 @@ class TLReplaceDisks(Tasklet):
     feedback_fn("Current seconary node: %s" %
                 utils.CommaJoin(self.instance.secondary_nodes))
 
-    activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
+    activate_disks = not self.instance.disks_active
 
     # Activate the instance disks if we're replacing them on a down instance
     if activate_disks:
-      _StartInstanceDisks(self.lu, self.instance, True)
+      StartInstanceDisks(self.lu, self.instance, True)
 
     try:
       # Should we replace the secondary node?
@@ -2055,8 +2111,14 @@ class TLReplaceDisks(Tasklet):
         if msg or not result.payload:
           if not msg:
             msg = "disk not found"
-          raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
-                                   (idx, node, msg))
+          if not self._CheckDisksActivated(self.instance):
+            extra_hint = ("\nDisks seem to be not properly activated. Try"
+                          " running activate-disks on the instance before"
+                          " using replace-disks.")
+          else:
+            extra_hint = ""
+          raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
+                                   (idx, node, msg, extra_hint))
 
   def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
     for idx, dev in enumerate(self.instance.disks):
@@ -2066,8 +2128,8 @@ class TLReplaceDisks(Tasklet):
       self.lu.LogInfo("Checking disk/%d consistency on node %s" %
                       (idx, node_name))
 
-      if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
-                                   on_primary, ldisk=ldisk):
+      if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+                                  on_primary, ldisk=ldisk):
         raise errors.OpExecError("Node %s has degraded storage, unsafe to"
                                  " replace disks for instance %s" %
                                  (node_name, self.instance.name))
@@ -2081,7 +2143,7 @@ class TLReplaceDisks(Tasklet):
     """
     iv_names = {}
 
-    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
     for idx, dev in enumerate(disks):
       if idx not in self.disks:
         continue
@@ -2107,13 +2169,16 @@ class TLReplaceDisks(Tasklet):
       new_lvs = [lv_data, lv_meta]
       old_lvs = [child.Copy() for child in dev.children]
       iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
-      excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
+      excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
 
       # we pass force_create=True to force the LVM creation
       for new_lv in new_lvs:
-        _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
-                             _GetInstanceInfoText(self.instance), False,
-                             excl_stor)
+        try:
+          _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
+                               GetInstanceInfoText(self.instance), False,
+                               excl_stor)
+        except errors.DeviceCreationError, e:
+          raise errors.OpExecError("Can't create block device: %s" % e.message)
 
     return iv_names
 
@@ -2261,14 +2326,14 @@ class TLReplaceDisks(Tasklet):
       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
       self._RemoveOldStorage(self.target_node, iv_names)
       # TODO: Check if releasing locks early still makes sense
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
     else:
       # Release all resource locks except those used by the instance
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
-                    keep=self.node_secondary_ip.keys())
+      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+                   keep=self.node_secondary_ip.keys())
 
     # Release all node locks while waiting for sync
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+    ReleaseLocks(self.lu, locking.LEVEL_NODE)
 
     # TODO: Can the instance lock be downgraded here? Take the optional disk
     # shutdown in the caller into consideration.
@@ -2277,7 +2342,7 @@ class TLReplaceDisks(Tasklet):
     # This can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its return value
     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
-    _WaitForSync(self.lu, self.instance)
+    WaitForSync(self.lu, self.instance)
 
     # Check all devices manually
     self._CheckDevices(self.instance.primary_node, iv_names)
@@ -2321,16 +2386,19 @@ class TLReplaceDisks(Tasklet):
 
     # Step: create new storage
     self.lu.LogStep(3, steps_total, "Allocate new storage")
-    disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
-    excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
+    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+    excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
     for idx, dev in enumerate(disks):
       self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
                       (self.new_node, idx))
       # we pass force_create=True to force LVM creation
       for new_lv in dev.children:
-        _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
-                             True, _GetInstanceInfoText(self.instance), False,
-                             excl_stor)
+        try:
+          _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+                               True, GetInstanceInfoText(self.instance), False,
+                               excl_stor)
+        except errors.DeviceCreationError, e:
+          raise errors.OpExecError("Can't create block device: %s" % e.message)
 
     # Step 4: dbrd minors and drbd setups changes
     # after this, we must manually remove the drbd minors on both the
@@ -2369,13 +2437,13 @@ class TLReplaceDisks(Tasklet):
                               children=dev.children,
                               size=dev.size,
                               params={})
-      (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
-                                             self.cfg)
+      (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
+                                            self.cfg)
       try:
-        _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
-                              anno_new_drbd,
-                              _GetInstanceInfoText(self.instance), False,
-                              excl_stor)
+        CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+                             anno_new_drbd,
+                             GetInstanceInfoText(self.instance), False,
+                             excl_stor)
       except errors.GenericError:
         self.cfg.ReleaseDRBDMinors(self.instance.name)
         raise
@@ -2413,7 +2481,7 @@ class TLReplaceDisks(Tasklet):
     self.cfg.Update(self.instance, feedback_fn)
 
     # Release all node locks (the configuration has been updated)
-    _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+    ReleaseLocks(self.lu, locking.LEVEL_NODE)
 
     # and now perform the drbd attach
     self.lu.LogInfo("Attaching primary drbds to new secondary"
@@ -2438,11 +2506,11 @@ class TLReplaceDisks(Tasklet):
       self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
       self._RemoveOldStorage(self.target_node, iv_names)
       # TODO: Check if releasing locks early still makes sense
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
     else:
       # Release all resource locks except those used by the instance
-      _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
-                    keep=self.node_secondary_ip.keys())
+      ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+                   keep=self.node_secondary_ip.keys())
 
     # TODO: Can the instance lock be downgraded here? Take the optional disk
     # shutdown in the caller into consideration.
@@ -2451,7 +2519,7 @@ class TLReplaceDisks(Tasklet):
     # This can fail as the old devices are degraded and _WaitForSync
     # does a combined result over all disks, so we don't check its return value
     self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
-    _WaitForSync(self.lu, self.instance)
+    WaitForSync(self.lu, self.instance)
 
     # Check all devices manually
     self._CheckDevices(self.instance.primary_node, iv_names)