from ganeti import rpc
from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
- _AnnotateDiskParams, _CheckIAllocatorOrNode, _ExpandNodeName, \
- _CheckNodeOnline, _CheckInstanceNodeGroups, _CheckInstanceState, \
- _IsExclusiveStorageEnabledNode, _FindFaultyInstanceDisks
-from ganeti.cmdlib.instance_utils import _GetInstanceInfoText, \
- _CopyLockList, _ReleaseLocks, _CheckNodeVmCapable, \
- _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _CheckTargetNodeIPolicy
+ AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
+ CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
+ IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
+from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
+ CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
+ BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
import ganeti.masterd.instance
}
-def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
- excl_stor):
+def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+ excl_stor):
"""Create a single block device on a given node.
This will not recurse over children of the device, so they must be
if not force_create:
return created_devices
- _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
- excl_stor)
+ CreateSingleBlockDev(lu, node, instance, device, info, force_open,
+ excl_stor)
# The device has been completely created, so there is no point in keeping
# its subdevices in the list. We just add the device itself instead.
created_devices = [(node, device)]
raise errors.DeviceCreationError(str(e), created_devices)
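For reference, a minimal caller-side sketch (the surrounding caller is assumed; the helpers and attributes are the ones used in this module): errors.DeviceCreationError carries the (node, device) pairs created before the failure, so a caller can remove them again before turning the error into an OpExecError, which is what CreateDisks does further down.
try:
  _CreateBlockDevInner(lu, node, instance, device, True, info, True, excl_stor)
except errors.DeviceCreationError, err:
  # Roll back whatever was created before the failure (sketch only)
  for (created_node, created_disk) in err.created_devices:
    lu.cfg.SetDiskID(created_disk, created_node)
    lu.rpc.call_blockdev_remove(created_node, created_disk)
  raise errors.OpExecError(err.message)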
-def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
+def IsExclusiveStorageEnabledNodeName(cfg, nodename):
"""Whether exclusive_storage is in effect for the given node.
@type cfg: L{config.ConfigWriter}
if ni is None:
raise errors.OpPrereqError("Invalid node name %s" % nodename,
errors.ECODE_NOENT)
- return _IsExclusiveStorageEnabledNode(cfg, ni)
+ return IsExclusiveStorageEnabledNode(cfg, ni)
def _CreateBlockDev(lu, node, instance, device, force_create, info,
This method annotates the root device first.
"""
- (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
- excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
+ (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
+ excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
force_open, excl_stor)
-def _CreateDisks(lu, instance, to_skip=None, target_node=None):
+def _UndoCreateDisks(lu, disks_created):
+ """Undo the work performed by L{CreateDisks}.
+
+ This function is called in case of an error to undo the work of
+ L{CreateDisks}.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @param disks_created: the result returned by L{CreateDisks}
+
+ """
+ for (node, disk) in disks_created:
+ lu.cfg.SetDiskID(disk, node)
+ result = lu.rpc.call_blockdev_remove(node, disk)
+ if result.fail_msg:
+ logging.warning("Failed to remove newly-created disk %s on node %s:"
+ " %s", disk, node, result.fail_msg)
+
+
+def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
"""Create all disks for an instance.
This abstracts away some work from AddInstance.
@param to_skip: list of indices to skip
@type target_node: string
@param target_node: if passed, overrides the target node for creation
- @rtype: boolean
- @return: the success of the creation
+ @type disks: list of L{objects.Disk}
+ @param disks: the disks to create; if not specified, all the disks of the
+ instance are created
+ @return: information about the created disks, to be used to call
+ L{_UndoCreateDisks}
+ @raise errors.OpPrereqError: in case of error
"""
- info = _GetInstanceInfoText(instance)
+ info = GetInstanceInfoText(instance)
if target_node is None:
pnode = instance.primary_node
all_nodes = instance.all_nodes
pnode = target_node
all_nodes = [pnode]
+ if disks is None:
+ disks = instance.disks
+
if instance.disk_template in constants.DTS_FILEBASED:
file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
" node %s" % (file_storage_dir, pnode))
disks_created = []
- # Note: this needs to be kept in sync with adding of disks in
- # LUInstanceSetParams
- for idx, device in enumerate(instance.disks):
+ for idx, device in enumerate(disks):
if to_skip and idx in to_skip:
continue
logging.info("Creating disk %s for instance '%s'", idx, instance.name)
- #HARDCODE
for node in all_nodes:
f_create = node == pnode
try:
_CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
disks_created.append((node, device))
- except errors.OpExecError:
- logging.warning("Creating disk %s for instance '%s' failed",
- idx, instance.name)
except errors.DeviceCreationError, e:
logging.warning("Creating disk %s for instance '%s' failed",
idx, instance.name)
disks_created.extend(e.created_devices)
- for (node, disk) in disks_created:
- lu.cfg.SetDiskID(disk, node)
- result = lu.rpc.call_blockdev_remove(node, disk)
- if result.fail_msg:
- logging.warning("Failed to remove newly-created disk %s on node %s:"
- " %s", device, node, result.fail_msg)
+ _UndoCreateDisks(lu, disks_created)
raise errors.OpExecError(e.message)
+ return disks_created
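A hedged sketch of how the new return value is meant to be consumed (the caller shown here is illustrative; WipeOrCleanupDisks below wraps the same pattern):
disks_created = CreateDisks(lu, instance)
try:
  WipeDisks(lu, instance)
except errors.OpExecError:
  # Roll back the block devices we just created before re-raising
  _UndoCreateDisks(lu, disks_created)
  raise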
-def _ComputeDiskSizePerVG(disk_template, disks):
+def ComputeDiskSizePerVG(disk_template, disks):
"""Compute disk size requirements in the volume group
"""
return req_size_dict[disk_template]
-def _ComputeDisks(op, default_vg):
+def ComputeDisks(op, default_vg):
"""Computes the instance disks.
@param op: The instance opcode
return disks
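For context, a hedged sketch of how these helpers chain together during instance creation; the IDISK_* dict layout of op.disks, the GetVGName() default and the surrounding caller are assumptions, not part of this patch.
# op.disks entries are dicts such as {constants.IDISK_SIZE: 1024};
# ComputeDisks normalizes them, filling in defaults like the volume group.
disks = ComputeDisks(self.op, self.cfg.GetVGName())
req_sizes = ComputeDiskSizePerVG(self.op.disk_template, disks)
CheckNodesFreeDiskPerVG(self, nodenames, req_sizes)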
-def _CheckRADOSFreeSpace():
+def CheckRADOSFreeSpace():
"""Compute disk size requirements inside the RADOS cluster.
"""
return drbd_dev
-def _GenerateDiskTemplate(
+def GenerateDiskTemplate(
lu, template_name, instance_name, primary_node, secondary_nodes,
disk_info, file_storage_dir, file_driver, base_index,
feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
# We don't want _CheckIAllocatorOrNode selecting the default iallocator
# when neither iallocator nor nodes are specified
if self.op.iallocator or self.op.nodes:
- _CheckIAllocatorOrNode(self, "iallocator", "nodes")
+ CheckIAllocatorOrNode(self, "iallocator", "nodes")
for (idx, params) in self.op.disks:
utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
if self.op.nodes:
- self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
+ self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
else:
self.needed_locks[locking.LEVEL_NODE] = []
elif level == locking.LEVEL_NODE_RES:
# Copy node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
- _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+ CopyLockList(self.needed_locks[locking.LEVEL_NODE])
def BuildHooksEnv(self):
"""Build hooks env.
This runs on master, primary and secondary nodes of the instance.
"""
- return _BuildInstanceHookEnvByObject(self, self.instance)
+ return BuildInstanceHookEnvByObject(self, self.instance)
def BuildHooksNodes(self):
"""Build hooks nodes.
else:
primary_node = instance.primary_node
if not self.op.iallocator:
- _CheckNodeOnline(self, primary_node)
+ CheckNodeOnline(self, primary_node)
if instance.disk_template == constants.DT_DISKLESS:
raise errors.OpPrereqError("Instance '%s' has no disks" %
if owned_groups:
# Node group locks are acquired only for the primary node (and only
# when the allocator is used)
- _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
- primary_only=True)
+ CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
+ primary_only=True)
# if we replace nodes *and* the old primary is offline, we don't
# check the instance state
old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
- _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
- msg="cannot recreate disks")
+ CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
+ msg="cannot recreate disks")
if self.op.disks:
self.disks = dict(self.op.disks)
if self.op.iallocator:
self._RunAllocator()
# Release unneeded node and node resource locks
- _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
- _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
- _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
+ ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
+ ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
+ ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
# All touched nodes must be locked
mylocks = self.owned_locks(locking.LEVEL_NODE)
assert mylocks.issuperset(frozenset(instance.all_nodes))
- _CreateDisks(self, instance, to_skip=to_skip)
+ new_disks = CreateDisks(self, instance, to_skip=to_skip)
+
+ # TODO: Release node locks before wiping, or explain why it's not possible
+ if self.cfg.GetClusterInfo().prealloc_wipe_disks:
+ wipedisks = [(idx, disk, 0)
+ for (idx, disk) in enumerate(instance.disks)
+ if idx not in to_skip]
+ WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
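Each wipedisks entry above is a (disk index, disk object, start offset) triple; offset 0 wipes the whole device. A hedged sketch of the other shape used later in this patch, where only the space added beyond a disk's previous size is wiped (disk_index and old_disk_size are illustrative names):
# Sketch only: wipe just the newly grown tail of one disk
WipeDisks(self, instance, disks=[(disk_index, disk, old_disk_size)])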
def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
errors.ECODE_NORES)
-def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
+def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
"""Checks if nodes have enough free disk space in all the VGs.
This function checks if all given nodes have the needed amount of
return (total_size - written) * avg_time
-def _WipeDisks(lu, instance, disks=None):
+def WipeDisks(lu, instance, disks=None):
"""Wipes instance disks.
@type lu: L{LogicalUnit}
" failed", idx, instance.name)
-def _ExpandCheckDisks(instance, disks):
+def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
+ """Wrapper for L{WipeDisks} that handles errors.
+
+ @type lu: L{LogicalUnit}
+ @param lu: the logical unit on whose behalf we execute
+ @type instance: L{objects.Instance}
+ @param instance: the instance whose disks we should wipe
+ @param disks: see L{WipeDisks}
+ @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
+ case of error
+ @raise errors.OpPrereqError: in case of failure
+
+ """
+ try:
+ WipeDisks(lu, instance, disks=disks)
+ except errors.OpExecError:
+ logging.warning("Wiping disks for instance '%s' failed",
+ instance.name)
+ _UndoCreateDisks(lu, cleanup)
+ raise
+
+
+def ExpandCheckDisks(instance, disks):
"""Return the instance disks selected by the disks list
@type disks: list of L{objects.Disk} or None
else:
if not set(disks).issubset(instance.disks):
raise errors.ProgrammerError("Can only act on disks belonging to the"
- " target instance")
+ " target instance: expected a subset of %r,"
+ " got %r" % (instance.disks, disks))
return disks
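A small usage sketch (the call sites are assumed): passing None selects every disk of the instance, while an explicit list must be a subset of instance.disks or a ProgrammerError is raised.
all_disks = ExpandCheckDisks(instance, None)                # all instance disks
one_disk = ExpandCheckDisks(instance, [instance.disks[0]])  # explicit subset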
-def _WaitForSync(lu, instance, disks=None, oneshot=False):
+def WaitForSync(lu, instance, disks=None, oneshot=False):
"""Sleep and poll for an instance's disk to sync.
"""
if not instance.disks or disks is not None and not disks:
return True
- disks = _ExpandCheckDisks(instance, disks)
+ disks = ExpandCheckDisks(instance, disks)
if not oneshot:
lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
return not cumul_degraded
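WaitForSync returns True once no disk is left degraded; a hedged sketch of the typical caller reaction (mirroring LUInstanceGrowDisk further down in this patch):
if not WaitForSync(self, instance, disks=[disk]):
  self.LogWarning("Disk syncing has not returned a good status; check"
                  " the instance")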
-def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
+def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
"""Shutdown block devices of an instance.
This does the shutdown on all nodes of the instance.
ignored.
"""
+ lu.cfg.MarkInstanceDisksInactive(instance.name)
all_result = True
- disks = _ExpandCheckDisks(instance, disks)
+ disks = ExpandCheckDisks(instance, disks)
for disk in disks:
for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
_ShutdownInstanceDisks.
"""
- _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
- _ShutdownInstanceDisks(lu, instance, disks=disks)
+ CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
+ ShutdownInstanceDisks(lu, instance, disks=disks)
-def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
+def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
ignore_size=False):
"""Prepare the block devices for an instance.
device_info = []
disks_ok = True
iname = instance.name
- disks = _ExpandCheckDisks(instance, disks)
+ disks = ExpandCheckDisks(instance, disks)
# With the two passes mechanism we try to reduce the window of
# opportunity for the race condition of switching DRBD to primary
# into any other network-connected state (Connected, SyncTarget,
# SyncSource, etc.)
+ # mark instance disks as active before doing actual work, so watcher does
+ # not try to shut them down erroneously
+ lu.cfg.MarkInstanceDisksActive(iname)
+
# 1st pass, assemble on all nodes in secondary mode
for idx, inst_disk in enumerate(disks):
for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
for disk in disks:
lu.cfg.SetDiskID(disk, instance.primary_node)
+ if not disks_ok:
+ lu.cfg.MarkInstanceDisksInactive(iname)
+
return disks_ok, device_info
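A hedged sketch of what callers can rely on after this change (the calling code itself is assumed):
# Disks are marked active in the configuration before assembly starts, so
# the watcher will not shut them down; on failure they are marked inactive
# again, but the caller still cleans up whatever was partially assembled.
disks_ok, _ = AssembleInstanceDisks(lu, instance)
if not disks_ok:
  ShutdownInstanceDisks(lu, instance)
  raise errors.OpExecError("Cannot activate block devices")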
-def _StartInstanceDisks(lu, instance, force):
+def StartInstanceDisks(lu, instance, force):
"""Start the disks of an instance.
"""
- disks_ok, _ = _AssembleInstanceDisks(lu, instance,
- ignore_secondaries=force)
+ disks_ok, _ = AssembleInstanceDisks(lu, instance,
+ ignore_secondaries=force)
if not disks_ok:
- _ShutdownInstanceDisks(lu, instance)
+ ShutdownInstanceDisks(lu, instance)
if force is not None and not force:
lu.LogWarning("",
hint=("If the message above refers to a secondary node,"
elif level == locking.LEVEL_NODE_RES:
# Copy node locks
self.needed_locks[locking.LEVEL_NODE_RES] = \
- _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
+ CopyLockList(self.needed_locks[locking.LEVEL_NODE])
def BuildHooksEnv(self):
"""Build hooks env.
"AMOUNT": self.op.amount,
"ABSOLUTE": self.op.absolute,
}
- env.update(_BuildInstanceHookEnvByObject(self, self.instance))
+ env.update(BuildInstanceHookEnvByObject(self, self.instance))
return env
def BuildHooksNodes(self):
"Cannot retrieve locked instance %s" % self.op.instance_name
nodenames = list(instance.all_nodes)
for node in nodenames:
- _CheckNodeOnline(self, node)
+ CheckNodeOnline(self, node)
self.instance = instance
# TODO: check the free disk space for file, when that feature is
# supported
nodes = map(self.cfg.GetNodeInfo, nodenames)
- es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
+ es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
nodes)
if es_nodes:
# With exclusive storage we need to do something smarter than just looking
# at free space; for now, let's simply abort the operation.
raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
" is enabled", errors.ECODE_STATE)
- _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
+ CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
def Exec(self, feedback_fn):
"""Execute disk grow.
wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
- disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
+ disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
if not disks_ok:
raise errors.OpExecError("Cannot activate block device to grow")
if wipe_disks:
# Get disk size from primary node for wiping
+ self.cfg.SetDiskID(disk, instance.primary_node)
result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
result.Raise("Failed to retrieve disk size from node '%s'" %
instance.primary_node)
self.cfg.Update(instance, feedback_fn)
# Changes have been recorded, release node lock
- _ReleaseLocks(self, locking.LEVEL_NODE)
+ ReleaseLocks(self, locking.LEVEL_NODE)
# Downgrade lock while waiting for sync
self.glm.downgrade(locking.LEVEL_INSTANCE)
assert instance.disks[self.op.disk] == disk
# Wipe newly added disk space
- _WipeDisks(self, instance,
- disks=[(self.op.disk, disk, old_disk_size)])
+ WipeDisks(self, instance,
+ disks=[(self.op.disk, disk, old_disk_size)])
if self.op.wait_for_sync:
- disk_abort = not _WaitForSync(self, instance, disks=[disk])
+ disk_abort = not WaitForSync(self, instance, disks=[disk])
if disk_abort:
self.LogWarning("Disk syncing has not returned a good status; check"
" the instance")
- if instance.admin_state != constants.ADMINST_UP:
+ if not instance.disks_active:
_SafeShutdownInstanceDisks(self, instance, disks=[disk])
- elif instance.admin_state != constants.ADMINST_UP:
+ elif not instance.disks_active:
self.LogWarning("Not shutting down the disk even if the instance is"
" not supposed to be running because no wait for"
" sync mode was requested")
" iallocator script must be used or the"
" new node given", errors.ECODE_INVAL)
else:
- _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
+ CheckIAllocatorOrNode(self, "iallocator", "remote_node")
elif remote_node is not None or ialloc is not None:
# Not replacing the secondary
"Conflicting options"
if self.op.remote_node is not None:
- self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
+ self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
# Warning: do not remove the locking of the new secondary here
# unless DRBD8.AddChildren is changed to work in parallel;
"NEW_SECONDARY": self.op.remote_node,
"OLD_SECONDARY": instance.secondary_nodes[0],
}
- env.update(_BuildInstanceHookEnvByObject(self, instance))
+ env.update(BuildInstanceHookEnvByObject(self, instance))
return env
def BuildHooksNodes(self):
# Verify if node group locks are still correct
owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
if owned_groups:
- _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
+ CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
return LogicalUnit.CheckPrereq(self)
self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
assert self.instance is not None, \
"Cannot retrieve locked instance %s" % self.op.instance_name
- _CheckNodeOnline(self, self.instance.primary_node)
+ CheckNodeOnline(self, self.instance.primary_node)
def Exec(self, feedback_fn):
"""Activate the disks.
"""
disks_ok, disks_info = \
- _AssembleInstanceDisks(self, self.instance,
- ignore_size=self.op.ignore_size)
+ AssembleInstanceDisks(self, self.instance,
+ ignore_size=self.op.ignore_size)
if not disks_ok:
raise errors.OpExecError("Cannot activate block devices")
if self.op.wait_for_sync:
- if not _WaitForSync(self, self.instance):
+ if not WaitForSync(self, self.instance):
+ self.cfg.MarkInstanceDisksInactive(self.instance.name)
raise errors.OpExecError("Some disks of the instance are degraded!")
return disks_info
"""
instance = self.instance
if self.op.force:
- _ShutdownInstanceDisks(self, instance)
+ ShutdownInstanceDisks(self, instance)
else:
_SafeShutdownInstanceDisks(self, instance)
return result
-def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
+def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
"""Wrapper around L{_CheckDiskConsistencyInner}.
"""
- (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
ldisk=ldisk)
@returns The result of the rpc call
"""
- (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
+ (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
return lu.rpc.call_blockdev_find(node, disk)
return remote_node_name
def _FindFaultyDisks(self, node_name):
- """Wrapper for L{_FindFaultyInstanceDisks}.
+ """Wrapper for L{FindFaultyInstanceDisks}.
"""
- return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
- node_name, True)
+ return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
+ node_name, True)
def _CheckDisksActivated(self, instance):
"""Checks if the instance disks are activated.
self.target_node = secondary_node
check_nodes = [self.new_node, self.other_node]
- _CheckNodeNotDrained(self.lu, remote_node)
- _CheckNodeVmCapable(self.lu, remote_node)
+ CheckNodeNotDrained(self.lu, remote_node)
+ CheckNodeVmCapable(self.lu, remote_node)
old_node_info = self.cfg.GetNodeInfo(secondary_node)
assert old_node_info is not None
cluster = self.cfg.GetClusterInfo()
ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
new_group_info)
- _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
- self.cfg, ignore=self.ignore_ipolicy)
+ CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
+ self.cfg, ignore=self.ignore_ipolicy)
for node in check_nodes:
- _CheckNodeOnline(self.lu, node)
+ CheckNodeOnline(self.lu, node)
touched_nodes = frozenset(node_name for node_name in [self.new_node,
self.other_node,
if node_name is not None)
# Release unneeded node and node resource locks
- _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
+ ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
+ ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
+ ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
# Release any owned node group
- _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
+ ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
# Check whether disks are valid
for disk_idx in self.disks:
feedback_fn("Current seconary node: %s" %
utils.CommaJoin(self.instance.secondary_nodes))
- activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
+ activate_disks = not self.instance.disks_active
# Activate the instance disks if we're replacing them on a down instance
if activate_disks:
- _StartInstanceDisks(self.lu, self.instance, True)
+ StartInstanceDisks(self.lu, self.instance, True)
try:
# Should we replace the secondary node?
if msg or not result.payload:
if not msg:
msg = "disk not found"
- raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
- (idx, node, msg))
+ if not self._CheckDisksActivated(self.instance):
+ extra_hint = ("\nDisks seem to be not properly activated. Try"
+ " running activate-disks on the instance before"
+ " using replace-disks.")
+ else:
+ extra_hint = ""
+ raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
+ (idx, node, msg, extra_hint))
def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
for idx, dev in enumerate(self.instance.disks):
self.lu.LogInfo("Checking disk/%d consistency on node %s" %
(idx, node_name))
- if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
- on_primary, ldisk=ldisk):
+ if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
+ on_primary, ldisk=ldisk):
raise errors.OpExecError("Node %s has degraded storage, unsafe to"
" replace disks for instance %s" %
(node_name, self.instance.name))
"""
iv_names = {}
- disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
for idx, dev in enumerate(disks):
if idx not in self.disks:
continue
new_lvs = [lv_data, lv_meta]
old_lvs = [child.Copy() for child in dev.children]
iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
- excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
+ excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
# we pass force_create=True to force the LVM creation
for new_lv in new_lvs:
- _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
- _GetInstanceInfoText(self.instance), False,
- excl_stor)
+ try:
+ _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
+ GetInstanceInfoText(self.instance), False,
+ excl_stor)
+ except errors.DeviceCreationError, e:
+ raise errors.OpExecError("Can't create block device: %s" % e.message)
return iv_names
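For orientation, a hedged sketch of how the mapping built above is consumed later in the replacement; the iterating code and node_name are assumed, the tuple layout comes from the assignment above.
# Each value is (dev, old_lvs, new_lvs); here only the old LVs are needed
# (sketch of the cleanup step, calling code assumed)
for name, (_, old_lvs, _) in iv_names.iteritems():
  self.lu.LogInfo("Removing old logical volumes for %s" % name)
  for old_lv in old_lvs:
    self.cfg.SetDiskID(old_lv, node_name)
    self.rpc.call_blockdev_remove(node_name, old_lv)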
self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node, iv_names)
# TODO: Check if releasing locks early still makes sense
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+ ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
else:
# Release all resource locks except those used by the instance
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
- keep=self.node_secondary_ip.keys())
+ ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+ keep=self.node_secondary_ip.keys())
# Release all node locks while waiting for sync
- _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+ ReleaseLocks(self.lu, locking.LEVEL_NODE)
# TODO: Can the instance lock be downgraded here? Take the optional disk
# shutdown in the caller into consideration.
# This can fail as the old devices are degraded and _WaitForSync
# does a combined result over all disks, so we don't check its return value
self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
- _WaitForSync(self.lu, self.instance)
+ WaitForSync(self.lu, self.instance)
# Check all devices manually
self._CheckDevices(self.instance.primary_node, iv_names)
# Step: create new storage
self.lu.LogStep(3, steps_total, "Allocate new storage")
- disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
- excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
+ disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
+ excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
for idx, dev in enumerate(disks):
self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
(self.new_node, idx))
# we pass force_create=True to force LVM creation
for new_lv in dev.children:
- _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
- True, _GetInstanceInfoText(self.instance), False,
- excl_stor)
+ try:
+ _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
+ True, GetInstanceInfoText(self.instance), False,
+ excl_stor)
+ except errors.DeviceCreationError, e:
+ raise errors.OpExecError("Can't create block device: %s" % e.message)
# Step 4: drbd minors and drbd setup changes
# after this, we must manually remove the drbd minors on both the
children=dev.children,
size=dev.size,
params={})
- (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
- self.cfg)
+ (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
+ self.cfg)
try:
- _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
- anno_new_drbd,
- _GetInstanceInfoText(self.instance), False,
- excl_stor)
+ CreateSingleBlockDev(self.lu, self.new_node, self.instance,
+ anno_new_drbd,
+ GetInstanceInfoText(self.instance), False,
+ excl_stor)
except errors.GenericError:
self.cfg.ReleaseDRBDMinors(self.instance.name)
raise
self.cfg.Update(self.instance, feedback_fn)
# Release all node locks (the configuration has been updated)
- _ReleaseLocks(self.lu, locking.LEVEL_NODE)
+ ReleaseLocks(self.lu, locking.LEVEL_NODE)
# and now perform the drbd attach
self.lu.LogInfo("Attaching primary drbds to new secondary"
self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
self._RemoveOldStorage(self.target_node, iv_names)
# TODO: Check if releasing locks early still makes sense
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
+ ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
else:
# Release all resource locks except those used by the instance
- _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
- keep=self.node_secondary_ip.keys())
+ ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
+ keep=self.node_secondary_ip.keys())
# TODO: Can the instance lock be downgraded here? Take the optional disk
# shutdown in the caller into consideration.
# This can fail as the old devices are degraded and _WaitForSync
# does a combined result over all disks, so we don't check its return value
self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
- _WaitForSync(self.lu, self.instance)
+ WaitForSync(self.lu, self.instance)
# Check all devices manually
self._CheckDevices(self.instance.primary_node, iv_names)