Add disks_active to configuration
ganeti-github.git: lib/cmdlib/instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeName, \
42 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
43 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
54 constants.DT_EXT: ".ext",
55 }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59 constants.DT_PLAIN: constants.LD_LV,
60 constants.DT_FILE: constants.LD_FILE,
61 constants.DT_SHARED_FILE: constants.LD_FILE,
62 constants.DT_BLOCK: constants.LD_BLOCKDEV,
63 constants.DT_RBD: constants.LD_RBD,
64 constants.DT_EXT: constants.LD_EXT,
65 }
66
67
68 def CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69 excl_stor):
70 """Create a single block device on a given node.
71
72 This will not recurse over children of the device, so they must be
73 created in advance.
74
75 @param lu: the lu on whose behalf we execute
76 @param node: the node on which to create the device
77 @type instance: L{objects.Instance}
78 @param instance: the instance which owns the device
79 @type device: L{objects.Disk}
80 @param device: the device to create
81 @param info: the extra 'metadata' we should attach to the device
82 (this will be represented as a LVM tag)
83 @type force_open: boolean
84 @param force_open: this parameter will be passed to the
85 L{backend.BlockdevCreate} function where it specifies
86 whether we run on primary or not, and it affects both
87 the child assembly and the device's own Open() execution
88 @type excl_stor: boolean
89 @param excl_stor: Whether exclusive_storage is active for the node
90
91 """
92 lu.cfg.SetDiskID(device, node)
93 result = lu.rpc.call_blockdev_create(node, device, device.size,
94 instance.name, force_open, info,
95 excl_stor)
96 result.Raise("Can't create block device %s on"
97 " node %s for instance %s" % (device, node, instance.name))
98 if device.physical_id is None:
99 device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103 info, force_open, excl_stor):
104 """Create a tree of block devices on a given node.
105
106 If this device type has to be created on secondaries, create it and
107 all its children.
108
109 If not, just recurse to children keeping the same 'force' value.
110
111 @attention: The device has to be annotated already.
112
113 @param lu: the lu on whose behalf we execute
114 @param node: the node on which to create the device
115 @type instance: L{objects.Instance}
116 @param instance: the instance which owns the device
117 @type device: L{objects.Disk}
118 @param device: the device to create
119 @type force_create: boolean
120 @param force_create: whether to force creation of this device; this
121 will be changed to True whenever we find a device which has the
122 CreateOnSecondary() attribute
123 @param info: the extra 'metadata' we should attach to the device
124 (this will be represented as a LVM tag)
125 @type force_open: boolean
126 @param force_open: this parameter will be passed to the
127 L{backend.BlockdevCreate} function where it specifies
128 whether we run on primary or not, and it affects both
129 the child assembly and the device's own Open() execution
130 @type excl_stor: boolean
131 @param excl_stor: Whether exclusive_storage is active for the node
132
133 @return: list of created devices
134 """
135 created_devices = []
136 try:
137 if device.CreateOnSecondary():
138 force_create = True
139
140 if device.children:
141 for child in device.children:
142 devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143 info, force_open, excl_stor)
144 created_devices.extend(devs)
145
146 if not force_create:
147 return created_devices
148
149 CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150 excl_stor)
151 # The device has been completely created, so there is no point in keeping
152 # its subdevices in the list. We just add the device itself instead.
153 created_devices = [(node, device)]
154 return created_devices
155
156 except errors.DeviceCreationError, e:
157 e.created_devices.extend(created_devices)
158 raise e
159 except errors.OpExecError, e:
160 raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def IsExclusiveStorageEnabledNodeName(cfg, nodename):
164 """Whether exclusive_storage is in effect for the given node.
165
166 @type cfg: L{config.ConfigWriter}
167 @param cfg: The cluster configuration
168 @type nodename: string
169 @param nodename: The node
170 @rtype: bool
171 @return: The effective value of exclusive_storage
172 @raise errors.OpPrereqError: if no node exists with the given name
173
174 """
175 ni = cfg.GetNodeInfo(nodename)
176 if ni is None:
177 raise errors.OpPrereqError("Invalid node name %s" % nodename,
178 errors.ECODE_NOENT)
179 return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
183 force_open):
184 """Wrapper around L{_CreateBlockDevInner}.
185
186 This method annotates the root device first.
187
188 """
189 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190 excl_stor = IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191 return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192 force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created):
196 """Undo the work performed by L{CreateDisks}.
197
198 This function is called in case of an error to undo the work of
199 L{CreateDisks}.
200
201 @type lu: L{LogicalUnit}
202 @param lu: the logical unit on whose behalf we execute
203 @param disks_created: the result returned by L{CreateDisks}
204
205 """
206 for (node, disk) in disks_created:
207 lu.cfg.SetDiskID(disk, node)
208 result = lu.rpc.call_blockdev_remove(node, disk)
209 if result.fail_msg:
210 logging.warning("Failed to remove newly-created disk %s on node %s:"
211 " %s", disk, node, result.fail_msg)
212
213
214 def CreateDisks(lu, instance, to_skip=None, target_node=None, disks=None):
215 """Create all disks for an instance.
216
217 This abstracts away some work from AddInstance.
218
219 @type lu: L{LogicalUnit}
220 @param lu: the logical unit on whose behalf we execute
221 @type instance: L{objects.Instance}
222 @param instance: the instance whose disks we should create
223 @type to_skip: list
224 @param to_skip: list of indices to skip
225 @type target_node: string
226 @param target_node: if passed, overrides the target node for creation
227 @type disks: list of L{objects.Disk}
228 @param disks: the disks to create; if not specified, all the disks of the
229 instance are created
230 @return: information about the created disks, to be used to call
231 L{_UndoCreateDisks}
232 @raise errors.OpPrereqError: in case of error
233
234 """
235 info = GetInstanceInfoText(instance)
236 if target_node is None:
237 pnode = instance.primary_node
238 all_nodes = instance.all_nodes
239 else:
240 pnode = target_node
241 all_nodes = [pnode]
242
243 if disks is None:
244 disks = instance.disks
245
246 if instance.disk_template in constants.DTS_FILEBASED:
247 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
248 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
249
250 result.Raise("Failed to create directory '%s' on"
251 " node %s" % (file_storage_dir, pnode))
252
253 disks_created = []
254 for idx, device in enumerate(disks):
255 if to_skip and idx in to_skip:
256 continue
257 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
258 for node in all_nodes:
259 f_create = node == pnode
260 try:
261 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
262 disks_created.append((node, device))
263 except errors.DeviceCreationError, e:
264 logging.warning("Creating disk %s for instance '%s' failed",
265 idx, instance.name)
266 disks_created.extend(e.created_devices)
267 _UndoCreateDisks(lu, disks_created)
268 raise errors.OpExecError(e.message)
269 return disks_created
270
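# Illustrative sketch (assuming an ``lu`` and ``instance`` in scope, as in the
# callers below): CreateDisks returns the (node, disk) pairs it managed to
# create, which is exactly the structure _UndoCreateDisks expects when rolling
# back, e.g.:
#
#   disks_created = CreateDisks(lu, instance)
#   try:
#     WipeDisks(lu, instance)
#   except errors.OpExecError:
#     _UndoCreateDisks(lu, disks_created)
#     raise
#
# WipeOrCleanupDisks below wraps this exact pattern.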
271
272 def ComputeDiskSizePerVG(disk_template, disks):
273 """Compute disk size requirements in the volume group
274
275 """
276 def _compute(disks, payload):
277 """Universal algorithm.
278
279 """
280 vgs = {}
281 for disk in disks:
282 vgs[disk[constants.IDISK_VG]] = \
283 vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
284
285 return vgs
286
287 # Required free disk space as a function of disk and swap space
288 req_size_dict = {
289 constants.DT_DISKLESS: {},
290 constants.DT_PLAIN: _compute(disks, 0),
291 # 128 MB are added for drbd metadata for each disk
292 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
293 constants.DT_FILE: {},
294 constants.DT_SHARED_FILE: {},
295 }
296
297 if disk_template not in req_size_dict:
298 raise errors.ProgrammerError("Disk template '%s' size requirement"
299 " is unknown" % disk_template)
300
301 return req_size_dict[disk_template]
302
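# Worked example (values chosen for illustration only): two 1024 MiB DRBD
# disks on volume group "xenvg" each need DRBD_META_SIZE (128 MiB) of
# metadata on top of their data size, so the per-VG requirement is:
#
#   ComputeDiskSizePerVG(constants.DT_DRBD8,
#                        [{constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 1024},
#                         {constants.IDISK_VG: "xenvg",
#                          constants.IDISK_SIZE: 1024}])
#   --> {"xenvg": 2304}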
303
304 def ComputeDisks(op, default_vg):
305 """Computes the instance disks.
306
307 @param op: The instance opcode
308 @param default_vg: The default_vg to assume
309
310 @return: The computed disks
311
312 """
313 disks = []
314 for disk in op.disks:
315 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
316 if mode not in constants.DISK_ACCESS_SET:
317 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
318 mode, errors.ECODE_INVAL)
319 size = disk.get(constants.IDISK_SIZE, None)
320 if size is None:
321 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
322 try:
323 size = int(size)
324 except (TypeError, ValueError):
325 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
326 errors.ECODE_INVAL)
327
328 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
329 if ext_provider and op.disk_template != constants.DT_EXT:
330 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
331 " disk template, not %s" %
332 (constants.IDISK_PROVIDER, constants.DT_EXT,
333 op.disk_template), errors.ECODE_INVAL)
334
335 data_vg = disk.get(constants.IDISK_VG, default_vg)
336 name = disk.get(constants.IDISK_NAME, None)
337 if name is not None and name.lower() == constants.VALUE_NONE:
338 name = None
339 new_disk = {
340 constants.IDISK_SIZE: size,
341 constants.IDISK_MODE: mode,
342 constants.IDISK_VG: data_vg,
343 constants.IDISK_NAME: name,
344 }
345
346 if constants.IDISK_METAVG in disk:
347 new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
348 if constants.IDISK_ADOPT in disk:
349 new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
350
351 # For extstorage, demand the `provider' option and add any
352 # additional parameters (ext-params) to the dict
353 if op.disk_template == constants.DT_EXT:
354 if ext_provider:
355 new_disk[constants.IDISK_PROVIDER] = ext_provider
356 for key in disk:
357 if key not in constants.IDISK_PARAMS:
358 new_disk[key] = disk[key]
359 else:
360 raise errors.OpPrereqError("Missing provider for template '%s'" %
361 constants.DT_EXT, errors.ECODE_INVAL)
362
363 disks.append(new_disk)
364
365 return disks
366
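# Illustrative sketch (hypothetical opcode values): a bare 10 GiB disk
# specification for a plain-LVM instance is normalised into the canonical
# IDISK_* dictionary, picking up the default access mode and volume group:
#
#   op.disk_template = constants.DT_PLAIN
#   op.disks = [{constants.IDISK_SIZE: 10240}]
#   ComputeDisks(op, "xenvg")
#   --> [{constants.IDISK_SIZE: 10240,
#         constants.IDISK_MODE: constants.DISK_RDWR,
#         constants.IDISK_VG: "xenvg",
#         constants.IDISK_NAME: None}]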
367
368 def CheckRADOSFreeSpace():
369 """Compute disk size requirements inside the RADOS cluster.
370
371 """
372 # For the RADOS cluster we assume there is always enough space.
373 pass
374
375
376 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
377 iv_name, p_minor, s_minor):
378 """Generate a drbd8 device complete with its children.
379
380 """
381 assert len(vgnames) == len(names) == 2
382 port = lu.cfg.AllocatePort()
383 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384
385 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
386 logical_id=(vgnames[0], names[0]),
387 params={})
388 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389 dev_meta = objects.Disk(dev_type=constants.LD_LV,
390 size=constants.DRBD_META_SIZE,
391 logical_id=(vgnames[1], names[1]),
392 params={})
393 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
395 logical_id=(primary, secondary, port,
396 p_minor, s_minor,
397 shared_secret),
398 children=[dev_data, dev_meta],
399 iv_name=iv_name, params={})
400 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401 return drbd_dev
402
403
404 def GenerateDiskTemplate(
405 lu, template_name, instance_name, primary_node, secondary_nodes,
406 disk_info, file_storage_dir, file_driver, base_index,
407 feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
408 _req_shr_file_storage=opcodes.RequireSharedFileStorage):
409 """Generate the entire disk layout for a given template type.
410
411 """
412 vgname = lu.cfg.GetVGName()
413 disk_count = len(disk_info)
414 disks = []
415
416 if template_name == constants.DT_DISKLESS:
417 pass
418 elif template_name == constants.DT_DRBD8:
419 if len(secondary_nodes) != 1:
420 raise errors.ProgrammerError("Wrong template configuration")
421 remote_node = secondary_nodes[0]
422 minors = lu.cfg.AllocateDRBDMinor(
423 [primary_node, remote_node] * len(disk_info), instance_name)
424
425 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
426 full_disk_params)
427 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
428
429 names = []
430 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
431 for i in range(disk_count)]):
432 names.append(lv_prefix + "_data")
433 names.append(lv_prefix + "_meta")
434 for idx, disk in enumerate(disk_info):
435 disk_index = idx + base_index
436 data_vg = disk.get(constants.IDISK_VG, vgname)
437 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
438 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
439 disk[constants.IDISK_SIZE],
440 [data_vg, meta_vg],
441 names[idx * 2:idx * 2 + 2],
442 "disk/%d" % disk_index,
443 minors[idx * 2], minors[idx * 2 + 1])
444 disk_dev.mode = disk[constants.IDISK_MODE]
445 disk_dev.name = disk.get(constants.IDISK_NAME, None)
446 disks.append(disk_dev)
447 else:
448 if secondary_nodes:
449 raise errors.ProgrammerError("Wrong template configuration")
450
451 if template_name == constants.DT_FILE:
452 _req_file_storage()
453 elif template_name == constants.DT_SHARED_FILE:
454 _req_shr_file_storage()
455
456 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
457 if name_prefix is None:
458 names = None
459 else:
460 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
461 (name_prefix, base_index + i)
462 for i in range(disk_count)])
463
464 if template_name == constants.DT_PLAIN:
465
466 def logical_id_fn(idx, _, disk):
467 vg = disk.get(constants.IDISK_VG, vgname)
468 return (vg, names[idx])
469
470 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
471 logical_id_fn = \
472 lambda _, disk_index, disk: (file_driver,
473 "%s/disk%d" % (file_storage_dir,
474 disk_index))
475 elif template_name == constants.DT_BLOCK:
476 logical_id_fn = \
477 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
478 disk[constants.IDISK_ADOPT])
479 elif template_name == constants.DT_RBD:
480 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
481 elif template_name == constants.DT_EXT:
482 def logical_id_fn(idx, _, disk):
483 provider = disk.get(constants.IDISK_PROVIDER, None)
484 if provider is None:
485 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
486 " not found" % (constants.DT_EXT,
487 constants.IDISK_PROVIDER))
488 return (provider, names[idx])
489 else:
490 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
491
492 dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
493
494 for idx, disk in enumerate(disk_info):
495 params = {}
496 # Only for the Ext template add disk_info to params
497 if template_name == constants.DT_EXT:
498 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
499 for key in disk:
500 if key not in constants.IDISK_PARAMS:
501 params[key] = disk[key]
502 disk_index = idx + base_index
503 size = disk[constants.IDISK_SIZE]
504 feedback_fn("* disk %s, size %s" %
505 (disk_index, utils.FormatUnit(size, "h")))
506 disk_dev = objects.Disk(dev_type=dev_type, size=size,
507 logical_id=logical_id_fn(idx, disk_index, disk),
508 iv_name="disk/%d" % disk_index,
509 mode=disk[constants.IDISK_MODE],
510 params=params)
511 disk_dev.name = disk.get(constants.IDISK_NAME, None)
512 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
513 disks.append(disk_dev)
514
515 return disks
516
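# Summary of the logical_id shapes produced above for the non-DRBD templates
# (the names come from _GenerateUniqueNames, so exact values are
# cluster-specific):
#
#   DT_PLAIN                 -> (vg, name)
#   DT_FILE / DT_SHARED_FILE -> (file_driver, "<file_storage_dir>/disk<idx>")
#   DT_BLOCK                 -> (constants.BLOCKDEV_DRIVER_MANUAL, adopt_path)
#   DT_RBD                   -> ("rbd", name)
#   DT_EXT                   -> (provider, name)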
517
518 class LUInstanceRecreateDisks(LogicalUnit):
519 """Recreate an instance's missing disks.
520
521 """
522 HPATH = "instance-recreate-disks"
523 HTYPE = constants.HTYPE_INSTANCE
524 REQ_BGL = False
525
526 _MODIFYABLE = compat.UniqueFrozenset([
527 constants.IDISK_SIZE,
528 constants.IDISK_MODE,
529 ])
530
531 # New or changed disk parameters may have different semantics
532 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
533 constants.IDISK_ADOPT,
534
535 # TODO: Implement support changing VG while recreating
536 constants.IDISK_VG,
537 constants.IDISK_METAVG,
538 constants.IDISK_PROVIDER,
539 constants.IDISK_NAME,
540 ]))
541
542 def _RunAllocator(self):
543 """Run the allocator based on input opcode.
544
545 """
546 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
547
548 # FIXME
549 # The allocator should actually run in "relocate" mode, but current
550 # allocators don't support relocating all the nodes of an instance at
551 # the same time. As a workaround we use "allocate" mode, but this is
552 # suboptimal for two reasons:
553 # - The instance name passed to the allocator is present in the list of
554 # existing instances, so there could be a conflict within the
555 # internal structures of the allocator. This doesn't happen with the
556 # current allocators, but it's a liability.
557 # - The allocator counts the resources used by the instance twice: once
558 # because the instance exists already, and once because it tries to
559 # allocate a new instance.
560 # The allocator could choose some of the nodes on which the instance is
561 # running, but that's not a problem. If the instance nodes are broken,
562 # they should already be marked as drained or offline, and hence
563 # skipped by the allocator. If instance disks have been lost for other
564 # reasons, then recreating the disks on the same nodes should be fine.
565 disk_template = self.instance.disk_template
566 spindle_use = be_full[constants.BE_SPINDLE_USE]
567 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
568 disk_template=disk_template,
569 tags=list(self.instance.GetTags()),
570 os=self.instance.os,
571 nics=[{}],
572 vcpus=be_full[constants.BE_VCPUS],
573 memory=be_full[constants.BE_MAXMEM],
574 spindle_use=spindle_use,
575 disks=[{constants.IDISK_SIZE: d.size,
576 constants.IDISK_MODE: d.mode}
577 for d in self.instance.disks],
578 hypervisor=self.instance.hypervisor,
579 node_whitelist=None)
580 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
581
582 ial.Run(self.op.iallocator)
583
584 assert req.RequiredNodes() == len(self.instance.all_nodes)
585
586 if not ial.success:
587 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
588 " %s" % (self.op.iallocator, ial.info),
589 errors.ECODE_NORES)
590
591 self.op.nodes = ial.result
592 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
593 self.op.instance_name, self.op.iallocator,
594 utils.CommaJoin(ial.result))
595
596 def CheckArguments(self):
597 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
598 # Normalize and convert deprecated list of disk indices
599 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
600
601 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
602 if duplicates:
603 raise errors.OpPrereqError("Some disks have been specified more than"
604 " once: %s" % utils.CommaJoin(duplicates),
605 errors.ECODE_INVAL)
606
607 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
608 # when neither iallocator nor nodes are specified
609 if self.op.iallocator or self.op.nodes:
610 CheckIAllocatorOrNode(self, "iallocator", "nodes")
611
612 for (idx, params) in self.op.disks:
613 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
614 unsupported = frozenset(params.keys()) - self._MODIFYABLE
615 if unsupported:
616 raise errors.OpPrereqError("Parameters for disk %s try to change"
617 " unmodifyable parameter(s): %s" %
618 (idx, utils.CommaJoin(unsupported)),
619 errors.ECODE_INVAL)
620
621 def ExpandNames(self):
622 self._ExpandAndLockInstance()
623 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
624
625 if self.op.nodes:
626 self.op.nodes = [ExpandNodeName(self.cfg, n) for n in self.op.nodes]
627 self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
628 else:
629 self.needed_locks[locking.LEVEL_NODE] = []
630 if self.op.iallocator:
631 # iallocator will select a new node in the same group
632 self.needed_locks[locking.LEVEL_NODEGROUP] = []
633 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
634
635 self.needed_locks[locking.LEVEL_NODE_RES] = []
636
637 def DeclareLocks(self, level):
638 if level == locking.LEVEL_NODEGROUP:
639 assert self.op.iallocator is not None
640 assert not self.op.nodes
641 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
642 self.share_locks[locking.LEVEL_NODEGROUP] = 1
643 # Lock the primary group used by the instance optimistically; this
644 # requires going via the node before it's locked, requiring
645 # verification later on
646 self.needed_locks[locking.LEVEL_NODEGROUP] = \
647 self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
648
649 elif level == locking.LEVEL_NODE:
650 # If an allocator is used, then we lock all the nodes in the current
651 # instance group, as we don't know yet which ones will be selected;
652 # if we replace the nodes without using an allocator, locks are
653 # already declared in ExpandNames; otherwise, we need to lock all the
654 # instance nodes for disk re-creation
655 if self.op.iallocator:
656 assert not self.op.nodes
657 assert not self.needed_locks[locking.LEVEL_NODE]
658 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
659
660 # Lock member nodes of the group of the primary node
661 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
662 self.needed_locks[locking.LEVEL_NODE].extend(
663 self.cfg.GetNodeGroup(group_uuid).members)
664
665 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
666 elif not self.op.nodes:
667 self._LockInstancesNodes(primary_only=False)
668 elif level == locking.LEVEL_NODE_RES:
669 # Copy node locks
670 self.needed_locks[locking.LEVEL_NODE_RES] = \
671 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
672
673 def BuildHooksEnv(self):
674 """Build hooks env.
675
676 This runs on master, primary and secondary nodes of the instance.
677
678 """
679 return BuildInstanceHookEnvByObject(self, self.instance)
680
681 def BuildHooksNodes(self):
682 """Build hooks nodes.
683
684 """
685 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
686 return (nl, nl)
687
688 def CheckPrereq(self):
689 """Check prerequisites.
690
691 This checks that the instance is in the cluster and is not running.
692
693 """
694 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
695 assert instance is not None, \
696 "Cannot retrieve locked instance %s" % self.op.instance_name
697 if self.op.nodes:
698 if len(self.op.nodes) != len(instance.all_nodes):
699 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
700 " %d replacement nodes were specified" %
701 (instance.name, len(instance.all_nodes),
702 len(self.op.nodes)),
703 errors.ECODE_INVAL)
704 assert instance.disk_template != constants.DT_DRBD8 or \
705 len(self.op.nodes) == 2
706 assert instance.disk_template != constants.DT_PLAIN or \
707 len(self.op.nodes) == 1
708 primary_node = self.op.nodes[0]
709 else:
710 primary_node = instance.primary_node
711 if not self.op.iallocator:
712 CheckNodeOnline(self, primary_node)
713
714 if instance.disk_template == constants.DT_DISKLESS:
715 raise errors.OpPrereqError("Instance '%s' has no disks" %
716 self.op.instance_name, errors.ECODE_INVAL)
717
718 # Verify if node group locks are still correct
719 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
720 if owned_groups:
721 # Node group locks are acquired only for the primary node (and only
722 # when the allocator is used)
723 CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
724 primary_only=True)
725
726 # if we replace nodes *and* the old primary is offline, we don't
727 # check the instance state
728 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
729 if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
730 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
731 msg="cannot recreate disks")
732
733 if self.op.disks:
734 self.disks = dict(self.op.disks)
735 else:
736 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
737
738 maxidx = max(self.disks.keys())
739 if maxidx >= len(instance.disks):
740 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
741 errors.ECODE_INVAL)
742
743 if ((self.op.nodes or self.op.iallocator) and
744 sorted(self.disks.keys()) != range(len(instance.disks))):
745 raise errors.OpPrereqError("Can't recreate disks partially and"
746 " change the nodes at the same time",
747 errors.ECODE_INVAL)
748
749 self.instance = instance
750
751 if self.op.iallocator:
752 self._RunAllocator()
753 # Release unneeded node and node resource locks
754 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
755 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
756 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
757
758 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
759
760 def Exec(self, feedback_fn):
761 """Recreate the disks.
762
763 """
764 instance = self.instance
765
766 assert (self.owned_locks(locking.LEVEL_NODE) ==
767 self.owned_locks(locking.LEVEL_NODE_RES))
768
769 to_skip = []
770 mods = [] # keeps track of needed changes
771
772 for idx, disk in enumerate(instance.disks):
773 try:
774 changes = self.disks[idx]
775 except KeyError:
776 # Disk should not be recreated
777 to_skip.append(idx)
778 continue
779
780 # update secondaries for disks, if needed
781 if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
782 # need to update the nodes and minors
783 assert len(self.op.nodes) == 2
784 assert len(disk.logical_id) == 6 # otherwise disk internals
785 # have changed
786 (_, _, old_port, _, _, old_secret) = disk.logical_id
787 new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
788 new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
789 new_minors[0], new_minors[1], old_secret)
790 assert len(disk.logical_id) == len(new_id)
791 else:
792 new_id = None
793
794 mods.append((idx, new_id, changes))
795
796 # now that we have passed all asserts above, we can apply the mods
797 # in a single run (to avoid partial changes)
798 for idx, new_id, changes in mods:
799 disk = instance.disks[idx]
800 if new_id is not None:
801 assert disk.dev_type == constants.LD_DRBD8
802 disk.logical_id = new_id
803 if changes:
804 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
805 mode=changes.get(constants.IDISK_MODE, None))
806
807 # change primary node, if needed
808 if self.op.nodes:
809 instance.primary_node = self.op.nodes[0]
810 self.LogWarning("Changing the instance's nodes, you will have to"
811 " remove any disks left on the older nodes manually")
812
813 if self.op.nodes:
814 self.cfg.Update(instance, feedback_fn)
815
816 # All touched nodes must be locked
817 mylocks = self.owned_locks(locking.LEVEL_NODE)
818 assert mylocks.issuperset(frozenset(instance.all_nodes))
819 new_disks = CreateDisks(self, instance, to_skip=to_skip)
820
821 # TODO: Release node locks before wiping, or explain why it's not possible
822 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
823 wipedisks = [(idx, disk, 0)
824 for (idx, disk) in enumerate(instance.disks)
825 if idx not in to_skip]
826 WipeOrCleanupDisks(self, instance, disks=wipedisks, cleanup=new_disks)
827
828
829 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
830 """Checks if nodes have enough free disk space in the specified VG.
831
832 This function checks if all given nodes have the needed amount of
833 free disk. In case any node has less disk or we cannot get the
834 information from the node, this function raises an OpPrereqError
835 exception.
836
837 @type lu: C{LogicalUnit}
838 @param lu: a logical unit from which we get configuration data
839 @type nodenames: C{list}
840 @param nodenames: the list of node names to check
841 @type vg: C{str}
842 @param vg: the volume group to check
843 @type requested: C{int}
844 @param requested: the amount of disk in MiB to check for
845 @raise errors.OpPrereqError: if the node doesn't have enough disk,
846 or we cannot check the node
847
848 """
849 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
850 nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
851 for node in nodenames:
852 info = nodeinfo[node]
853 info.Raise("Cannot get current information from node %s" % node,
854 prereq=True, ecode=errors.ECODE_ENVIRON)
855 (_, (vg_info, ), _) = info.payload
856 vg_free = vg_info.get("vg_free", None)
857 if not isinstance(vg_free, int):
858 raise errors.OpPrereqError("Can't compute free disk space on node"
859 " %s for vg %s, result was '%s'" %
860 (node, vg, vg_free), errors.ECODE_ENVIRON)
861 if requested > vg_free:
862 raise errors.OpPrereqError("Not enough disk space on target node %s"
863 " vg %s: required %d MiB, available %d MiB" %
864 (node, vg, requested, vg_free),
865 errors.ECODE_NORES)
866
867
868 def CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
869 """Checks if nodes have enough free disk space in all the VGs.
870
871 This function checks if all given nodes have the needed amount of
872 free disk. In case any node has less disk or we cannot get the
873 information from the node, this function raises an OpPrereqError
874 exception.
875
876 @type lu: C{LogicalUnit}
877 @param lu: a logical unit from which we get configuration data
878 @type nodenames: C{list}
879 @param nodenames: the list of node names to check
880 @type req_sizes: C{dict}
881 @param req_sizes: the hash of vg and corresponding amount of disk in
882 MiB to check for
883 @raise errors.OpPrereqError: if the node doesn't have enough disk,
884 or we cannot check the node
885
886 """
887 for vg, req_size in req_sizes.items():
888 _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
889
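# Hedged usage sketch (node names are made up): ``req_sizes`` is the mapping
# produced by ComputeDiskSizePerVG, e.g. requiring 2304 MiB of free space in
# "xenvg" on both candidate nodes:
#
#   CheckNodesFreeDiskPerVG(self, ["node1.example.com", "node2.example.com"],
#                           {"xenvg": 2304})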
890
891 def _DiskSizeInBytesToMebibytes(lu, size):
892 """Converts a disk size in bytes to mebibytes.
893
894 Warns and rounds up if the size isn't an even multiple of 1 MiB.
895
896 """
897 (mib, remainder) = divmod(size, 1024 * 1024)
898
899 if remainder != 0:
900 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
901 " to not overwrite existing data (%s bytes will not be"
902 " wiped)", (1024 * 1024) - remainder)
903 mib += 1
904
905 return mib
906
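# Worked example: an exact multiple of 1 MiB converts silently, while a single
# extra byte rounds up (and logs that the remaining 1048575 bytes will not be
# wiped):
#
#   _DiskSizeInBytesToMebibytes(lu, 10 * 1024 * 1024 * 1024)      --> 10240
#   _DiskSizeInBytesToMebibytes(lu, 10 * 1024 * 1024 * 1024 + 1)  --> 10241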
907
908 def _CalcEta(time_taken, written, total_size):
909 """Calculates the ETA based on size written and total size.
910
911 @param time_taken: The time taken so far
912 @param written: amount written so far
913 @param total_size: The total size of data to be written
914 @return: The remaining time in seconds
915
916 """
917 avg_time = time_taken / float(written)
918 return (total_size - written) * avg_time
919
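# Worked example: after wiping 512 MiB of a 4096 MiB disk in 120 seconds, the
# remaining 3584 MiB at the observed rate give
#
#   _CalcEta(120, 512, 4096) == (4096 - 512) * (120 / 512.0) == 840.0  # seconds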
920
921 def WipeDisks(lu, instance, disks=None):
922 """Wipes instance disks.
923
924 @type lu: L{LogicalUnit}
925 @param lu: the logical unit on whose behalf we execute
926 @type instance: L{objects.Instance}
927 @param instance: the instance whose disks we should wipe
928 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
929 @param disks: Disk details; tuple contains disk index, disk object and the
930 start offset
931
932 """
933 node = instance.primary_node
934
935 if disks is None:
936 disks = [(idx, disk, 0)
937 for (idx, disk) in enumerate(instance.disks)]
938
939 for (_, device, _) in disks:
940 lu.cfg.SetDiskID(device, node)
941
942 logging.info("Pausing synchronization of disks of instance '%s'",
943 instance.name)
944 result = lu.rpc.call_blockdev_pause_resume_sync(node,
945 (map(compat.snd, disks),
946 instance),
947 True)
948 result.Raise("Failed to pause disk synchronization on node '%s'" % node)
949
950 for idx, success in enumerate(result.payload):
951 if not success:
952 logging.warn("Pausing synchronization of disk %s of instance '%s'"
953 " failed", idx, instance.name)
954
955 try:
956 for (idx, device, offset) in disks:
957 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
958 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
959 wipe_chunk_size = \
960 int(min(constants.MAX_WIPE_CHUNK,
961 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
962
963 size = device.size
964 last_output = 0
965 start_time = time.time()
966
967 if offset == 0:
968 info_text = ""
969 else:
970 info_text = (" (from %s to %s)" %
971 (utils.FormatUnit(offset, "h"),
972 utils.FormatUnit(size, "h")))
973
974 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
975
976 logging.info("Wiping disk %d for instance %s on node %s using"
977 " chunk size %s", idx, instance.name, node, wipe_chunk_size)
978
979 while offset < size:
980 wipe_size = min(wipe_chunk_size, size - offset)
981
982 logging.debug("Wiping disk %d, offset %s, chunk %s",
983 idx, offset, wipe_size)
984
985 result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
986 wipe_size)
987 result.Raise("Could not wipe disk %d at offset %d for size %d" %
988 (idx, offset, wipe_size))
989
990 now = time.time()
991 offset += wipe_size
992 if now - last_output >= 60:
993 eta = _CalcEta(now - start_time, offset, size)
994 lu.LogInfo(" - done: %.1f%% ETA: %s",
995 offset / float(size) * 100, utils.FormatSeconds(eta))
996 last_output = now
997 finally:
998 logging.info("Resuming synchronization of disks for instance '%s'",
999 instance.name)
1000
1001 result = lu.rpc.call_blockdev_pause_resume_sync(node,
1002 (map(compat.snd, disks),
1003 instance),
1004 False)
1005
1006 if result.fail_msg:
1007 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1008 node, result.fail_msg)
1009 else:
1010 for idx, success in enumerate(result.payload):
1011 if not success:
1012 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1013 " failed", idx, instance.name)
1014
1015
1016 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1017 """Wrapper for L{WipeDisks} that handles errors.
1018
1019 @type lu: L{LogicalUnit}
1020 @param lu: the logical unit on whose behalf we execute
1021 @type instance: L{objects.Instance}
1022 @param instance: the instance whose disks we should wipe
1023 @param disks: see L{WipeDisks}
1024 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1025 case of error
1026 @raise errors.OpExecError: in case of failure
1027
1028 """
1029 try:
1030 WipeDisks(lu, instance, disks=disks)
1031 except errors.OpExecError:
1032 logging.warning("Wiping disks for instance '%s' failed",
1033 instance.name)
1034 _UndoCreateDisks(lu, cleanup)
1035 raise
1036
1037
1038 def ExpandCheckDisks(instance, disks):
1039 """Return the instance disks selected by the disks list
1040
1041 @type disks: list of L{objects.Disk} or None
1042 @param disks: selected disks
1043 @rtype: list of L{objects.Disk}
1044 @return: selected instance disks to act on
1045
1046 """
1047 if disks is None:
1048 return instance.disks
1049 else:
1050 if not set(disks).issubset(instance.disks):
1051 raise errors.ProgrammerError("Can only act on disks belonging to the"
1052 " target instance: expected a subset of %r,"
1053 " got %r" % (instance.disks, disks))
1054 return disks
1055
1056
1057 def WaitForSync(lu, instance, disks=None, oneshot=False):
1058 """Sleep and poll for an instance's disk to sync.
1059
1060 """
1061 if not instance.disks or disks is not None and not disks:
1062 return True
1063
1064 disks = ExpandCheckDisks(instance, disks)
1065
1066 if not oneshot:
1067 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1068
1069 node = instance.primary_node
1070
1071 for dev in disks:
1072 lu.cfg.SetDiskID(dev, node)
1073
1074 # TODO: Convert to utils.Retry
1075
1076 retries = 0
1077 degr_retries = 10 # in seconds, as we sleep 1 second each time
1078 while True:
1079 max_time = 0
1080 done = True
1081 cumul_degraded = False
1082 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1083 msg = rstats.fail_msg
1084 if msg:
1085 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1086 retries += 1
1087 if retries >= 10:
1088 raise errors.RemoteError("Can't contact node %s for mirror data,"
1089 " aborting." % node)
1090 time.sleep(6)
1091 continue
1092 rstats = rstats.payload
1093 retries = 0
1094 for i, mstat in enumerate(rstats):
1095 if mstat is None:
1096 lu.LogWarning("Can't compute data for node %s/%s",
1097 node, disks[i].iv_name)
1098 continue
1099
1100 cumul_degraded = (cumul_degraded or
1101 (mstat.is_degraded and mstat.sync_percent is None))
1102 if mstat.sync_percent is not None:
1103 done = False
1104 if mstat.estimated_time is not None:
1105 rem_time = ("%s remaining (estimated)" %
1106 utils.FormatSeconds(mstat.estimated_time))
1107 max_time = mstat.estimated_time
1108 else:
1109 rem_time = "no time estimate"
1110 lu.LogInfo("- device %s: %5.2f%% done, %s",
1111 disks[i].iv_name, mstat.sync_percent, rem_time)
1112
1113 # if we're done but degraded, let's do a few small retries, to
1114 # make sure we see a stable and not transient situation; therefore
1115 # we force restart of the loop
1116 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1117 logging.info("Degraded disks found, %d retries left", degr_retries)
1118 degr_retries -= 1
1119 time.sleep(1)
1120 continue
1121
1122 if done or oneshot:
1123 break
1124
1125 time.sleep(min(60, max_time))
1126
1127 if done:
1128 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1129
1130 return not cumul_degraded
1131
1132
1133 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1134 """Shutdown block devices of an instance.
1135
1136 This does the shutdown on all nodes of the instance.
1137
1138 Errors on the primary node are ignored only if ignore_primary is
1139 true.
1140
1141 """
1142 lu.cfg.MarkInstanceDisksInactive(instance.name)
1143 all_result = True
1144 disks = ExpandCheckDisks(instance, disks)
1145
1146 for disk in disks:
1147 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1148 lu.cfg.SetDiskID(top_disk, node)
1149 result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1150 msg = result.fail_msg
1151 if msg:
1152 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1153 disk.iv_name, node, msg)
1154 if ((node == instance.primary_node and not ignore_primary) or
1155 (node != instance.primary_node and not result.offline)):
1156 all_result = False
1157 return all_result
1158
1159
1160 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1161 """Shutdown block devices of an instance.
1162
1163 This function checks that the instance is not running before calling
1164 ShutdownInstanceDisks.
1165
1166 """
1167 CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1168 ShutdownInstanceDisks(lu, instance, disks=disks)
1169
1170
1171 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1172 ignore_size=False):
1173 """Prepare the block devices for an instance.
1174
1175 This sets up the block devices on all nodes.
1176
1177 @type lu: L{LogicalUnit}
1178 @param lu: the logical unit on whose behalf we execute
1179 @type instance: L{objects.Instance}
1180 @param instance: the instance whose disks we assemble
1181 @type disks: list of L{objects.Disk} or None
1182 @param disks: which disks to assemble (or all, if None)
1183 @type ignore_secondaries: boolean
1184 @param ignore_secondaries: if true, errors on secondary nodes
1185 won't result in an error return from the function
1186 @type ignore_size: boolean
1187 @param ignore_size: if true, the current known size of the disk
1188 will not be used during the disk activation, useful for cases
1189 when the size is wrong
1190 @return: False if the operation failed, otherwise a list of
1191 (host, instance_visible_name, node_visible_name)
1192 with the mapping from node devices to instance devices
1193
1194 """
1195 device_info = []
1196 disks_ok = True
1197 iname = instance.name
1198 disks = ExpandCheckDisks(instance, disks)
1199
1200 # With the two-pass mechanism we try to reduce the window of
1201 # opportunity for the race condition of switching DRBD to primary
1202 # before handshaking has occurred, but we do not eliminate it
1203
1204 # The proper fix would be to wait (with some limits) until the
1205 # connection has been made and drbd transitions from WFConnection
1206 # into any other network-connected state (Connected, SyncTarget,
1207 # SyncSource, etc.)
1208
1209 # mark instance disks as active before doing actual work, so watcher does
1210 # not try to shut them down erroneously
1211 lu.cfg.MarkInstanceDisksActive(iname)
1212
1213 # 1st pass, assemble on all nodes in secondary mode
1214 for idx, inst_disk in enumerate(disks):
1215 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1216 if ignore_size:
1217 node_disk = node_disk.Copy()
1218 node_disk.UnsetSize()
1219 lu.cfg.SetDiskID(node_disk, node)
1220 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1221 False, idx)
1222 msg = result.fail_msg
1223 if msg:
1224 is_offline_secondary = (node in instance.secondary_nodes and
1225 result.offline)
1226 lu.LogWarning("Could not prepare block device %s on node %s"
1227 " (is_primary=False, pass=1): %s",
1228 inst_disk.iv_name, node, msg)
1229 if not (ignore_secondaries or is_offline_secondary):
1230 disks_ok = False
1231
1232 # FIXME: race condition on drbd migration to primary
1233
1234 # 2nd pass, do only the primary node
1235 for idx, inst_disk in enumerate(disks):
1236 dev_path = None
1237
1238 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1239 if node != instance.primary_node:
1240 continue
1241 if ignore_size:
1242 node_disk = node_disk.Copy()
1243 node_disk.UnsetSize()
1244 lu.cfg.SetDiskID(node_disk, node)
1245 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1246 True, idx)
1247 msg = result.fail_msg
1248 if msg:
1249 lu.LogWarning("Could not prepare block device %s on node %s"
1250 " (is_primary=True, pass=2): %s",
1251 inst_disk.iv_name, node, msg)
1252 disks_ok = False
1253 else:
1254 dev_path = result.payload
1255
1256 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1257
1258 # leave the disks configured for the primary node
1259 # this is a workaround that would be fixed better by
1260 # improving the logical/physical id handling
1261 for disk in disks:
1262 lu.cfg.SetDiskID(disk, instance.primary_node)
1263
1264 if not disks_ok:
1265 lu.cfg.MarkInstanceDisksInactive(iname)
1266
1267 return disks_ok, device_info
1268
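# Hedged usage sketch (device path and node name are illustrative only): for a
# healthy single-disk instance the second return value maps each instance disk
# to the device node exposed on the primary:
#
#   disks_ok, device_info = AssembleInstanceDisks(lu, instance)
#   # disks_ok    --> True
#   # device_info --> [("node1.example.com", "disk/0", "/dev/drbd0")]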
1269
1270 def StartInstanceDisks(lu, instance, force):
1271 """Start the disks of an instance.
1272
1273 """
1274 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1275 ignore_secondaries=force)
1276 if not disks_ok:
1277 ShutdownInstanceDisks(lu, instance)
1278 if force is not None and not force:
1279 lu.LogWarning("",
1280 hint=("If the message above refers to a secondary node,"
1281 " you can retry the operation using '--force'"))
1282 raise errors.OpExecError("Disk consistency error")
1283
1284
1285 class LUInstanceGrowDisk(LogicalUnit):
1286 """Grow a disk of an instance.
1287
1288 """
1289 HPATH = "disk-grow"
1290 HTYPE = constants.HTYPE_INSTANCE
1291 REQ_BGL = False
1292
1293 def ExpandNames(self):
1294 self._ExpandAndLockInstance()
1295 self.needed_locks[locking.LEVEL_NODE] = []
1296 self.needed_locks[locking.LEVEL_NODE_RES] = []
1297 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1298 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1299
1300 def DeclareLocks(self, level):
1301 if level == locking.LEVEL_NODE:
1302 self._LockInstancesNodes()
1303 elif level == locking.LEVEL_NODE_RES:
1304 # Copy node locks
1305 self.needed_locks[locking.LEVEL_NODE_RES] = \
1306 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1307
1308 def BuildHooksEnv(self):
1309 """Build hooks env.
1310
1311 This runs on the master, the primary and all the secondaries.
1312
1313 """
1314 env = {
1315 "DISK": self.op.disk,
1316 "AMOUNT": self.op.amount,
1317 "ABSOLUTE": self.op.absolute,
1318 }
1319 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1320 return env
1321
1322 def BuildHooksNodes(self):
1323 """Build hooks nodes.
1324
1325 """
1326 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1327 return (nl, nl)
1328
1329 def CheckPrereq(self):
1330 """Check prerequisites.
1331
1332 This checks that the instance is in the cluster.
1333
1334 """
1335 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1336 assert instance is not None, \
1337 "Cannot retrieve locked instance %s" % self.op.instance_name
1338 nodenames = list(instance.all_nodes)
1339 for node in nodenames:
1340 CheckNodeOnline(self, node)
1341
1342 self.instance = instance
1343
1344 if instance.disk_template not in constants.DTS_GROWABLE:
1345 raise errors.OpPrereqError("Instance's disk layout does not support"
1346 " growing", errors.ECODE_INVAL)
1347
1348 self.disk = instance.FindDisk(self.op.disk)
1349
1350 if self.op.absolute:
1351 self.target = self.op.amount
1352 self.delta = self.target - self.disk.size
1353 if self.delta < 0:
1354 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1355 "current disk size (%s)" %
1356 (utils.FormatUnit(self.target, "h"),
1357 utils.FormatUnit(self.disk.size, "h")),
1358 errors.ECODE_STATE)
1359 else:
1360 self.delta = self.op.amount
1361 self.target = self.disk.size + self.delta
1362 if self.delta < 0:
1363 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1364 utils.FormatUnit(self.delta, "h"),
1365 errors.ECODE_INVAL)
1366
1367 self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1368
1369 def _CheckDiskSpace(self, nodenames, req_vgspace):
1370 template = self.instance.disk_template
1371 if template not in (constants.DTS_NO_FREE_SPACE_CHECK):
1372 # TODO: check the free disk space for file, when that feature will be
1373 # supported
1374 nodes = map(self.cfg.GetNodeInfo, nodenames)
1375 es_nodes = filter(lambda n: IsExclusiveStorageEnabledNode(self.cfg, n),
1376 nodes)
1377 if es_nodes:
1378 # With exclusive storage we need to do something smarter than just looking
1379 # at free space; for now, let's simply abort the operation.
1380 raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1381 " is enabled", errors.ECODE_STATE)
1382 CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1383
1384 def Exec(self, feedback_fn):
1385 """Execute disk grow.
1386
1387 """
1388 instance = self.instance
1389 disk = self.disk
1390
1391 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1392 assert (self.owned_locks(locking.LEVEL_NODE) ==
1393 self.owned_locks(locking.LEVEL_NODE_RES))
1394
1395 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1396
1397 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[disk])
1398 if not disks_ok:
1399 raise errors.OpExecError("Cannot activate block device to grow")
1400
1401 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1402 (self.op.disk, instance.name,
1403 utils.FormatUnit(self.delta, "h"),
1404 utils.FormatUnit(self.target, "h")))
1405
1406 # First run all grow ops in dry-run mode
1407 for node in instance.all_nodes:
1408 self.cfg.SetDiskID(disk, node)
1409 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1410 True, True)
1411 result.Raise("Dry-run grow request failed to node %s" % node)
1412
1413 if wipe_disks:
1414 # Get disk size from primary node for wiping
1415 result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1416 result.Raise("Failed to retrieve disk size from node '%s'" %
1417 instance.primary_node)
1418
1419 (disk_size_in_bytes, ) = result.payload
1420
1421 if disk_size_in_bytes is None:
1422 raise errors.OpExecError("Failed to retrieve disk size from primary"
1423 " node '%s'" % instance.primary_node)
1424
1425 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1426
1427 assert old_disk_size >= disk.size, \
1428 ("Retrieved disk size too small (got %s, should be at least %s)" %
1429 (old_disk_size, disk.size))
1430 else:
1431 old_disk_size = None
1432
1433 # We know that (as far as we can test) operations across different
1434 # nodes will succeed, time to run it for real on the backing storage
1435 for node in instance.all_nodes:
1436 self.cfg.SetDiskID(disk, node)
1437 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1438 False, True)
1439 result.Raise("Grow request failed to node %s" % node)
1440
1441 # And now execute it for logical storage, on the primary node
1442 node = instance.primary_node
1443 self.cfg.SetDiskID(disk, node)
1444 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1445 False, False)
1446 result.Raise("Grow request failed to node %s" % node)
1447
1448 disk.RecordGrow(self.delta)
1449 self.cfg.Update(instance, feedback_fn)
1450
1451 # Changes have been recorded, release node lock
1452 ReleaseLocks(self, locking.LEVEL_NODE)
1453
1454 # Downgrade lock while waiting for sync
1455 self.glm.downgrade(locking.LEVEL_INSTANCE)
1456
1457 assert wipe_disks ^ (old_disk_size is None)
1458
1459 if wipe_disks:
1460 assert instance.disks[self.op.disk] == disk
1461
1462 # Wipe newly added disk space
1463 WipeDisks(self, instance,
1464 disks=[(self.op.disk, disk, old_disk_size)])
1465
1466 if self.op.wait_for_sync:
1467 disk_abort = not WaitForSync(self, instance, disks=[disk])
1468 if disk_abort:
1469 self.LogWarning("Disk syncing has not returned a good status; check"
1470 " the instance")
1471 if not instance.disks_active:
1472 _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1473 elif not instance.disks_active:
1474 self.LogWarning("Not shutting down the disk even if the instance is"
1475 " not supposed to be running because no wait for"
1476 " sync mode was requested")
1477
1478 assert self.owned_locks(locking.LEVEL_NODE_RES)
1479 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1480
1481
1482 class LUInstanceReplaceDisks(LogicalUnit):
1483 """Replace the disks of an instance.
1484
1485 """
1486 HPATH = "mirrors-replace"
1487 HTYPE = constants.HTYPE_INSTANCE
1488 REQ_BGL = False
1489
1490 def CheckArguments(self):
1491 """Check arguments.
1492
1493 """
1494 remote_node = self.op.remote_node
1495 ialloc = self.op.iallocator
1496 if self.op.mode == constants.REPLACE_DISK_CHG:
1497 if remote_node is None and ialloc is None:
1498 raise errors.OpPrereqError("When changing the secondary either an"
1499 " iallocator script must be used or the"
1500 " new node given", errors.ECODE_INVAL)
1501 else:
1502 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1503
1504 elif remote_node is not None or ialloc is not None:
1505 # Not replacing the secondary
1506 raise errors.OpPrereqError("The iallocator and new node options can"
1507 " only be used when changing the"
1508 " secondary node", errors.ECODE_INVAL)
1509
1510 def ExpandNames(self):
1511 self._ExpandAndLockInstance()
1512
1513 assert locking.LEVEL_NODE not in self.needed_locks
1514 assert locking.LEVEL_NODE_RES not in self.needed_locks
1515 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1516
1517 assert self.op.iallocator is None or self.op.remote_node is None, \
1518 "Conflicting options"
1519
1520 if self.op.remote_node is not None:
1521 self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
1522
1523 # Warning: do not remove the locking of the new secondary here
1524 # unless DRBD8.AddChildren is changed to work in parallel;
1525 # currently it doesn't since parallel invocations of
1526 # FindUnusedMinor will conflict
1527 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1528 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1529 else:
1530 self.needed_locks[locking.LEVEL_NODE] = []
1531 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1532
1533 if self.op.iallocator is not None:
1534 # iallocator will select a new node in the same group
1535 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1536 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1537
1538 self.needed_locks[locking.LEVEL_NODE_RES] = []
1539
1540 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1541 self.op.iallocator, self.op.remote_node,
1542 self.op.disks, self.op.early_release,
1543 self.op.ignore_ipolicy)
1544
1545 self.tasklets = [self.replacer]
1546
1547 def DeclareLocks(self, level):
1548 if level == locking.LEVEL_NODEGROUP:
1549 assert self.op.remote_node is None
1550 assert self.op.iallocator is not None
1551 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1552
1553 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1554 # Lock all groups used by instance optimistically; this requires going
1555 # via the node before it's locked, requiring verification later on
1556 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1557 self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1558
1559 elif level == locking.LEVEL_NODE:
1560 if self.op.iallocator is not None:
1561 assert self.op.remote_node is None
1562 assert not self.needed_locks[locking.LEVEL_NODE]
1563 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1564
1565 # Lock member nodes of all locked groups
1566 self.needed_locks[locking.LEVEL_NODE] = \
1567 [node_name
1568 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1569 for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1570 else:
1571 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1572
1573 self._LockInstancesNodes()
1574
1575 elif level == locking.LEVEL_NODE_RES:
1576 # Reuse node locks
1577 self.needed_locks[locking.LEVEL_NODE_RES] = \
1578 self.needed_locks[locking.LEVEL_NODE]
1579
1580 def BuildHooksEnv(self):
1581 """Build hooks env.
1582
1583 This runs on the master, the primary and all the secondaries.
1584
1585 """
1586 instance = self.replacer.instance
1587 env = {
1588 "MODE": self.op.mode,
1589 "NEW_SECONDARY": self.op.remote_node,
1590 "OLD_SECONDARY": instance.secondary_nodes[0],
1591 }
1592 env.update(BuildInstanceHookEnvByObject(self, instance))
1593 return env
1594
1595 def BuildHooksNodes(self):
1596 """Build hooks nodes.
1597
1598 """
1599 instance = self.replacer.instance
1600 nl = [
1601 self.cfg.GetMasterNode(),
1602 instance.primary_node,
1603 ]
1604 if self.op.remote_node is not None:
1605 nl.append(self.op.remote_node)
1606 return nl, nl
1607
1608 def CheckPrereq(self):
1609 """Check prerequisites.
1610
1611 """
1612 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1613 self.op.iallocator is None)
1614
1615 # Verify if node group locks are still correct
1616 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1617 if owned_groups:
1618 CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1619
1620 return LogicalUnit.CheckPrereq(self)
1621
1622
1623 class LUInstanceActivateDisks(NoHooksLU):
1624 """Bring up an instance's disks.
1625
1626 """
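  # Usage sketch (an illustration, not part of this module): this LU is
  # normally reached by submitting an OpInstanceActivateDisks opcode, for
  # example through a luxi client; the exact client setup shown here is an
  # assumption and may differ between Ganeti versions:
  #
  #   op = opcodes.OpInstanceActivateDisks(instance_name="inst1.example.com",
  #                                        ignore_size=False,
  #                                        wait_for_sync=True)
  #   cli.GetClient().SubmitJob([op])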
1627 REQ_BGL = False
1628
1629 def ExpandNames(self):
1630 self._ExpandAndLockInstance()
1631 self.needed_locks[locking.LEVEL_NODE] = []
1632 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1633
1634 def DeclareLocks(self, level):
1635 if level == locking.LEVEL_NODE:
1636 self._LockInstancesNodes()
1637
1638 def CheckPrereq(self):
1639 """Check prerequisites.
1640
1641 This checks that the instance is in the cluster.
1642
1643 """
1644 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1645 assert self.instance is not None, \
1646 "Cannot retrieve locked instance %s" % self.op.instance_name
1647 CheckNodeOnline(self, self.instance.primary_node)
1648
1649 def Exec(self, feedback_fn):
1650 """Activate the disks.
1651
1652 """
1653 disks_ok, disks_info = \
1654 AssembleInstanceDisks(self, self.instance,
1655 ignore_size=self.op.ignore_size)
1656 if not disks_ok:
1657 raise errors.OpExecError("Cannot activate block devices")
1658
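    # If waiting for sync was requested and the disks turn out to be degraded,
    # clear the disks_active flag in the configuration again before failing.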
1659 if self.op.wait_for_sync:
1660 if not WaitForSync(self, self.instance):
1661 self.cfg.MarkInstanceDisksInactive(self.instance.name)
1662 raise errors.OpExecError("Some disks of the instance are degraded!")
1663
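    # disks_info holds the per-disk results from AssembleInstanceDisks and is
    # returned so the caller can report where the disks were assembled.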
1664 return disks_info
1665
1666
1667 class LUInstanceDeactivateDisks(NoHooksLU):
1668 """Shutdown an instance's disks.
1669
1670 """
1671 REQ_BGL = False
1672
1673 def ExpandNames(self):
1674 self._ExpandAndLockInstance()
1675 self.needed_locks[locking.LEVEL_NODE] = []
1676 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1677
1678 def DeclareLocks(self, level):
1679 if level == locking.LEVEL_NODE:
1680 self._LockInstancesNodes()
1681
1682 def CheckPrereq(self):
1683 """Check prerequisites.
1684
1685 This checks that the instance is in the cluster.
1686
1687 """
1688 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1689 assert self.instance is not None, \
1690 "Cannot retrieve locked instance %s" % self.op.instance_name
1691
1692 def Exec(self, feedback_fn):
1693     """Deactivate the disks.
1694
1695 """
1696 instance = self.instance
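    # With force, shut the disks down unconditionally; the safe variant first
    # verifies that the instance is not running.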
1697 if self.op.force:
1698 ShutdownInstanceDisks(self, instance)
1699 else:
1700 _SafeShutdownInstanceDisks(self, instance)
1701
1702
1703 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1704 ldisk=False):
1705 """Check that mirrors are not degraded.
1706
1707 @attention: The device has to be annotated already.
1708
1709 The ldisk parameter, if True, will change the test from the
1710 is_degraded attribute (which represents overall non-ok status for
1711 the device(s)) to the ldisk (representing the local storage status).
1712
1713 """
1714 lu.cfg.SetDiskID(dev, node)
1715
1716 result = True
1717
1718 if on_primary or dev.AssembleOnSecondary():
1719 rstats = lu.rpc.call_blockdev_find(node, dev)
1720 msg = rstats.fail_msg
1721 if msg:
1722 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1723 result = False
1724 elif not rstats.payload:
1725 lu.LogWarning("Can't find disk on node %s", node)
1726 result = False
1727 else:
1728 if ldisk:
1729 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1730 else:
1731 result = result and not rstats.payload.is_degraded
1732
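  # Recurse into child devices; note that the ldisk flag is not passed down,
  # so children are always checked against their overall is_degraded status.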
1733 if dev.children:
1734 for child in dev.children:
1735 result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1736 on_primary)
1737
1738 return result
1739
1740
1741 def CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1742 """Wrapper around L{_CheckDiskConsistencyInner}.
1743
1744 """
1745 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1746 return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1747 ldisk=ldisk)
1748
1749
1750 def _BlockdevFind(lu, node, dev, instance):
1751 """Wrapper around call_blockdev_find to annotate diskparams.
1752
1753 @param lu: A reference to the lu object
1754 @param node: The node to call out
1755 @param dev: The device to find
1756 @param instance: The instance object the device belongs to
1757 @returns The result of the rpc call
1758
1759 """
1760 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1761 return lu.rpc.call_blockdev_find(node, disk)
1762
1763
1764 def _GenerateUniqueNames(lu, exts):
1765 """Generate a suitable LV name.
1766
1767 This will generate a logical volume name for the given instance.
1768
1769 """
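  # For example, for exts like [".disk0_data", ".disk0_meta"] this produces
  # names such as "<unique-id>.disk0_data", where the unique ID comes from the
  # cluster configuration.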
1770 results = []
1771 for val in exts:
1772 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1773 results.append("%s%s" % (new_id, val))
1774 return results
1775
1776
1777 class TLReplaceDisks(Tasklet):
1778 """Replaces disks for an instance.
1779
1780 Note: Locking is not within the scope of this class.
1781
1782 """
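  # Usage sketch (an illustration only; "hail" is just an example iallocator
  # name): the owning LU builds the tasklet in ExpandNames and the framework
  # then runs CheckPrereq and Exec on it:
  #
  #   replacer = TLReplaceDisks(lu, "inst1.example.com",
  #                             constants.REPLACE_DISK_CHG,
  #                             "hail", None, [], False, False)
  #   lu.tasklets = [replacer]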
1783 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1784 disks, early_release, ignore_ipolicy):
1785 """Initializes this class.
1786
1787 """
1788 Tasklet.__init__(self, lu)
1789
1790 # Parameters
1791 self.instance_name = instance_name
1792 self.mode = mode
1793 self.iallocator_name = iallocator_name
1794 self.remote_node = remote_node
1795 self.disks = disks
1796 self.early_release = early_release
1797 self.ignore_ipolicy = ignore_ipolicy
1798
1799 # Runtime data
1800 self.instance = None
1801 self.new_node = None
1802 self.target_node = None
1803 self.other_node = None
1804 self.remote_node_info = None
1805 self.node_secondary_ip = None
1806
1807 @staticmethod
1808 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1809 """Compute a new secondary node using an IAllocator.
1810
1811 """
1812 req = iallocator.IAReqRelocate(name=instance_name,
1813 relocate_from=list(relocate_from))
1814 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1815
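    # Run the external iallocator script; on success, ial.result contains the
    # chosen node name(s).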
1816 ial.Run(iallocator_name)
1817
1818 if not ial.success:
1819 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1820 " %s" % (iallocator_name, ial.info),
1821 errors.ECODE_NORES)
1822
1823 remote_node_name = ial.result[0]
1824
1825 lu.LogInfo("Selected new secondary for instance '%s': %s",
1826 instance_name, remote_node_name)
1827
1828 return remote_node_name
1829
1830 def _FindFaultyDisks(self, node_name):
1831 """Wrapper for L{FindFaultyInstanceDisks}.
1832
1833 """
1834 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1835 node_name, True)
1836
1837 def _CheckDisksActivated(self, instance):
1838 """Checks if the instance disks are activated.
1839
1840 @param instance: The instance to check disks
1841 @return: True if they are activated, False otherwise
1842
1843 """
1844 nodes = instance.all_nodes
1845
1846 for idx, dev in enumerate(instance.disks):
1847 for node in nodes:
1848 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1849 self.cfg.SetDiskID(dev, node)
1850
1851 result = _BlockdevFind(self, node, dev, instance)
1852
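        # Results from offline nodes are skipped rather than treated as
        # inactive disks; only real failures or missing devices count.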
1853 if result.offline:
1854 continue
1855 elif result.fail_msg or not result.payload:
1856 return False
1857
1858 return True
1859
1860 def CheckPrereq(self):
1861 """Check prerequisites.
1862
1863 This checks that the instance is in the cluster.
1864
1865 """
1866 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1867 assert instance is not None, \
1868 "Cannot retrieve locked instance %s" % self.instance_name
1869
1870 if instance.disk_template != constants.DT_DRBD8:
1871 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1872 " instances", errors.ECODE_INVAL)
1873
1874 if len(instance.secondary_nodes) != 1:
1875 raise errors.OpPrereqError("The instance has a strange layout,"
1876 " expected one secondary but found %d" %
1877 len(instance.secondary_nodes),
1878 errors.ECODE_FAULT)
1879
1880 instance = self.instance
1881 secondary_node = instance.secondary_nodes[0]
1882
1883 if self.iallocator_name is None:
1884 remote_node = self.remote_node
1885 else:
1886 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1887 instance.name, instance.secondary_nodes)
1888
1889 if remote_node is None:
1890 self.remote_node_info = None
1891 else:
1892 assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1893 "Remote node '%s' is not locked" % remote_node
1894
1895 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1896 assert self.remote_node_info is not None, \
1897 "Cannot retrieve locked node %s" % remote_node
1898
1899 if remote_node == self.instance.primary_node:
1900 raise errors.OpPrereqError("The specified node is the primary node of"
1901 " the instance", errors.ECODE_INVAL)
1902
1903 if remote_node == secondary_node:
1904 raise errors.OpPrereqError("The specified node is already the"
1905 " secondary node of the instance",
1906 errors.ECODE_INVAL)
1907
1908 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1909 constants.REPLACE_DISK_CHG):
1910 raise errors.OpPrereqError("Cannot specify disks to be replaced",
1911 errors.ECODE_INVAL)
1912
1913 if self.mode == constants.REPLACE_DISK_AUTO:
1914 if not self._CheckDisksActivated(instance):
1915 raise errors.OpPrereqError("Please run activate-disks on instance %s"
1916 " first" % self.instance_name,
1917 errors.ECODE_STATE)
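      # In automatic mode only disks that are actually faulty get replaced,
      # and only on one node; faults on both nodes cannot be handled here.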
1918 faulty_primary = self._FindFaultyDisks(instance.primary_node)
1919 faulty_secondary = self._FindFaultyDisks(secondary_node)
1920
1921 if faulty_primary and faulty_secondary:
1922 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1923 " one node and can not be repaired"
1924 " automatically" % self.instance_name,
1925 errors.ECODE_STATE)
1926
1927 if faulty_primary:
1928 self.disks = faulty_primary
1929 self.target_node = instance.primary_node
1930 self.other_node = secondary_node
1931 check_nodes = [self.target_node, self.other_node]
1932 elif faulty_secondary:
1933 self.disks = faulty_secondary
1934 self.target_node = secondary_node
1935 self.other_node = instance.primary_node
1936 check_nodes = [self.target_node, self.other_node]
1937 else:
1938 self.disks = []
1939 check_nodes = []
1940
1941 else:
1942 # Non-automatic modes
1943 if self.mode == constants.REPLACE_DISK_PRI:
1944 self.target_node = instance.primary_node
1945 self.other_node = secondary_node
1946 check_nodes = [self.target_node, self.other_node]
1947
1948 elif self.mode == constants.REPLACE_DISK_SEC:
1949 self.target_node = secondary_node
1950 self.other_node = instance.primary_node
1951 check_nodes = [self.target_node, self.other_node]
1952
1953 elif self.mode == constants.REPLACE_DISK_CHG:
1954 self.new_node = remote_node
1955 self.other_node = instance.primary_node
1956 self.target_node = secondary_node
1957 check_nodes = [self.new_node, self.other_node]
1958
1959 CheckNodeNotDrained(self.lu, remote_node)
1960 CheckNodeVmCapable(self.lu, remote_node)
1961
1962 old_node_info = self.cfg.GetNodeInfo(secondary_node)
1963 assert old_node_info is not None
1964 if old_node_info.offline and not self.early_release:
1965 # doesn't make sense to delay the release
1966 self.early_release = True
1967 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1968 " early-release mode", secondary_node)
1969
1970 else:
1971 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1972 self.mode)
1973
1974 # If not specified all disks should be replaced
1975 if not self.disks:
1976 self.disks = range(len(self.instance.disks))
1977
1978     # TODO: This is ugly, but right now we can't distinguish between an
1979     # internally submitted opcode and an external one. We should fix that.
1980 if self.remote_node_info:
1981       # The node is changing; let's verify it still meets the instance policy
1982 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1983 cluster = self.cfg.GetClusterInfo()
1984 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1985 new_group_info)
1986 CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1987 self.cfg, ignore=self.ignore_ipolicy)
1988
1989 for node in check_nodes:
1990 CheckNodeOnline(self.lu, node)
1991
1992 touched_nodes = frozenset(node_name for node_name in [self.new_node,
1993 self.other_node,
1994 self.target_node]
1995 if node_name is not None)
1996
1997 # Release unneeded node and node resource locks
1998 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
1999 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2000 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2001
2002 # Release any owned node group
2003 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2004
2005 # Check whether disks are valid
2006 for disk_idx in self.disks:
2007 instance.FindDisk(disk_idx)
2008
2009 # Get secondary node IP addresses
2010 self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
2011 in self.cfg.GetMultiNodeInfo(touched_nodes))
2012
2013 def Exec(self, feedback_fn):
2014 """Execute disk replacement.
2015
2016 This dispatches the disk replacement to the appropriate handler.
2017
2018 """
2019 if __debug__:
2020 # Verify owned locks before starting operation
2021 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2022 assert set(owned_nodes) == set(self.node_secondary_ip), \
2023 ("Incorrect node locks, owning %s, expected %s" %
2024 (owned_nodes, self.node_secondary_ip.keys()))
2025 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2026 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2027 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2028
2029 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2030 assert list(owned_instances) == [self.instance_name], \
2031 "Instance '%s' not locked" % self.instance_name
2032
2033 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2034 "Should not own any node group lock at this point"
2035
2036 if not self.disks:
2037 feedback_fn("No disks need replacement for instance '%s'" %
2038 self.instance.name)
2039 return
2040
2041 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2042 (utils.CommaJoin(self.disks), self.instance.name))
2043 feedback_fn("Current primary node: %s" % self.instance.primary_node)
2044     feedback_fn("Current secondary node: %s" %
2045 utils.CommaJoin(self.instance.secondary_nodes))
2046
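    # The disks_active flag tracked in the configuration tells us whether the
    # instance's disks are currently assembled; if they are not, bring them up
    # only for the duration of the replacement.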
2047 activate_disks = not self.instance.disks_active
2048
2049 # Activate the instance disks if we're replacing them on a down instance
2050 if activate_disks:
2051 StartInstanceDisks(self.lu, self.instance, True)
2052
2053 try:
2054 # Should we replace the secondary node?
2055 if self.new_node is not None:
2056 fn = self._ExecDrbd8Secondary
2057 else:
2058 fn = self._ExecDrbd8DiskOnly
2059
2060 result = fn(feedback_fn)
2061 finally:
2062 # Deactivate the instance disks if we're replacing them on a
2063 # down instance
2064 if activate_disks:
2065 _SafeShutdownInstanceDisks(self.lu, self.instance)
2066
2067 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2068
2069 if __debug__:
2070 # Verify owned locks
2071 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2072 nodes = frozenset(self.node_secondary_ip)
2073 assert ((self.early_release and not owned_nodes) or
2074 (not self.early_release and not (set(owned_nodes) - nodes))), \
2075 ("Not owning the correct locks, early_release=%s, owned=%r,"
2076 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2077
2078 return result
2079
2080 def _CheckVolumeGroup(self, nodes):
2081 self.lu.LogInfo("Checking volume groups")
2082
2083 vgname = self.cfg.GetVGName()
2084
2085 # Make sure volume group exists on all involved nodes
2086 results = self.rpc.call_vg_list(nodes)
2087 if not results:
2088 raise errors.OpExecError("Can't list volume groups on the nodes")
2089
2090 for node in nodes:
2091 res = results[node]
2092 res.Raise("Error checking node %s" % node)
2093 if vgname not in res.payload:
2094 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2095 (vgname, node))
2096
2097 def _CheckDisksExistence(self, nodes):
2098 # Check disk existence
2099 for idx, dev in enumerate(self.instance.disks):
2100 if idx not in self.disks:
2101 continue
2102
2103 for node in nodes:
2104 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2105 self.cfg.SetDiskID(dev, node)
2106
2107 result = _BlockdevFind(self, node, dev, self.instance)
2108
2109 msg = result.fail_msg
2110 if msg or not result.payload:
2111 if not msg:
2112 msg = "disk not found"
2113 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2114 (idx, node, msg))
2115
2116 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2117 for idx, dev in enumerate(self.instance.disks):
2118 if idx not in self.disks:
2119 continue
2120
2121 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2122 (idx, node_name))
2123
2124 if not CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2125 on_primary, ldisk=ldisk):
2126 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2127 " replace disks for instance %s" %
2128 (node_name, self.instance.name))
2129
2130 def _CreateNewStorage(self, node_name):
2131 """Create new storage on the primary or secondary node.
2132
2133 This is only used for same-node replaces, not for changing the
2134 secondary node, hence we don't want to modify the existing disk.
2135
2136 """
2137 iv_names = {}
2138
2139 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2140 for idx, dev in enumerate(disks):
2141 if idx not in self.disks:
2142 continue
2143
2144 self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2145
2146 self.cfg.SetDiskID(dev, node_name)
2147
2148 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2149 names = _GenerateUniqueNames(self.lu, lv_names)
2150
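      # Each DRBD8 disk is backed by a data LV and a small metadata LV; build
      # the replacements in the same volume groups, with fresh unique names.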
2151 (data_disk, meta_disk) = dev.children
2152 vg_data = data_disk.logical_id[0]
2153 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2154 logical_id=(vg_data, names[0]),
2155 params=data_disk.params)
2156 vg_meta = meta_disk.logical_id[0]
2157 lv_meta = objects.Disk(dev_type=constants.LD_LV,
2158 size=constants.DRBD_META_SIZE,
2159 logical_id=(vg_meta, names[1]),
2160 params=meta_disk.params)
2161
2162 new_lvs = [lv_data, lv_meta]
2163 old_lvs = [child.Copy() for child in dev.children]
2164 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2165 excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2166
2167 # we pass force_create=True to force the LVM creation
2168 for new_lv in new_lvs:
2169 _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2170 GetInstanceInfoText(self.instance), False,
2171 excl_stor)
2172
2173 return iv_names
2174
2175 def _CheckDevices(self, node_name, iv_names):
2176 for name, (dev, _, _) in iv_names.iteritems():
2177 self.cfg.SetDiskID(dev, node_name)
2178
2179 result = _BlockdevFind(self, node_name, dev, self.instance)
2180
2181 msg = result.fail_msg
2182 if msg or not result.payload:
2183 if not msg:
2184 msg = "disk not found"
2185 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2186 (name, msg))
2187
2188 if result.payload.is_degraded:
2189 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2190
2191 def _RemoveOldStorage(self, node_name, iv_names):
2192 for name, (_, old_lvs, _) in iv_names.iteritems():
2193 self.lu.LogInfo("Remove logical volumes for %s", name)
2194
2195 for lv in old_lvs:
2196 self.cfg.SetDiskID(lv, node_name)
2197
2198 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2199 if msg:
2200 self.lu.LogWarning("Can't remove old LV: %s", msg,
2201 hint="remove unused LVs manually")
2202
2203 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2204 """Replace a disk on the primary or secondary for DRBD 8.
2205
2206 The algorithm for replace is quite complicated:
2207
2208 1. for each disk to be replaced:
2209
2210 1. create new LVs on the target node with unique names
2211 1. detach old LVs from the drbd device
2212 1. rename old LVs to name_replaced.<time_t>
2213 1. rename new LVs to old LVs
2214 1. attach the new LVs (with the old names now) to the drbd device
2215
2216 1. wait for sync across all devices
2217
2218 1. for each modified disk:
2219
2220       1. remove old LVs (which have the name name_replaced.<time_t>)
2221
2222 Failures are not very well handled.
2223
2224 """
2225 steps_total = 6
2226
2227 # Step: check device activation
2228 self.lu.LogStep(1, steps_total, "Check device existence")
2229 self._CheckDisksExistence([self.other_node, self.target_node])
2230 self._CheckVolumeGroup([self.target_node, self.other_node])
2231
2232 # Step: check other node consistency
2233 self.lu.LogStep(2, steps_total, "Check peer consistency")
2234 self._CheckDisksConsistency(self.other_node,
2235 self.other_node == self.instance.primary_node,
2236 False)
2237
2238 # Step: create new storage
2239 self.lu.LogStep(3, steps_total, "Allocate new storage")
2240 iv_names = self._CreateNewStorage(self.target_node)
2241
2242 # Step: for each lv, detach+rename*2+attach
2243 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2244 for dev, old_lvs, new_lvs in iv_names.itervalues():
2245 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2246
2247 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2248 old_lvs)
2249 result.Raise("Can't detach drbd from local storage on node"
2250 " %s for device %s" % (self.target_node, dev.iv_name))
2251 #dev.children = []
2252 #cfg.Update(instance)
2253
2254 # ok, we created the new LVs, so now we know we have the needed
2255 # storage; as such, we proceed on the target node to rename
2256 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2257 # using the assumption that logical_id == physical_id (which in
2258 # turn is the unique_id on that node)
2259
2260 # FIXME(iustin): use a better name for the replaced LVs
2261 temp_suffix = int(time.time())
2262 ren_fn = lambda d, suff: (d.physical_id[0],
2263 d.physical_id[1] + "_replaced-%s" % suff)
2264
2265 # Build the rename list based on what LVs exist on the node
2266 rename_old_to_new = []
2267 for to_ren in old_lvs:
2268 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2269 if not result.fail_msg and result.payload:
2270 # device exists
2271 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2272
2273 self.lu.LogInfo("Renaming the old LVs on the target node")
2274 result = self.rpc.call_blockdev_rename(self.target_node,
2275 rename_old_to_new)
2276 result.Raise("Can't rename old LVs on node %s" % self.target_node)
2277
2278 # Now we rename the new LVs to the old LVs
2279 self.lu.LogInfo("Renaming the new LVs on the target node")
2280 rename_new_to_old = [(new, old.physical_id)
2281 for old, new in zip(old_lvs, new_lvs)]
2282 result = self.rpc.call_blockdev_rename(self.target_node,
2283 rename_new_to_old)
2284 result.Raise("Can't rename new LVs on node %s" % self.target_node)
2285
2286 # Intermediate steps of in memory modifications
2287 for old, new in zip(old_lvs, new_lvs):
2288 new.logical_id = old.logical_id
2289 self.cfg.SetDiskID(new, self.target_node)
2290
2291 # We need to modify old_lvs so that removal later removes the
2292 # right LVs, not the newly added ones; note that old_lvs is a
2293 # copy here
2294 for disk in old_lvs:
2295 disk.logical_id = ren_fn(disk, temp_suffix)
2296 self.cfg.SetDiskID(disk, self.target_node)
2297
2298 # Now that the new lvs have the old name, we can add them to the device
2299 self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2300 result = self.rpc.call_blockdev_addchildren(self.target_node,
2301 (dev, self.instance), new_lvs)
2302 msg = result.fail_msg
2303 if msg:
2304 for new_lv in new_lvs:
2305 msg2 = self.rpc.call_blockdev_remove(self.target_node,
2306 new_lv).fail_msg
2307 if msg2:
2308 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2309                                hint=("clean up the unused logical"
2310                                      " volumes manually"))
2311 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2312
2313 cstep = itertools.count(5)
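    # Steps 5 and 6 (removing the old storage and waiting for sync) swap their
    # order depending on early_release, hence the counter.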
2314
2315 if self.early_release:
2316 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2317 self._RemoveOldStorage(self.target_node, iv_names)
2318 # TODO: Check if releasing locks early still makes sense
2319 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2320 else:
2321 # Release all resource locks except those used by the instance
2322 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2323 keep=self.node_secondary_ip.keys())
2324
2325 # Release all node locks while waiting for sync
2326 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2327
2328 # TODO: Can the instance lock be downgraded here? Take the optional disk
2329 # shutdown in the caller into consideration.
2330
2331 # Wait for sync
2332     # This can fail as the old devices are degraded and WaitForSync
2333 # does a combined result over all disks, so we don't check its return value
2334 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2335 WaitForSync(self.lu, self.instance)
2336
2337 # Check all devices manually
2338 self._CheckDevices(self.instance.primary_node, iv_names)
2339
2340 # Step: remove old storage
2341 if not self.early_release:
2342 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2343 self._RemoveOldStorage(self.target_node, iv_names)
2344
2345 def _ExecDrbd8Secondary(self, feedback_fn):
2346 """Replace the secondary node for DRBD 8.
2347
2348 The algorithm for replace is quite complicated:
2349 - for all disks of the instance:
2350 - create new LVs on the new node with same names
2351 - shutdown the drbd device on the old secondary
2352 - disconnect the drbd network on the primary
2353 - create the drbd device on the new secondary
2354 - network attach the drbd on the primary, using an artifice:
2355 the drbd code for Attach() will connect to the network if it
2356 finds a device which is connected to the good local disks but
2357 not network enabled
2358 - wait for sync across all devices
2359 - remove all disks from the old secondary
2360
2361 Failures are not very well handled.
2362
2363 """
2364 steps_total = 6
2365
2366 pnode = self.instance.primary_node
2367
2368 # Step: check device activation
2369 self.lu.LogStep(1, steps_total, "Check device existence")
2370 self._CheckDisksExistence([self.instance.primary_node])
2371 self._CheckVolumeGroup([self.instance.primary_node])
2372
2373 # Step: check other node consistency
2374 self.lu.LogStep(2, steps_total, "Check peer consistency")
2375 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2376
2377 # Step: create new storage
2378 self.lu.LogStep(3, steps_total, "Allocate new storage")
2379 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2380 excl_stor = IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2381 for idx, dev in enumerate(disks):
2382 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2383 (self.new_node, idx))
2384 # we pass force_create=True to force LVM creation
2385 for new_lv in dev.children:
2386 _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2387 True, GetInstanceInfoText(self.instance), False,
2388 excl_stor)
2389
2390     # Step 4: drbd minors and drbd setup changes
2391 # after this, we must manually remove the drbd minors on both the
2392 # error and the success paths
2393 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
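    # Reserve one new DRBD minor on the new node for every instance disk;
    # these must be released again (ReleaseDRBDMinors) if anything below fails.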
2394 minors = self.cfg.AllocateDRBDMinor([self.new_node
2395 for dev in self.instance.disks],
2396 self.instance.name)
2397 logging.debug("Allocated minors %r", minors)
2398
2399 iv_names = {}
2400 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2401 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2402 (self.new_node, idx))
2403 # create new devices on new_node; note that we create two IDs:
2404 # one without port, so the drbd will be activated without
2405 # networking information on the new node at this stage, and one
2406       # with network, for the later activation in step 4
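      # A DRBD8 logical_id is the 6-tuple
      # (node_a, node_b, port, minor_a, minor_b, secret).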
2407 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2408 if self.instance.primary_node == o_node1:
2409 p_minor = o_minor1
2410 else:
2411 assert self.instance.primary_node == o_node2, "Three-node instance?"
2412 p_minor = o_minor2
2413
2414 new_alone_id = (self.instance.primary_node, self.new_node, None,
2415 p_minor, new_minor, o_secret)
2416 new_net_id = (self.instance.primary_node, self.new_node, o_port,
2417 p_minor, new_minor, o_secret)
2418
2419 iv_names[idx] = (dev, dev.children, new_net_id)
2420 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2421 new_net_id)
2422 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2423 logical_id=new_alone_id,
2424 children=dev.children,
2425 size=dev.size,
2426 params={})
2427 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2428 self.cfg)
2429 try:
2430 CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2431 anno_new_drbd,
2432 GetInstanceInfoText(self.instance), False,
2433 excl_stor)
2434 except errors.GenericError:
2435 self.cfg.ReleaseDRBDMinors(self.instance.name)
2436 raise
2437
2438 # We have new devices, shutdown the drbd on the old secondary
2439 for idx, dev in enumerate(self.instance.disks):
2440 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2441 self.cfg.SetDiskID(dev, self.target_node)
2442 msg = self.rpc.call_blockdev_shutdown(self.target_node,
2443 (dev, self.instance)).fail_msg
2444 if msg:
2445 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2446                            " node: %s" % (idx, msg),
2447 hint=("Please cleanup this device manually as"
2448 " soon as possible"))
2449
2450 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2451 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2452 self.instance.disks)[pnode]
2453
2454 msg = result.fail_msg
2455 if msg:
2456 # detaches didn't succeed (unlikely)
2457 self.cfg.ReleaseDRBDMinors(self.instance.name)
2458 raise errors.OpExecError("Can't detach the disks from the network on"
2459 " old node: %s" % (msg,))
2460
2461 # if we managed to detach at least one, we update all the disks of
2462 # the instance to point to the new secondary
2463 self.lu.LogInfo("Updating instance configuration")
2464 for dev, _, new_logical_id in iv_names.itervalues():
2465 dev.logical_id = new_logical_id
2466 self.cfg.SetDiskID(dev, self.instance.primary_node)
2467
2468 self.cfg.Update(self.instance, feedback_fn)
2469
2470 # Release all node locks (the configuration has been updated)
2471 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2472
2473 # and now perform the drbd attach
2474 self.lu.LogInfo("Attaching primary drbds to new secondary"
2475 " (standalone => connected)")
2476 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2477 self.new_node],
2478 self.node_secondary_ip,
2479 (self.instance.disks, self.instance),
2480 self.instance.name,
2481 False)
2482 for to_node, to_result in result.items():
2483 msg = to_result.fail_msg
2484 if msg:
2485 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2486 to_node, msg,
2487 hint=("please do a gnt-instance info to see the"
2488 " status of disks"))
2489
2490 cstep = itertools.count(5)
2491
2492 if self.early_release:
2493 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2494 self._RemoveOldStorage(self.target_node, iv_names)
2495 # TODO: Check if releasing locks early still makes sense
2496 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2497 else:
2498 # Release all resource locks except those used by the instance
2499 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2500 keep=self.node_secondary_ip.keys())
2501
2502 # TODO: Can the instance lock be downgraded here? Take the optional disk
2503 # shutdown in the caller into consideration.
2504
2505 # Wait for sync
2506     # This can fail as the old devices are degraded and WaitForSync
2507 # does a combined result over all disks, so we don't check its return value
2508 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2509 WaitForSync(self.lu, self.instance)
2510
2511 # Check all devices manually
2512 self._CheckDevices(self.instance.primary_node, iv_names)
2513
2514 # Step: remove old storage
2515 if not self.early_release:
2516 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2517 self._RemoveOldStorage(self.target_node, iv_names)