replace-disks: fix --ignore-ipolicy
[ganeti-github.git] / lib/cmdlib/instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import rpc
38 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
39 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
40 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
41 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
42 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
43 CheckDiskTemplateEnabled
44 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
45 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
46 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
54 constants.DT_EXT: ".ext",
55 constants.DT_FILE: ".file",
56 constants.DT_SHARED_FILE: ".sharedfile",
57 }
58
59
60 def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
61 excl_stor):
62 """Create a single block device on a given node.
63
64 This will not recurse over children of the device, so they must be
65 created in advance.
66
67 @param lu: the lu on whose behalf we execute
68 @param node_uuid: the node on which to create the device
69 @type instance: L{objects.Instance}
70 @param instance: the instance which owns the device
71 @type device: L{objects.Disk}
72 @param device: the device to create
73 @param info: the extra 'metadata' we should attach to the device
74 (this will be represented as a LVM tag)
75 @type force_open: boolean
76 @param force_open: this parameter will be passed to the
77 L{backend.BlockdevCreate} function where it specifies
78 whether we run on primary or not, and it affects both
79 the child assembly and the device's own Open() execution
80 @type excl_stor: boolean
81 @param excl_stor: Whether exclusive_storage is active for the node
82
83 """
84 lu.cfg.SetDiskID(device, node_uuid)
85 result = lu.rpc.call_blockdev_create(node_uuid, device, device.size,
86 instance.name, force_open, info,
87 excl_stor)
88 result.Raise("Can't create block device %s on"
89 " node %s for instance %s" % (device,
90 lu.cfg.GetNodeName(node_uuid),
91 instance.name))
92 if device.physical_id is None:
93 device.physical_id = result.payload
94
95
96 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
97 info, force_open, excl_stor):
98 """Create a tree of block devices on a given node.
99
100 If this device type has to be created on secondaries, create it and
101 all its children.
102
103 If not, just recurse to children keeping the same 'force' value.
104
105 @attention: The device has to be annotated already.
106
107 @param lu: the lu on whose behalf we execute
108 @param node_uuid: the node on which to create the device
109 @type instance: L{objects.Instance}
110 @param instance: the instance which owns the device
111 @type device: L{objects.Disk}
112 @param device: the device to create
113 @type force_create: boolean
114 @param force_create: whether to force creation of this device; this
115 will be changed to True whenever we find a device which has the
116 CreateOnSecondary() attribute
117 @param info: the extra 'metadata' we should attach to the device
118 (this will be represented as a LVM tag)
119 @type force_open: boolean
120 @param force_open: this parameter will be passed to the
121 L{backend.BlockdevCreate} function where it specifies
122 whether we run on primary or not, and it affects both
123 the child assembly and the device's own Open() execution
124 @type excl_stor: boolean
125 @param excl_stor: Whether exclusive_storage is active for the node
126
127 @return: list of created devices
128 """
129 created_devices = []
130 try:
131 if device.CreateOnSecondary():
132 force_create = True
133
134 if device.children:
135 for child in device.children:
136 devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
137 force_create, info, force_open, excl_stor)
138 created_devices.extend(devs)
139
140 if not force_create:
141 return created_devices
142
143 CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
144 excl_stor)
145 # The device has been completely created, so there is no point in keeping
146 # its subdevices in the list. We just add the device itself instead.
147 created_devices = [(node_uuid, device)]
148 return created_devices
149
150 except errors.DeviceCreationError, e:
151 e.created_devices.extend(created_devices)
152 raise e
153 except errors.OpExecError, e:
154 raise errors.DeviceCreationError(str(e), created_devices)
155
156
157 def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
158 """Whether exclusive_storage is in effect for the given node.
159
160 @type cfg: L{config.ConfigWriter}
161 @param cfg: The cluster configuration
162 @type node_uuid: string
163 @param node_uuid: The node UUID
164 @rtype: bool
165 @return: The effective value of exclusive_storage
166 @raise errors.OpPrereqError: if no node exists with the given UUID
167
168 """
169 ni = cfg.GetNodeInfo(node_uuid)
170 if ni is None:
171 raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
172 errors.ECODE_NOENT)
173 return IsExclusiveStorageEnabledNode(cfg, ni)
174
175
176 def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
177 force_open):
178 """Wrapper around L{_CreateBlockDevInner}.
179
180 This method annotates the root device first.
181
182 """
183 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
184 excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
185 return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
186 force_open, excl_stor)
187
188
189 def _UndoCreateDisks(lu, disks_created):
190 """Undo the work performed by L{CreateDisks}.
191
192 This function is called in case of an error to undo the work of
193 L{CreateDisks}.
194
195 @type lu: L{LogicalUnit}
196 @param lu: the logical unit on whose behalf we execute
197 @param disks_created: the result returned by L{CreateDisks}
198
199 """
200 for (node_uuid, disk) in disks_created:
201 lu.cfg.SetDiskID(disk, node_uuid)
202 result = lu.rpc.call_blockdev_remove(node_uuid, disk)
203 result.Warn("Failed to remove newly-created disk %s on node %s" %
204 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
205
206
207 def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
208 """Create all disks for an instance.
209
210 This abstracts away some work from AddInstance.
211
212 @type lu: L{LogicalUnit}
213 @param lu: the logical unit on whose behalf we execute
214 @type instance: L{objects.Instance}
215 @param instance: the instance whose disks we should create
216 @type to_skip: list
217 @param to_skip: list of indices to skip
218 @type target_node_uuid: string
219 @param target_node_uuid: if passed, overrides the target node for creation
220 @type disks: list of L{objects.Disk}
221 @param disks: the disks to create; if not specified, all the disks of the
222 instance are created
223 @return: information about the created disks, to be used to call
224 L{_UndoCreateDisks}
225 @raise errors.OpPrereqError: in case of error
226
227 """
228 info = GetInstanceInfoText(instance)
229 if target_node_uuid is None:
230 pnode_uuid = instance.primary_node
231 all_node_uuids = instance.all_nodes
232 else:
233 pnode_uuid = target_node_uuid
234 all_node_uuids = [pnode_uuid]
235
236 if disks is None:
237 disks = instance.disks
238
239 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
240
241 if instance.disk_template in constants.DTS_FILEBASED:
242 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
243 result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
244
245 result.Raise("Failed to create directory '%s' on"
246 " node %s" % (file_storage_dir,
247 lu.cfg.GetNodeName(pnode_uuid)))
248
249 disks_created = []
250 for idx, device in enumerate(disks):
251 if to_skip and idx in to_skip:
252 continue
253 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
254 for node_uuid in all_node_uuids:
255 f_create = node_uuid == pnode_uuid
256 try:
257 _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
258 f_create)
259 disks_created.append((node_uuid, device))
260 except errors.DeviceCreationError, e:
261 logging.warning("Creating disk %s for instance '%s' failed",
262 idx, instance.name)
263 disks_created.extend(e.created_devices)
264 _UndoCreateDisks(lu, disks_created)
265 raise errors.OpExecError(e.message)
266 return disks_created
267
268
269 def ComputeDiskSizePerVG(disk_template, disks):
270 """Compute disk size requirements in the volume group
271
272 """
273 def _compute(disks, payload):
274 """Universal algorithm.
275
276 """
277 vgs = {}
278 for disk in disks:
279 vgs[disk[constants.IDISK_VG]] = \
280 vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
281
282 return vgs
283
284 # Required free disk space per volume group, as a function of disk template and disk sizes
285 req_size_dict = {
286 constants.DT_DISKLESS: {},
287 constants.DT_PLAIN: _compute(disks, 0),
288 # 128 MiB (constants.DRBD_META_SIZE) are added for DRBD metadata for each disk
289 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
290 constants.DT_FILE: {},
291 constants.DT_SHARED_FILE: {},
292 }
293
294 if disk_template not in req_size_dict:
295 raise errors.ProgrammerError("Disk template '%s' size requirement"
296 " is unknown" % disk_template)
297
298 return req_size_dict[disk_template]
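
# A minimal illustrative sketch (kept as a comment, not executed): for two
# DRBD8 disks of 1024 MiB and 2048 MiB, both placed on a hypothetical volume
# group "xenvg", the per-VG requirement accumulates both data sizes plus
# constants.DRBD_META_SIZE (128 MiB) of DRBD metadata for each disk:
#
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048}]
#   ComputeDiskSizePerVG(constants.DT_DRBD8, disks)
#   # -> {"xenvg": 1024 + 128 + 2048 + 128} == {"xenvg": 3328}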
299
300
301 def ComputeDisks(op, default_vg):
302 """Computes the instance disks.
303
304 @param op: The instance opcode
305 @param default_vg: The default_vg to assume
306
307 @return: The computed disks
308
309 """
310 disks = []
311 for disk in op.disks:
312 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
313 if mode not in constants.DISK_ACCESS_SET:
314 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
315 mode, errors.ECODE_INVAL)
316 size = disk.get(constants.IDISK_SIZE, None)
317 if size is None:
318 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
319 try:
320 size = int(size)
321 except (TypeError, ValueError):
322 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
323 errors.ECODE_INVAL)
324
325 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
326 if ext_provider and op.disk_template != constants.DT_EXT:
327 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
328 " disk template, not %s" %
329 (constants.IDISK_PROVIDER, constants.DT_EXT,
330 op.disk_template), errors.ECODE_INVAL)
331
332 data_vg = disk.get(constants.IDISK_VG, default_vg)
333 name = disk.get(constants.IDISK_NAME, None)
334 if name is not None and name.lower() == constants.VALUE_NONE:
335 name = None
336 new_disk = {
337 constants.IDISK_SIZE: size,
338 constants.IDISK_MODE: mode,
339 constants.IDISK_VG: data_vg,
340 constants.IDISK_NAME: name,
341 }
342
343 for key in [
344 constants.IDISK_METAVG,
345 constants.IDISK_ADOPT,
346 constants.IDISK_SPINDLES,
347 ]:
348 if key in disk:
349 new_disk[key] = disk[key]
350
351 # For extstorage, demand the `provider' option and add any
352 # additional parameters (ext-params) to the dict
353 if op.disk_template == constants.DT_EXT:
354 if ext_provider:
355 new_disk[constants.IDISK_PROVIDER] = ext_provider
356 for key in disk:
357 if key not in constants.IDISK_PARAMS:
358 new_disk[key] = disk[key]
359 else:
360 raise errors.OpPrereqError("Missing provider for template '%s'" %
361 constants.DT_EXT, errors.ECODE_INVAL)
362
363 disks.append(new_disk)
364
365 return disks
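
# A minimal illustrative sketch (kept as a comment, not executed): a single
# opcode-style disk specification and the normalized dict ComputeDisks would
# return for it, assuming a non-ext disk template and a hypothetical default
# volume group "xenvg":
#
#   op.disks = [{constants.IDISK_SIZE: 1024}]
#   ComputeDisks(op, "xenvg")
#   # -> [{constants.IDISK_SIZE: 1024,
#   #      constants.IDISK_MODE: constants.DISK_RDWR,
#   #      constants.IDISK_VG: "xenvg",
#   #      constants.IDISK_NAME: None}]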
366
367
368 def CheckRADOSFreeSpace():
369 """Compute disk size requirements inside the RADOS cluster.
370
371 """
372 # For the RADOS cluster we assume there is always enough space.
373 pass
374
375
376 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
377 iv_name, p_minor, s_minor):
378 """Generate a drbd8 device complete with its children.
379
380 """
381 assert len(vgnames) == len(names) == 2
382 port = lu.cfg.AllocatePort()
383 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
384
385 dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
386 logical_id=(vgnames[0], names[0]),
387 params={})
388 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
389 dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
390 size=constants.DRBD_META_SIZE,
391 logical_id=(vgnames[1], names[1]),
392 params={})
393 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
394 drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
395 logical_id=(primary_uuid, secondary_uuid, port,
396 p_minor, s_minor,
397 shared_secret),
398 children=[dev_data, dev_meta],
399 iv_name=iv_name, params={})
400 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
401 return drbd_dev
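
# The object returned above is a DT_DRBD8 disk whose logical_id is the
# 6-tuple (primary_uuid, secondary_uuid, port, p_minor, s_minor,
# shared_secret) and whose two DT_PLAIN children are the data LV
# (vgnames[0], names[0], full size) and the metadata LV
# (vgnames[1], names[1], constants.DRBD_META_SIZE).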
402
403
404 def GenerateDiskTemplate(
405 lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
406 disk_info, file_storage_dir, file_driver, base_index,
407 feedback_fn, full_disk_params):
408 """Generate the entire disk layout for a given template type.
409
410 """
411 vgname = lu.cfg.GetVGName()
412 disk_count = len(disk_info)
413 disks = []
414
415 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
416
417 if template_name == constants.DT_DISKLESS:
418 pass
419 elif template_name == constants.DT_DRBD8:
420 if len(secondary_node_uuids) != 1:
421 raise errors.ProgrammerError("Wrong template configuration")
422 remote_node_uuid = secondary_node_uuids[0]
423 minors = lu.cfg.AllocateDRBDMinor(
424 [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
425
426 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
427 full_disk_params)
428 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
429
430 names = []
431 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
432 for i in range(disk_count)]):
433 names.append(lv_prefix + "_data")
434 names.append(lv_prefix + "_meta")
435 for idx, disk in enumerate(disk_info):
436 disk_index = idx + base_index
437 data_vg = disk.get(constants.IDISK_VG, vgname)
438 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
439 disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
440 disk[constants.IDISK_SIZE],
441 [data_vg, meta_vg],
442 names[idx * 2:idx * 2 + 2],
443 "disk/%d" % disk_index,
444 minors[idx * 2], minors[idx * 2 + 1])
445 disk_dev.mode = disk[constants.IDISK_MODE]
446 disk_dev.name = disk.get(constants.IDISK_NAME, None)
447 disks.append(disk_dev)
448 else:
449 if secondary_node_uuids:
450 raise errors.ProgrammerError("Wrong template configuration")
451
452 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
453 if name_prefix is None:
454 names = None
455 else:
456 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
457 (name_prefix, base_index + i)
458 for i in range(disk_count)])
459
460 if template_name == constants.DT_PLAIN:
461
462 def logical_id_fn(idx, _, disk):
463 vg = disk.get(constants.IDISK_VG, vgname)
464 return (vg, names[idx])
465
466 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
467 logical_id_fn = \
468 lambda _, disk_index, disk: (file_driver,
469 "%s/%s" % (file_storage_dir,
470 names[idx]))
471 elif template_name == constants.DT_BLOCK:
472 logical_id_fn = \
473 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
474 disk[constants.IDISK_ADOPT])
475 elif template_name == constants.DT_RBD:
476 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
477 elif template_name == constants.DT_EXT:
478 def logical_id_fn(idx, _, disk):
479 provider = disk.get(constants.IDISK_PROVIDER, None)
480 if provider is None:
481 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
482 " not found" % (constants.DT_EXT,
483 constants.IDISK_PROVIDER))
484 return (provider, names[idx])
485 else:
486 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
487
488 dev_type = template_name
489
490 for idx, disk in enumerate(disk_info):
491 params = {}
492 # Only for the Ext template add disk_info to params
493 if template_name == constants.DT_EXT:
494 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
495 for key in disk:
496 if key not in constants.IDISK_PARAMS:
497 params[key] = disk[key]
498 disk_index = idx + base_index
499 size = disk[constants.IDISK_SIZE]
500 feedback_fn("* disk %s, size %s" %
501 (disk_index, utils.FormatUnit(size, "h")))
502 disk_dev = objects.Disk(dev_type=dev_type, size=size,
503 logical_id=logical_id_fn(idx, disk_index, disk),
504 iv_name="disk/%d" % disk_index,
505 mode=disk[constants.IDISK_MODE],
506 params=params,
507 spindles=disk.get(constants.IDISK_SPINDLES))
508 disk_dev.name = disk.get(constants.IDISK_NAME, None)
509 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
510 disks.append(disk_dev)
511
512 return disks
513
514
515 def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
516 """Check the presence of the spindle options with exclusive_storage.
517
518 @type diskdict: dict
519 @param diskdict: disk parameters
520 @type es_flag: bool
521 @param es_flag: the effective value of the exclusive_storage flag
522 @type required: bool
523 @param required: whether spindles are required or just optional
524 @raise errors.OpPrereqError: when spindles are given and they should not be
525
526 """
527 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
528 diskdict[constants.IDISK_SPINDLES] is not None):
529 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
530 " when exclusive storage is not active",
531 errors.ECODE_INVAL)
532 if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
533 diskdict[constants.IDISK_SPINDLES] is None)):
534 raise errors.OpPrereqError("You must specify spindles in instance disks"
535 " when exclusive storage is active",
536 errors.ECODE_INVAL)
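
# A minimal illustrative sketch (kept as a comment, not executed): with
# exclusive storage active and spindles required, a disk specification that
# lacks constants.IDISK_SPINDLES is rejected, while the same specification is
# accepted when exclusive storage is inactive:
#
#   CheckSpindlesExclusiveStorage({constants.IDISK_SIZE: 1024}, True, True)
#   # -> raises errors.OpPrereqError
#   CheckSpindlesExclusiveStorage({constants.IDISK_SIZE: 1024}, False, True)
#   # -> returns None (no spindles given, none required)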
537
538
539 class LUInstanceRecreateDisks(LogicalUnit):
540 """Recreate an instance's missing disks.
541
542 """
543 HPATH = "instance-recreate-disks"
544 HTYPE = constants.HTYPE_INSTANCE
545 REQ_BGL = False
546
547 _MODIFYABLE = compat.UniqueFrozenset([
548 constants.IDISK_SIZE,
549 constants.IDISK_MODE,
550 constants.IDISK_SPINDLES,
551 ])
552
553 # New or changed disk parameters may have different semantics
554 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
555 constants.IDISK_ADOPT,
556
557 # TODO: Implement support changing VG while recreating
558 constants.IDISK_VG,
559 constants.IDISK_METAVG,
560 constants.IDISK_PROVIDER,
561 constants.IDISK_NAME,
562 ]))
563
564 def _RunAllocator(self):
565 """Run the allocator based on input opcode.
566
567 """
568 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
569
570 # FIXME
571 # The allocator should actually run in "relocate" mode, but current
572 # allocators don't support relocating all the nodes of an instance at
573 # the same time. As a workaround we use "allocate" mode, but this is
574 # suboptimal for two reasons:
575 # - The instance name passed to the allocator is present in the list of
576 # existing instances, so there could be a conflict within the
577 # internal structures of the allocator. This doesn't happen with the
578 # current allocators, but it's a liability.
579 # - The allocator counts the resources used by the instance twice: once
580 # because the instance exists already, and once because it tries to
581 # allocate a new instance.
582 # The allocator could choose some of the nodes on which the instance is
583 # running, but that's not a problem. If the instance nodes are broken,
584 # they should already be marked as drained or offline, and hence
585 # skipped by the allocator. If instance disks have been lost for other
586 # reasons, then recreating the disks on the same nodes should be fine.
587 disk_template = self.instance.disk_template
588 spindle_use = be_full[constants.BE_SPINDLE_USE]
589 disks = [{
590 constants.IDISK_SIZE: d.size,
591 constants.IDISK_MODE: d.mode,
592 constants.IDISK_SPINDLES: d.spindles,
593 } for d in self.instance.disks]
594 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
595 disk_template=disk_template,
596 tags=list(self.instance.GetTags()),
597 os=self.instance.os,
598 nics=[{}],
599 vcpus=be_full[constants.BE_VCPUS],
600 memory=be_full[constants.BE_MAXMEM],
601 spindle_use=spindle_use,
602 disks=disks,
603 hypervisor=self.instance.hypervisor,
604 node_whitelist=None)
605 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
606
607 ial.Run(self.op.iallocator)
608
609 assert req.RequiredNodes() == len(self.instance.all_nodes)
610
611 if not ial.success:
612 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
613 " %s" % (self.op.iallocator, ial.info),
614 errors.ECODE_NORES)
615
616 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
617 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
618 self.op.instance_name, self.op.iallocator,
619 utils.CommaJoin(self.op.nodes))
620
621 def CheckArguments(self):
622 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
623 # Normalize and convert deprecated list of disk indices
624 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
625
626 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
627 if duplicates:
628 raise errors.OpPrereqError("Some disks have been specified more than"
629 " once: %s" % utils.CommaJoin(duplicates),
630 errors.ECODE_INVAL)
631
632 # We don't want CheckIAllocatorOrNode selecting the default iallocator
633 # when neither iallocator nor nodes are specified
634 if self.op.iallocator or self.op.nodes:
635 CheckIAllocatorOrNode(self, "iallocator", "nodes")
636
637 for (idx, params) in self.op.disks:
638 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
639 unsupported = frozenset(params.keys()) - self._MODIFYABLE
640 if unsupported:
641 raise errors.OpPrereqError("Parameters for disk %s try to change"
642 " unmodifyable parameter(s): %s" %
643 (idx, utils.CommaJoin(unsupported)),
644 errors.ECODE_INVAL)
645
646 def ExpandNames(self):
647 self._ExpandAndLockInstance()
648 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
649
650 if self.op.nodes:
651 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
652 self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
653 else:
654 self.needed_locks[locking.LEVEL_NODE] = []
655 if self.op.iallocator:
656 # iallocator will select a new node in the same group
657 self.needed_locks[locking.LEVEL_NODEGROUP] = []
658 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
659
660 self.needed_locks[locking.LEVEL_NODE_RES] = []
661
662 def DeclareLocks(self, level):
663 if level == locking.LEVEL_NODEGROUP:
664 assert self.op.iallocator is not None
665 assert not self.op.nodes
666 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
667 self.share_locks[locking.LEVEL_NODEGROUP] = 1
668 # Lock the primary group used by the instance optimistically; this
669 # requires going via the node before it's locked, requiring
670 # verification later on
671 self.needed_locks[locking.LEVEL_NODEGROUP] = \
672 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
673
674 elif level == locking.LEVEL_NODE:
675 # If an allocator is used, then we lock all the nodes in the current
676 # instance group, as we don't know yet which ones will be selected;
677 # if we replace the nodes without using an allocator, locks are
678 # already declared in ExpandNames; otherwise, we need to lock all the
679 # instance nodes for disk re-creation
680 if self.op.iallocator:
681 assert not self.op.nodes
682 assert not self.needed_locks[locking.LEVEL_NODE]
683 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
684
685 # Lock member nodes of the group of the primary node
686 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
687 self.needed_locks[locking.LEVEL_NODE].extend(
688 self.cfg.GetNodeGroup(group_uuid).members)
689
690 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
691 elif not self.op.nodes:
692 self._LockInstancesNodes(primary_only=False)
693 elif level == locking.LEVEL_NODE_RES:
694 # Copy node locks
695 self.needed_locks[locking.LEVEL_NODE_RES] = \
696 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
697
698 def BuildHooksEnv(self):
699 """Build hooks env.
700
701 This runs on master, primary and secondary nodes of the instance.
702
703 """
704 return BuildInstanceHookEnvByObject(self, self.instance)
705
706 def BuildHooksNodes(self):
707 """Build hooks nodes.
708
709 """
710 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
711 return (nl, nl)
712
713 def CheckPrereq(self):
714 """Check prerequisites.
715
716 This checks that the instance is in the cluster and is not running.
717
718 """
719 instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
720 assert instance is not None, \
721 "Cannot retrieve locked instance %s" % self.op.instance_name
722 if self.op.node_uuids:
723 if len(self.op.node_uuids) != len(instance.all_nodes):
724 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
725 " %d replacement nodes were specified" %
726 (instance.name, len(instance.all_nodes),
727 len(self.op.node_uuids)),
728 errors.ECODE_INVAL)
729 assert instance.disk_template != constants.DT_DRBD8 or \
730 len(self.op.node_uuids) == 2
731 assert instance.disk_template != constants.DT_PLAIN or \
732 len(self.op.node_uuids) == 1
733 primary_node = self.op.node_uuids[0]
734 else:
735 primary_node = instance.primary_node
736 if not self.op.iallocator:
737 CheckNodeOnline(self, primary_node)
738
739 if instance.disk_template == constants.DT_DISKLESS:
740 raise errors.OpPrereqError("Instance '%s' has no disks" %
741 self.op.instance_name, errors.ECODE_INVAL)
742
743 # Verify if node group locks are still correct
744 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
745 if owned_groups:
746 # Node group locks are acquired only for the primary node (and only
747 # when the allocator is used)
748 CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
749 primary_only=True)
750
751 # if we replace nodes *and* the old primary is offline, we don't
752 # check the instance state
753 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
754 if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
755 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
756 msg="cannot recreate disks")
757
758 if self.op.disks:
759 self.disks = dict(self.op.disks)
760 else:
761 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
762
763 maxidx = max(self.disks.keys())
764 if maxidx >= len(instance.disks):
765 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
766 errors.ECODE_INVAL)
767
768 if ((self.op.node_uuids or self.op.iallocator) and
769 sorted(self.disks.keys()) != range(len(instance.disks))):
770 raise errors.OpPrereqError("Can't recreate disks partially and"
771 " change the nodes at the same time",
772 errors.ECODE_INVAL)
773
774 self.instance = instance
775
776 if self.op.iallocator:
777 self._RunAllocator()
778 # Release unneeded node and node resource locks
779 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
780 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
781 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
782
783 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
784
785 if self.op.node_uuids:
786 node_uuids = self.op.node_uuids
787 else:
788 node_uuids = instance.all_nodes
789 excl_stor = compat.any(
790 rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
791 )
792 for new_params in self.disks.values():
793 CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
794
795 def Exec(self, feedback_fn):
796 """Recreate the disks.
797
798 """
799 assert (self.owned_locks(locking.LEVEL_NODE) ==
800 self.owned_locks(locking.LEVEL_NODE_RES))
801
802 to_skip = []
803 mods = [] # keeps track of needed changes
804
805 for idx, disk in enumerate(self.instance.disks):
806 try:
807 changes = self.disks[idx]
808 except KeyError:
809 # Disk should not be recreated
810 to_skip.append(idx)
811 continue
812
813 # update secondaries for disks, if needed
814 if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
815 # need to update the nodes and minors
816 assert len(self.op.node_uuids) == 2
817 assert len(disk.logical_id) == 6 # otherwise disk internals
818 # have changed
819 (_, _, old_port, _, _, old_secret) = disk.logical_id
820 new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
821 self.instance.uuid)
822 new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
823 new_minors[0], new_minors[1], old_secret)
824 assert len(disk.logical_id) == len(new_id)
825 else:
826 new_id = None
827
828 mods.append((idx, new_id, changes))
829
830 # now that we have passed all asserts above, we can apply the mods
831 # in a single run (to avoid partial changes)
832 for idx, new_id, changes in mods:
833 disk = self.instance.disks[idx]
834 if new_id is not None:
835 assert disk.dev_type == constants.DT_DRBD8
836 disk.logical_id = new_id
837 if changes:
838 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
839 mode=changes.get(constants.IDISK_MODE, None),
840 spindles=changes.get(constants.IDISK_SPINDLES, None))
841
842 # change primary node, if needed
843 if self.op.node_uuids:
844 self.instance.primary_node = self.op.node_uuids[0]
845 self.LogWarning("Changing the instance's nodes, you will have to"
846 " remove any disks left on the older nodes manually")
847
848 if self.op.node_uuids:
849 self.cfg.Update(self.instance, feedback_fn)
850
851 # All touched nodes must be locked
852 mylocks = self.owned_locks(locking.LEVEL_NODE)
853 assert mylocks.issuperset(frozenset(self.instance.all_nodes))
854 new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
855
856 # TODO: Release node locks before wiping, or explain why it's not possible
857 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
858 wipedisks = [(idx, disk, 0)
859 for (idx, disk) in enumerate(self.instance.disks)
860 if idx not in to_skip]
861 WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
862 cleanup=new_disks)
863
864
865 def _PerformNodeInfoCall(lu, node_uuids, vg):
866 """Prepares the input and performs a node info call.
867
868 @type lu: C{LogicalUnit}
869 @param lu: a logical unit from which we get configuration data
870 @type node_uuids: list of string
871 @param node_uuids: list of node UUIDs to perform the call for
872 @type vg: string
873 @param vg: the volume group's name
874
875 """
876 lvm_storage_units = [(constants.ST_LVM_VG, vg)]
877 storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
878 node_uuids)
879 hvname = lu.cfg.GetHypervisorType()
880 hvparams = lu.cfg.GetClusterInfo().hvparams
881 nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
882 [(hvname, hvparams[hvname])])
883 return nodeinfo
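
# The value returned above is the raw per-node RPC result map; callers index
# it by node UUID and pass each entry's payload (a tuple whose second element
# is the storage space report) on to _CheckVgCapacityForNode, as done in
# _CheckNodesFreeDiskOnVG below.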
884
885
886 def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
887 """Checks the vg capacity for a given node.
888
889 @type node_info: tuple (_, list of dicts, _)
890 @param node_info: the result of the node info call for one node
891 @type node_name: string
892 @param node_name: the name of the node
893 @type vg: string
894 @param vg: volume group name
895 @type requested: int
896 @param requested: the amount of disk in MiB to check for
897 @raise errors.OpPrereqError: if the node doesn't have enough disk,
898 or we cannot check the node
899
900 """
901 (_, space_info, _) = node_info
902 lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
903 space_info, constants.ST_LVM_VG)
904 if not lvm_vg_info:
905 raise errors.OpPrereqError("Can't retrieve storage information for LVM")
906 vg_free = lvm_vg_info.get("storage_free", None)
907 if not isinstance(vg_free, int):
908 raise errors.OpPrereqError("Can't compute free disk space on node"
909 " %s for vg %s, result was '%s'" %
910 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
911 if requested > vg_free:
912 raise errors.OpPrereqError("Not enough disk space on target node %s"
913 " vg %s: required %d MiB, available %d MiB" %
914 (node_name, vg, requested, vg_free),
915 errors.ECODE_NORES)
916
917
918 def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
919 """Checks if nodes have enough free disk space in the specified VG.
920
921 This function checks if all given nodes have the needed amount of
922 free disk. In case any node has less disk or we cannot get the
923 information from the node, this function raises an OpPrereqError
924 exception.
925
926 @type lu: C{LogicalUnit}
927 @param lu: a logical unit from which we get configuration data
928 @type node_uuids: C{list}
929 @param node_uuids: the list of node UUIDs to check
930 @type vg: C{str}
931 @param vg: the volume group to check
932 @type requested: C{int}
933 @param requested: the amount of disk in MiB to check for
934 @raise errors.OpPrereqError: if the node doesn't have enough disk,
935 or we cannot check the node
936
937 """
938 nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
939 for node in node_uuids:
940 node_name = lu.cfg.GetNodeName(node)
941 info = nodeinfo[node]
942 info.Raise("Cannot get current information from node %s" % node_name,
943 prereq=True, ecode=errors.ECODE_ENVIRON)
944 _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
945
946
947 def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
948 """Checks if nodes have enough free disk space in all the VGs.
949
950 This function checks if all given nodes have the needed amount of
951 free disk. In case any node has less disk or we cannot get the
952 information from the node, this function raises an OpPrereqError
953 exception.
954
955 @type lu: C{LogicalUnit}
956 @param lu: a logical unit from which we get configuration data
957 @type node_uuids: C{list}
958 @param node_uuids: the list of node UUIDs to check
959 @type req_sizes: C{dict}
960 @param req_sizes: the hash of vg and corresponding amount of disk in
961 MiB to check for
962 @raise errors.OpPrereqError: if the node doesn't have enough disk,
963 or we cannot check the node
964
965 """
966 for vg, req_size in req_sizes.items():
967 _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
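
# A minimal illustrative sketch (kept as a comment, not executed): req_sizes
# is the mapping produced by ComputeDiskSizePerVG, e.g. a hypothetical
# {"xenvg": 3328}; every VG in it is checked on every node in node_uuids and
# the first node without enough free space raises errors.OpPrereqError.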
968
969
970 def _DiskSizeInBytesToMebibytes(lu, size):
971 """Converts a disk size in bytes to mebibytes.
972
973 Warns and rounds up if the size isn't an even multiple of 1 MiB.
974
975 """
976 (mib, remainder) = divmod(size, 1024 * 1024)
977
978 if remainder != 0:
979 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
980 " to not overwrite existing data (%s bytes will not be"
981 " wiped)", (1024 * 1024) - remainder)
982 mib += 1
983
984 return mib
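
# A minimal illustrative sketch (kept as a comment, not executed): exact
# multiples of 1 MiB convert directly, anything else is rounded up (with a
# warning) so that a later wipe does not overwrite existing data:
#
#   _DiskSizeInBytesToMebibytes(lu, 10 * 1024 * 1024)      # -> 10
#   _DiskSizeInBytesToMebibytes(lu, 10 * 1024 * 1024 + 1)  # -> 11, warns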
985
986
987 def _CalcEta(time_taken, written, total_size):
988 """Calculates the ETA based on size written and total size.
989
990 @param time_taken: The time taken so far
991 @param written: amount written so far
992 @param total_size: The total size of data to be written
993 @return: The remaining time in seconds
994
995 """
996 avg_time = time_taken / float(written)
997 return (total_size - written) * avg_time
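
# A minimal illustrative sketch (kept as a comment, not executed): 512 MiB
# written in 60 seconds out of a 2048 MiB total gives an average of
# 60 / 512.0 seconds per MiB, so the remaining 1536 MiB need 180 seconds:
#
#   _CalcEta(60, 512, 2048)   # -> (2048 - 512) * (60 / 512.0) == 180.0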
998
999
1000 def WipeDisks(lu, instance, disks=None):
1001 """Wipes instance disks.
1002
1003 @type lu: L{LogicalUnit}
1004 @param lu: the logical unit on whose behalf we execute
1005 @type instance: L{objects.Instance}
1006 @param instance: the instance whose disks we should create
1007 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1008 @param disks: Disk details; tuple contains disk index, disk object and the
1009 start offset
1010
1011 """
1012 node_uuid = instance.primary_node
1013 node_name = lu.cfg.GetNodeName(node_uuid)
1014
1015 if disks is None:
1016 disks = [(idx, disk, 0)
1017 for (idx, disk) in enumerate(instance.disks)]
1018
1019 for (_, device, _) in disks:
1020 lu.cfg.SetDiskID(device, node_uuid)
1021
1022 logging.info("Pausing synchronization of disks of instance '%s'",
1023 instance.name)
1024 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1025 (map(compat.snd, disks),
1026 instance),
1027 True)
1028 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1029
1030 for idx, success in enumerate(result.payload):
1031 if not success:
1032 logging.warn("Pausing synchronization of disk %s of instance '%s'"
1033 " failed", idx, instance.name)
1034
1035 try:
1036 for (idx, device, offset) in disks:
1037 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1038 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1039 wipe_chunk_size = \
1040 int(min(constants.MAX_WIPE_CHUNK,
1041 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1042
1043 size = device.size
1044 last_output = 0
1045 start_time = time.time()
1046
1047 if offset == 0:
1048 info_text = ""
1049 else:
1050 info_text = (" (from %s to %s)" %
1051 (utils.FormatUnit(offset, "h"),
1052 utils.FormatUnit(size, "h")))
1053
1054 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1055
1056 logging.info("Wiping disk %d for instance %s on node %s using"
1057 " chunk size %s", idx, instance.name, node_name,
1058 wipe_chunk_size)
1059
1060 while offset < size:
1061 wipe_size = min(wipe_chunk_size, size - offset)
1062
1063 logging.debug("Wiping disk %d, offset %s, chunk %s",
1064 idx, offset, wipe_size)
1065
1066 result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1067 offset, wipe_size)
1068 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1069 (idx, offset, wipe_size))
1070
1071 now = time.time()
1072 offset += wipe_size
1073 if now - last_output >= 60:
1074 eta = _CalcEta(now - start_time, offset, size)
1075 lu.LogInfo(" - done: %.1f%% ETA: %s",
1076 offset / float(size) * 100, utils.FormatSeconds(eta))
1077 last_output = now
1078 finally:
1079 logging.info("Resuming synchronization of disks for instance '%s'",
1080 instance.name)
1081
1082 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1083 (map(compat.snd, disks),
1084 instance),
1085 False)
1086
1087 if result.fail_msg:
1088 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1089 node_name, result.fail_msg)
1090 else:
1091 for idx, success in enumerate(result.payload):
1092 if not success:
1093 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1094 " failed", idx, instance.name)
1095
1096
1097 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1098 """Wrapper for L{WipeDisks} that handles errors.
1099
1100 @type lu: L{LogicalUnit}
1101 @param lu: the logical unit on whose behalf we execute
1102 @type instance: L{objects.Instance}
1103 @param instance: the instance whose disks we should wipe
1104 @param disks: see L{WipeDisks}
1105 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1106 case of error
1107 @raise errors.OpPrereqError: in case of failure
1108
1109 """
1110 try:
1111 WipeDisks(lu, instance, disks=disks)
1112 except errors.OpExecError:
1113 logging.warning("Wiping disks for instance '%s' failed",
1114 instance.name)
1115 _UndoCreateDisks(lu, cleanup)
1116 raise
1117
1118
1119 def ExpandCheckDisks(instance, disks):
1120 """Return the instance disks selected by the disks list
1121
1122 @type disks: list of L{objects.Disk} or None
1123 @param disks: selected disks
1124 @rtype: list of L{objects.Disk}
1125 @return: selected instance disks to act on
1126
1127 """
1128 if disks is None:
1129 return instance.disks
1130 else:
1131 if not set(disks).issubset(instance.disks):
1132 raise errors.ProgrammerError("Can only act on disks belonging to the"
1133 " target instance: expected a subset of %r,"
1134 " got %r" % (instance.disks, disks))
1135 return disks
1136
1137
1138 def WaitForSync(lu, instance, disks=None, oneshot=False):
1139 """Sleep and poll for an instance's disk to sync.
1140
1141 """
1142 if not instance.disks or disks is not None and not disks:
1143 return True
1144
1145 disks = ExpandCheckDisks(instance, disks)
1146
1147 if not oneshot:
1148 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1149
1150 node_uuid = instance.primary_node
1151 node_name = lu.cfg.GetNodeName(node_uuid)
1152
1153 for dev in disks:
1154 lu.cfg.SetDiskID(dev, node_uuid)
1155
1156 # TODO: Convert to utils.Retry
1157
1158 retries = 0
1159 degr_retries = 10 # in seconds, as we sleep 1 second each time
1160 while True:
1161 max_time = 0
1162 done = True
1163 cumul_degraded = False
1164 rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1165 msg = rstats.fail_msg
1166 if msg:
1167 lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1168 retries += 1
1169 if retries >= 10:
1170 raise errors.RemoteError("Can't contact node %s for mirror data,"
1171 " aborting." % node_name)
1172 time.sleep(6)
1173 continue
1174 rstats = rstats.payload
1175 retries = 0
1176 for i, mstat in enumerate(rstats):
1177 if mstat is None:
1178 lu.LogWarning("Can't compute data for node %s/%s",
1179 node_name, disks[i].iv_name)
1180 continue
1181
1182 cumul_degraded = (cumul_degraded or
1183 (mstat.is_degraded and mstat.sync_percent is None))
1184 if mstat.sync_percent is not None:
1185 done = False
1186 if mstat.estimated_time is not None:
1187 rem_time = ("%s remaining (estimated)" %
1188 utils.FormatSeconds(mstat.estimated_time))
1189 max_time = mstat.estimated_time
1190 else:
1191 rem_time = "no time estimate"
1192 lu.LogInfo("- device %s: %5.2f%% done, %s",
1193 disks[i].iv_name, mstat.sync_percent, rem_time)
1194
1195 # if we're done but degraded, let's do a few small retries, to
1196 # make sure we see a stable and not transient situation; therefore
1197 # we force restart of the loop
1198 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1199 logging.info("Degraded disks found, %d retries left", degr_retries)
1200 degr_retries -= 1
1201 time.sleep(1)
1202 continue
1203
1204 if done or oneshot:
1205 break
1206
1207 time.sleep(min(60, max_time))
1208
1209 if done:
1210 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1211
1212 return not cumul_degraded
1213
1214
1215 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1216 """Shutdown block devices of an instance.
1217
1218 This does the shutdown on all nodes of the instance.
1219
1220 Errors on the primary node are ignored only if ignore_primary is
1221 true; errors on offline secondary nodes are always ignored.
1222
1223 """
1224 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1225 all_result = True
1226 disks = ExpandCheckDisks(instance, disks)
1227
1228 for disk in disks:
1229 for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1230 lu.cfg.SetDiskID(top_disk, node_uuid)
1231 result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1232 msg = result.fail_msg
1233 if msg:
1234 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1235 disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1236 if ((node_uuid == instance.primary_node and not ignore_primary) or
1237 (node_uuid != instance.primary_node and not result.offline)):
1238 all_result = False
1239 return all_result
1240
1241
1242 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1243 """Shutdown block devices of an instance.
1244
1245 This function checks that the instance is not running before calling
1246 L{ShutdownInstanceDisks}.
1247
1248 """
1249 CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1250 ShutdownInstanceDisks(lu, instance, disks=disks)
1251
1252
1253 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1254 ignore_size=False):
1255 """Prepare the block devices for an instance.
1256
1257 This sets up the block devices on all nodes.
1258
1259 @type lu: L{LogicalUnit}
1260 @param lu: the logical unit on whose behalf we execute
1261 @type instance: L{objects.Instance}
1262 @param instance: the instance for whose disks we assemble
1263 @type disks: list of L{objects.Disk} or None
1264 @param disks: which disks to assemble (or all, if None)
1265 @type ignore_secondaries: boolean
1266 @param ignore_secondaries: if true, errors on secondary nodes
1267 won't result in an error return from the function
1268 @type ignore_size: boolean
1269 @param ignore_size: if true, the current known size of the disk
1270 will not be used during the disk activation, useful for cases
1271 when the size is wrong
1272 @return: False if the operation failed, otherwise a list of
1273 (host, instance_visible_name, node_visible_name)
1274 with the mapping from node devices to instance devices
1275
1276 """
1277 device_info = []
1278 disks_ok = True
1279 disks = ExpandCheckDisks(instance, disks)
1280
1281 # With the two-pass mechanism we try to reduce the window of
1282 # opportunity for the race condition of switching DRBD to primary
1283 # before handshaking has occurred, but we do not eliminate it
1284
1285 # The proper fix would be to wait (with some limits) until the
1286 # connection has been made and drbd transitions from WFConnection
1287 # into any other network-connected state (Connected, SyncTarget,
1288 # SyncSource, etc.)
1289
1290 # mark instance disks as active before doing actual work, so watcher does
1291 # not try to shut them down erroneously
1292 lu.cfg.MarkInstanceDisksActive(instance.uuid)
1293
1294 # 1st pass, assemble on all nodes in secondary mode
1295 for idx, inst_disk in enumerate(disks):
1296 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1297 instance.primary_node):
1298 if ignore_size:
1299 node_disk = node_disk.Copy()
1300 node_disk.UnsetSize()
1301 lu.cfg.SetDiskID(node_disk, node_uuid)
1302 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1303 instance.name, False, idx)
1304 msg = result.fail_msg
1305 if msg:
1306 is_offline_secondary = (node_uuid in instance.secondary_nodes and
1307 result.offline)
1308 lu.LogWarning("Could not prepare block device %s on node %s"
1309 " (is_primary=False, pass=1): %s",
1310 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1311 if not (ignore_secondaries or is_offline_secondary):
1312 disks_ok = False
1313
1314 # FIXME: race condition on drbd migration to primary
1315
1316 # 2nd pass, do only the primary node
1317 for idx, inst_disk in enumerate(disks):
1318 dev_path = None
1319
1320 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1321 instance.primary_node):
1322 if node_uuid != instance.primary_node:
1323 continue
1324 if ignore_size:
1325 node_disk = node_disk.Copy()
1326 node_disk.UnsetSize()
1327 lu.cfg.SetDiskID(node_disk, node_uuid)
1328 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1329 instance.name, True, idx)
1330 msg = result.fail_msg
1331 if msg:
1332 lu.LogWarning("Could not prepare block device %s on node %s"
1333 " (is_primary=True, pass=2): %s",
1334 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1335 disks_ok = False
1336 else:
1337 dev_path = result.payload
1338
1339 device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1340 inst_disk.iv_name, dev_path))
1341
1342 # leave the disks configured for the primary node
1343 # this is a workaround that would be fixed better by
1344 # improving the logical/physical id handling
1345 for disk in disks:
1346 lu.cfg.SetDiskID(disk, instance.primary_node)
1347
1348 if not disks_ok:
1349 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1350
1351 return disks_ok, device_info
1352
1353
1354 def StartInstanceDisks(lu, instance, force):
1355 """Start the disks of an instance.
1356
1357 """
1358 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1359 ignore_secondaries=force)
1360 if not disks_ok:
1361 ShutdownInstanceDisks(lu, instance)
1362 if force is not None and not force:
1363 lu.LogWarning("",
1364 hint=("If the message above refers to a secondary node,"
1365 " you can retry the operation using '--force'"))
1366 raise errors.OpExecError("Disk consistency error")
1367
1368
1369 class LUInstanceGrowDisk(LogicalUnit):
1370 """Grow a disk of an instance.
1371
1372 """
1373 HPATH = "disk-grow"
1374 HTYPE = constants.HTYPE_INSTANCE
1375 REQ_BGL = False
1376
1377 def ExpandNames(self):
1378 self._ExpandAndLockInstance()
1379 self.needed_locks[locking.LEVEL_NODE] = []
1380 self.needed_locks[locking.LEVEL_NODE_RES] = []
1381 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1382 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1383
1384 def DeclareLocks(self, level):
1385 if level == locking.LEVEL_NODE:
1386 self._LockInstancesNodes()
1387 elif level == locking.LEVEL_NODE_RES:
1388 # Copy node locks
1389 self.needed_locks[locking.LEVEL_NODE_RES] = \
1390 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1391
1392 def BuildHooksEnv(self):
1393 """Build hooks env.
1394
1395 This runs on the master, the primary and all the secondaries.
1396
1397 """
1398 env = {
1399 "DISK": self.op.disk,
1400 "AMOUNT": self.op.amount,
1401 "ABSOLUTE": self.op.absolute,
1402 }
1403 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1404 return env
1405
1406 def BuildHooksNodes(self):
1407 """Build hooks nodes.
1408
1409 """
1410 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1411 return (nl, nl)
1412
1413 def CheckPrereq(self):
1414 """Check prerequisites.
1415
1416 This checks that the instance is in the cluster.
1417
1418 """
1419 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1420 assert self.instance is not None, \
1421 "Cannot retrieve locked instance %s" % self.op.instance_name
1422 node_uuids = list(self.instance.all_nodes)
1423 for node_uuid in node_uuids:
1424 CheckNodeOnline(self, node_uuid)
1425 self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1426
1427 if self.instance.disk_template not in constants.DTS_GROWABLE:
1428 raise errors.OpPrereqError("Instance's disk layout does not support"
1429 " growing", errors.ECODE_INVAL)
1430
1431 self.disk = self.instance.FindDisk(self.op.disk)
1432
1433 if self.op.absolute:
1434 self.target = self.op.amount
1435 self.delta = self.target - self.disk.size
1436 if self.delta < 0:
1437 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1438 "current disk size (%s)" %
1439 (utils.FormatUnit(self.target, "h"),
1440 utils.FormatUnit(self.disk.size, "h")),
1441 errors.ECODE_STATE)
1442 else:
1443 self.delta = self.op.amount
1444 self.target = self.disk.size + self.delta
1445 if self.delta < 0:
1446 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1447 utils.FormatUnit(self.delta, "h"),
1448 errors.ECODE_INVAL)
1449
1450 self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1451
1452 def _CheckDiskSpace(self, node_uuids, req_vgspace):
1453 template = self.instance.disk_template
1454 if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1455 not any(self.node_es_flags.values())):
1456 # TODO: check the free disk space for file, when that feature will be
1457 # supported
1458 # With exclusive storage we need to do something smarter than just looking
1459 # at free space, which, in the end, is basically a dry run. So we rely on
1460 # the dry run performed in Exec() instead.
1461 CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1462
1463 def Exec(self, feedback_fn):
1464 """Execute disk grow.
1465
1466 """
1467 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1468 assert (self.owned_locks(locking.LEVEL_NODE) ==
1469 self.owned_locks(locking.LEVEL_NODE_RES))
1470
1471 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1472
1473 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1474 if not disks_ok:
1475 raise errors.OpExecError("Cannot activate block device to grow")
1476
1477 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1478 (self.op.disk, self.instance.name,
1479 utils.FormatUnit(self.delta, "h"),
1480 utils.FormatUnit(self.target, "h")))
1481
1482 # First run all grow ops in dry-run mode
1483 for node_uuid in self.instance.all_nodes:
1484 self.cfg.SetDiskID(self.disk, node_uuid)
1485 result = self.rpc.call_blockdev_grow(node_uuid,
1486 (self.disk, self.instance),
1487 self.delta, True, True,
1488 self.node_es_flags[node_uuid])
1489 result.Raise("Dry-run grow request failed to node %s" %
1490 self.cfg.GetNodeName(node_uuid))
1491
1492 if wipe_disks:
1493 # Get disk size from primary node for wiping
1494 self.cfg.SetDiskID(self.disk, self.instance.primary_node)
1495 result = self.rpc.call_blockdev_getdimensions(self.instance.primary_node,
1496 [self.disk])
1497 result.Raise("Failed to retrieve disk size from node '%s'" %
1498 self.instance.primary_node)
1499
1500 (disk_dimensions, ) = result.payload
1501
1502 if disk_dimensions is None:
1503 raise errors.OpExecError("Failed to retrieve disk size from primary"
1504 " node '%s'" % self.instance.primary_node)
1505 (disk_size_in_bytes, _) = disk_dimensions
1506
1507 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1508
1509 assert old_disk_size >= self.disk.size, \
1510 ("Retrieved disk size too small (got %s, should be at least %s)" %
1511 (old_disk_size, self.disk.size))
1512 else:
1513 old_disk_size = None
1514
1515 # We know that (as far as we can test) operations across different
1516 # nodes will succeed, time to run it for real on the backing storage
1517 for node_uuid in self.instance.all_nodes:
1518 self.cfg.SetDiskID(self.disk, node_uuid)
1519 result = self.rpc.call_blockdev_grow(node_uuid,
1520 (self.disk, self.instance),
1521 self.delta, False, True,
1522 self.node_es_flags[node_uuid])
1523 result.Raise("Grow request failed to node %s" %
1524 self.cfg.GetNodeName(node_uuid))
1525
1526 # And now execute it for logical storage, on the primary node
1527 node_uuid = self.instance.primary_node
1528 self.cfg.SetDiskID(self.disk, node_uuid)
1529 result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1530 self.delta, False, False,
1531 self.node_es_flags[node_uuid])
1532 result.Raise("Grow request failed to node %s" %
1533 self.cfg.GetNodeName(node_uuid))
1534
1535 self.disk.RecordGrow(self.delta)
1536 self.cfg.Update(self.instance, feedback_fn)
1537
1538 # Changes have been recorded, release node lock
1539 ReleaseLocks(self, locking.LEVEL_NODE)
1540
1541 # Downgrade lock while waiting for sync
1542 self.glm.downgrade(locking.LEVEL_INSTANCE)
1543
1544 assert wipe_disks ^ (old_disk_size is None)
1545
1546 if wipe_disks:
1547 assert self.instance.disks[self.op.disk] == self.disk
1548
1549 # Wipe newly added disk space
1550 WipeDisks(self, self.instance,
1551 disks=[(self.op.disk, self.disk, old_disk_size)])
1552
1553 if self.op.wait_for_sync:
1554 disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1555 if disk_abort:
1556 self.LogWarning("Disk syncing has not returned a good status; check"
1557 " the instance")
1558 if not self.instance.disks_active:
1559 _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1560 elif not self.instance.disks_active:
1561 self.LogWarning("Not shutting down the disk even if the instance is"
1562 " not supposed to be running because no wait for"
1563 " sync mode was requested")
1564
1565 assert self.owned_locks(locking.LEVEL_NODE_RES)
1566 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1567
1568
1569 class LUInstanceReplaceDisks(LogicalUnit):
1570 """Replace the disks of an instance.
1571
1572 """
1573 HPATH = "mirrors-replace"
1574 HTYPE = constants.HTYPE_INSTANCE
1575 REQ_BGL = False
1576
1577 def CheckArguments(self):
1578 """Check arguments.
1579
1580 """
1581 if self.op.mode == constants.REPLACE_DISK_CHG:
1582 if self.op.remote_node is None and self.op.iallocator is None:
1583 raise errors.OpPrereqError("When changing the secondary either an"
1584 " iallocator script must be used or the"
1585 " new node given", errors.ECODE_INVAL)
1586 else:
1587 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1588
1589 elif self.op.remote_node is not None or self.op.iallocator is not None:
1590 # Not replacing the secondary
1591 raise errors.OpPrereqError("The iallocator and new node options can"
1592 " only be used when changing the"
1593 " secondary node", errors.ECODE_INVAL)
1594
1595 def ExpandNames(self):
1596 self._ExpandAndLockInstance()
1597
1598 assert locking.LEVEL_NODE not in self.needed_locks
1599 assert locking.LEVEL_NODE_RES not in self.needed_locks
1600 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1601
1602 assert self.op.iallocator is None or self.op.remote_node is None, \
1603 "Conflicting options"
1604
1605 if self.op.remote_node is not None:
1606 (self.op.remote_node_uuid, self.op.remote_node) = \
1607 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1608 self.op.remote_node)
1609
1610 # Warning: do not remove the locking of the new secondary here
1611 # unless DRBD8Dev.AddChildren is changed to work in parallel;
1612 # currently it doesn't since parallel invocations of
1613 # FindUnusedMinor will conflict
1614 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1615 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1616 else:
1617 self.needed_locks[locking.LEVEL_NODE] = []
1618 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1619
1620 if self.op.iallocator is not None:
1621 # iallocator will select a new node in the same group
1622 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1623 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1624
1625 self.needed_locks[locking.LEVEL_NODE_RES] = []
1626
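    # All heavy lifting is delegated to the TLReplaceDisks tasklet below; note
    # that ignore_ipolicy is forwarded so the instance-policy check against
    # the new node can be relaxed when requested.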
1627 self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1628 self.op.instance_name, self.op.mode,
1629 self.op.iallocator, self.op.remote_node_uuid,
1630 self.op.disks, self.op.early_release,
1631 self.op.ignore_ipolicy)
1632
1633 self.tasklets = [self.replacer]
1634
1635 def DeclareLocks(self, level):
1636 if level == locking.LEVEL_NODEGROUP:
1637 assert self.op.remote_node_uuid is None
1638 assert self.op.iallocator is not None
1639 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1640
1641 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1642 # Lock all groups used by instance optimistically; this requires going
1643 # via the node before it's locked, requiring verification later on
1644 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1645 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1646
1647 elif level == locking.LEVEL_NODE:
1648 if self.op.iallocator is not None:
1649 assert self.op.remote_node_uuid is None
1650 assert not self.needed_locks[locking.LEVEL_NODE]
1651 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1652
1653 # Lock member nodes of all locked groups
1654 self.needed_locks[locking.LEVEL_NODE] = \
1655 [node_uuid
1656 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1657 for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1658 else:
1659 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1660
1661 self._LockInstancesNodes()
1662
1663 elif level == locking.LEVEL_NODE_RES:
1664 # Reuse node locks
1665 self.needed_locks[locking.LEVEL_NODE_RES] = \
1666 self.needed_locks[locking.LEVEL_NODE]
1667
1668 def BuildHooksEnv(self):
1669 """Build hooks env.
1670
1671 This runs on the master, the primary and all the secondaries.
1672
1673 """
1674 instance = self.replacer.instance
1675 env = {
1676 "MODE": self.op.mode,
1677 "NEW_SECONDARY": self.op.remote_node,
1678 "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1679 }
1680 env.update(BuildInstanceHookEnvByObject(self, instance))
1681 return env
1682
1683 def BuildHooksNodes(self):
1684 """Build hooks nodes.
1685
1686 """
1687 instance = self.replacer.instance
1688 nl = [
1689 self.cfg.GetMasterNode(),
1690 instance.primary_node,
1691 ]
1692 if self.op.remote_node_uuid is not None:
1693 nl.append(self.op.remote_node_uuid)
1694 return nl, nl
1695
1696 def CheckPrereq(self):
1697 """Check prerequisites.
1698
1699 """
1700 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1701 self.op.iallocator is None)
1702
1703 # Verify if node group locks are still correct
1704 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1705 if owned_groups:
1706 CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1707
1708 return LogicalUnit.CheckPrereq(self)
1709
1710
1711 class LUInstanceActivateDisks(NoHooksLU):
1712 """Bring up an instance's disks.
1713
1714 """
1715 REQ_BGL = False
1716
1717 def ExpandNames(self):
1718 self._ExpandAndLockInstance()
1719 self.needed_locks[locking.LEVEL_NODE] = []
1720 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1721
1722 def DeclareLocks(self, level):
1723 if level == locking.LEVEL_NODE:
1724 self._LockInstancesNodes()
1725
1726 def CheckPrereq(self):
1727 """Check prerequisites.
1728
1729 This checks that the instance is in the cluster.
1730
1731 """
1732 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1733 assert self.instance is not None, \
1734 "Cannot retrieve locked instance %s" % self.op.instance_name
1735 CheckNodeOnline(self, self.instance.primary_node)
1736
1737 def Exec(self, feedback_fn):
1738 """Activate the disks.
1739
1740 """
1741 disks_ok, disks_info = \
1742 AssembleInstanceDisks(self, self.instance,
1743 ignore_size=self.op.ignore_size)
1744 if not disks_ok:
1745 raise errors.OpExecError("Cannot activate block devices")
1746
1747 if self.op.wait_for_sync:
1748 if not WaitForSync(self, self.instance):
1749 self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1750 raise errors.OpExecError("Some disks of the instance are degraded!")
1751
1752 return disks_info
1753
1754
1755 class LUInstanceDeactivateDisks(NoHooksLU):
1756 """Shutdown an instance's disks.
1757
1758 """
1759 REQ_BGL = False
1760
1761 def ExpandNames(self):
1762 self._ExpandAndLockInstance()
1763 self.needed_locks[locking.LEVEL_NODE] = []
1764 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1765
1766 def DeclareLocks(self, level):
1767 if level == locking.LEVEL_NODE:
1768 self._LockInstancesNodes()
1769
1770 def CheckPrereq(self):
1771 """Check prerequisites.
1772
1773 This checks that the instance is in the cluster.
1774
1775 """
1776 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1777 assert self.instance is not None, \
1778 "Cannot retrieve locked instance %s" % self.op.instance_name
1779
1780 def Exec(self, feedback_fn):
1781 """Deactivate the disks
1782
1783 """
1784 if self.op.force:
1785 ShutdownInstanceDisks(self, self.instance)
1786 else:
1787 _SafeShutdownInstanceDisks(self, self.instance)
1788
1789
1790 def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1791 ldisk=False):
1792 """Check that mirrors are not degraded.
1793
1794 @attention: The device has to be annotated already.
1795
1796 The ldisk parameter, if True, will change the test from the
1797 is_degraded attribute (which represents overall non-ok status for
1798 the device(s)) to the ldisk (representing the local storage status).
1799
1800 """
1801 lu.cfg.SetDiskID(dev, node_uuid)
1802
1803 result = True
1804
1805 if on_primary or dev.AssembleOnSecondary():
1806 rstats = lu.rpc.call_blockdev_find(node_uuid, dev)
1807 msg = rstats.fail_msg
1808 if msg:
1809 lu.LogWarning("Can't find disk on node %s: %s",
1810 lu.cfg.GetNodeName(node_uuid), msg)
1811 result = False
1812 elif not rstats.payload:
1813 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1814 result = False
1815 else:
1816 if ldisk:
1817 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1818 else:
1819 result = result and not rstats.payload.is_degraded
1820
1821 if dev.children:
1822 for child in dev.children:
1823 result = result and _CheckDiskConsistencyInner(lu, instance, child,
1824 node_uuid, on_primary)
1825
1826 return result
1827
1828
1829 def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1830 """Wrapper around L{_CheckDiskConsistencyInner}.
1831
1832 """
1833 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1834 return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1835 ldisk=ldisk)
1836
1837
1838 def _BlockdevFind(lu, node_uuid, dev, instance):
1839 """Wrapper around call_blockdev_find to annotate diskparams.
1840
1841 @param lu: A reference to the lu object
1842 @param node_uuid: The node to call out
1843 @param dev: The device to find
1844 @param instance: The instance object the device belongs to
1845 @returns The result of the rpc call
1846
1847 """
1848 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1849 return lu.rpc.call_blockdev_find(node_uuid, disk)
1850
1851
1852 def _GenerateUniqueNames(lu, exts):
1853 """Generate a suitable LV name.
1854
1855 This will generate a logical volume name for the given instance.
1856
1857 """
1858 results = []
1859 for val in exts:
1860 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1861 results.append("%s%s" % (new_id, val))
1862 return results
1863
1864
1865 class TLReplaceDisks(Tasklet):
1866 """Replaces disks for an instance.
1867
1868 Note: Locking is not within the scope of this class.
1869
1870 """
1871 def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1872 remote_node_uuid, disks, early_release, ignore_ipolicy):
1873 """Initializes this class.
1874
1875 """
1876 Tasklet.__init__(self, lu)
1877
1878 # Parameters
1879 self.instance_uuid = instance_uuid
1880 self.instance_name = instance_name
1881 self.mode = mode
1882 self.iallocator_name = iallocator_name
1883 self.remote_node_uuid = remote_node_uuid
1884 self.disks = disks
1885 self.early_release = early_release
1886 self.ignore_ipolicy = ignore_ipolicy
1887
1888 # Runtime data
1889 self.instance = None
1890 self.new_node_uuid = None
1891 self.target_node_uuid = None
1892 self.other_node_uuid = None
1893 self.remote_node_info = None
1894 self.node_secondary_ip = None
1895
1896 @staticmethod
1897 def _RunAllocator(lu, iallocator_name, instance_uuid,
1898 relocate_from_node_uuids):
1899 """Compute a new secondary node using an IAllocator.
1900
1901 """
1902 req = iallocator.IAReqRelocate(
1903 inst_uuid=instance_uuid,
1904 relocate_from_node_uuids=list(relocate_from_node_uuids))
1905 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1906
1907 ial.Run(iallocator_name)
1908
1909 if not ial.success:
1910 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1911 " %s" % (iallocator_name, ial.info),
1912 errors.ECODE_NORES)
1913
1914 remote_node_name = ial.result[0]
1915 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1916
1917 if remote_node is None:
1918 raise errors.OpPrereqError("Node %s not found in configuration" %
1919 remote_node_name, errors.ECODE_NOENT)
1920
1921 lu.LogInfo("Selected new secondary for instance '%s': %s",
1922 instance_uuid, remote_node_name)
1923
1924 return remote_node.uuid
1925
1926 def _FindFaultyDisks(self, node_uuid):
1927 """Wrapper for L{FindFaultyInstanceDisks}.
1928
1929 """
1930 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1931 node_uuid, True)
1932
1933 def _CheckDisksActivated(self, instance):
1934 """Checks if the instance disks are activated.
1935
1936 @param instance: The instance to check disks
1937 @return: True if they are activated, False otherwise
1938
1939 """
1940 node_uuids = instance.all_nodes
1941
1942 for idx, dev in enumerate(instance.disks):
1943 for node_uuid in node_uuids:
1944 self.lu.LogInfo("Checking disk/%d on %s", idx,
1945 self.cfg.GetNodeName(node_uuid))
1946 self.cfg.SetDiskID(dev, node_uuid)
1947
1948 result = _BlockdevFind(self, node_uuid, dev, instance)
1949
1950 if result.offline:
1951 continue
1952 elif result.fail_msg or not result.payload:
1953 return False
1954
1955 return True
1956
1957 def CheckPrereq(self):
1958 """Check prerequisites.
1959
1960 This checks that the instance is in the cluster.
1961
1962 """
1963 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1964 assert self.instance is not None, \
1965 "Cannot retrieve locked instance %s" % self.instance_name
1966
1967 if self.instance.disk_template != constants.DT_DRBD8:
1968 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1969 " instances", errors.ECODE_INVAL)
1970
1971 if len(self.instance.secondary_nodes) != 1:
1972 raise errors.OpPrereqError("The instance has a strange layout,"
1973 " expected one secondary but found %d" %
1974 len(self.instance.secondary_nodes),
1975 errors.ECODE_FAULT)
1976
1977 secondary_node_uuid = self.instance.secondary_nodes[0]
1978
1979 if self.iallocator_name is None:
1980 remote_node_uuid = self.remote_node_uuid
1981 else:
1982 remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1983 self.instance.uuid,
1984 self.instance.secondary_nodes)
1985
1986 if remote_node_uuid is None:
1987 self.remote_node_info = None
1988 else:
1989 assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1990 "Remote node '%s' is not locked" % remote_node_uuid
1991
1992 self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1993 assert self.remote_node_info is not None, \
1994 "Cannot retrieve locked node %s" % remote_node_uuid
1995
1996 if remote_node_uuid == self.instance.primary_node:
1997 raise errors.OpPrereqError("The specified node is the primary node of"
1998 " the instance", errors.ECODE_INVAL)
1999
2000 if remote_node_uuid == secondary_node_uuid:
2001 raise errors.OpPrereqError("The specified node is already the"
2002 " secondary node of the instance",
2003 errors.ECODE_INVAL)
2004
2005 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2006 constants.REPLACE_DISK_CHG):
2007 raise errors.OpPrereqError("Cannot specify disks to be replaced",
2008 errors.ECODE_INVAL)
2009
2010 if self.mode == constants.REPLACE_DISK_AUTO:
2011 if not self._CheckDisksActivated(self.instance):
2012 raise errors.OpPrereqError("Please run activate-disks on instance %s"
2013 " first" % self.instance_name,
2014 errors.ECODE_STATE)
2015 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2016 faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2017
2018 if faulty_primary and faulty_secondary:
2019 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2020 " one node and can not be repaired"
2021 " automatically" % self.instance_name,
2022 errors.ECODE_STATE)
2023
2024 if faulty_primary:
2025 self.disks = faulty_primary
2026 self.target_node_uuid = self.instance.primary_node
2027 self.other_node_uuid = secondary_node_uuid
2028 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2029 elif faulty_secondary:
2030 self.disks = faulty_secondary
2031 self.target_node_uuid = secondary_node_uuid
2032 self.other_node_uuid = self.instance.primary_node
2033 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2034 else:
2035 self.disks = []
2036 check_nodes = []
2037
2038 else:
2039 # Non-automatic modes
2040 if self.mode == constants.REPLACE_DISK_PRI:
2041 self.target_node_uuid = self.instance.primary_node
2042 self.other_node_uuid = secondary_node_uuid
2043 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2044
2045 elif self.mode == constants.REPLACE_DISK_SEC:
2046 self.target_node_uuid = secondary_node_uuid
2047 self.other_node_uuid = self.instance.primary_node
2048 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2049
2050 elif self.mode == constants.REPLACE_DISK_CHG:
2051 self.new_node_uuid = remote_node_uuid
2052 self.other_node_uuid = self.instance.primary_node
2053 self.target_node_uuid = secondary_node_uuid
2054 check_nodes = [self.new_node_uuid, self.other_node_uuid]
2055
2056 CheckNodeNotDrained(self.lu, remote_node_uuid)
2057 CheckNodeVmCapable(self.lu, remote_node_uuid)
2058
2059 old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2060 assert old_node_info is not None
2061 if old_node_info.offline and not self.early_release:
2062 # doesn't make sense to delay the release
2063 self.early_release = True
2064 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2065 " early-release mode", secondary_node_uuid)
2066
2067 else:
2068 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2069 self.mode)
2070
2071 # If not specified all disks should be replaced
2072 if not self.disks:
2073 self.disks = range(len(self.instance.disks))
2074
2075     # TODO: This is ugly, but right now we can't distinguish between
2076     # internally submitted opcodes and external ones. We should fix that.
2077 if self.remote_node_info:
2078       # We change the node; let's verify it still meets the instance policy
2079 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2080 cluster = self.cfg.GetClusterInfo()
2081 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2082 new_group_info)
2083 CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
2084 self.remote_node_info, self.cfg,
2085 ignore=self.ignore_ipolicy)
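      # (When ignore is set, CheckTargetNodeIPolicy is expected to only warn
      # about policy violations instead of aborting, which is what
      # --ignore-ipolicy asks for.)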
2086
2087 for node_uuid in check_nodes:
2088 CheckNodeOnline(self.lu, node_uuid)
2089
2090 touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2091 self.other_node_uuid,
2092 self.target_node_uuid]
2093 if node_uuid is not None)
2094
2095 # Release unneeded node and node resource locks
2096 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2097 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2098 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2099
2100 # Release any owned node group
2101 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2102
2103 # Check whether disks are valid
2104 for disk_idx in self.disks:
2105 self.instance.FindDisk(disk_idx)
2106
2107 # Get secondary node IP addresses
2108 self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2109 in self.cfg.GetMultiNodeInfo(touched_nodes))
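    # (These addresses are used further down by the drbd_disconnect_net and
    # drbd_attach_net RPC calls, which reconfigure DRBD replication over the
    # secondary network.)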
2110
2111 def Exec(self, feedback_fn):
2112 """Execute disk replacement.
2113
2114 This dispatches the disk replacement to the appropriate handler.
2115
2116 """
2117 if __debug__:
2118 # Verify owned locks before starting operation
2119 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2120 assert set(owned_nodes) == set(self.node_secondary_ip), \
2121 ("Incorrect node locks, owning %s, expected %s" %
2122 (owned_nodes, self.node_secondary_ip.keys()))
2123 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2124 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2125 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2126
2127 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2128 assert list(owned_instances) == [self.instance_name], \
2129 "Instance '%s' not locked" % self.instance_name
2130
2131 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2132 "Should not own any node group lock at this point"
2133
2134 if not self.disks:
2135 feedback_fn("No disks need replacement for instance '%s'" %
2136 self.instance.name)
2137 return
2138
2139 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2140 (utils.CommaJoin(self.disks), self.instance.name))
2141 feedback_fn("Current primary node: %s" %
2142 self.cfg.GetNodeName(self.instance.primary_node))
2143     feedback_fn("Current secondary node: %s" %
2144 utils.CommaJoin(self.cfg.GetNodeNames(
2145 self.instance.secondary_nodes)))
2146
2147 activate_disks = not self.instance.disks_active
2148
2149 # Activate the instance disks if we're replacing them on a down instance
2150 if activate_disks:
2151 StartInstanceDisks(self.lu, self.instance, True)
2152
2153 try:
2154 # Should we replace the secondary node?
2155 if self.new_node_uuid is not None:
2156 fn = self._ExecDrbd8Secondary
2157 else:
2158 fn = self._ExecDrbd8DiskOnly
2159
2160 result = fn(feedback_fn)
2161 finally:
2162 # Deactivate the instance disks if we're replacing them on a
2163 # down instance
2164 if activate_disks:
2165 _SafeShutdownInstanceDisks(self.lu, self.instance)
2166
2167 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2168
2169 if __debug__:
2170 # Verify owned locks
2171 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2172 nodes = frozenset(self.node_secondary_ip)
2173 assert ((self.early_release and not owned_nodes) or
2174 (not self.early_release and not (set(owned_nodes) - nodes))), \
2175 ("Not owning the correct locks, early_release=%s, owned=%r,"
2176 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2177
2178 return result
2179
2180 def _CheckVolumeGroup(self, node_uuids):
2181 self.lu.LogInfo("Checking volume groups")
2182
2183 vgname = self.cfg.GetVGName()
2184
2185 # Make sure volume group exists on all involved nodes
2186 results = self.rpc.call_vg_list(node_uuids)
2187 if not results:
2188 raise errors.OpExecError("Can't list volume groups on the nodes")
2189
2190 for node_uuid in node_uuids:
2191 res = results[node_uuid]
2192 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2193 if vgname not in res.payload:
2194 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2195 (vgname, self.cfg.GetNodeName(node_uuid)))
2196
2197 def _CheckDisksExistence(self, node_uuids):
2198 # Check disk existence
2199 for idx, dev in enumerate(self.instance.disks):
2200 if idx not in self.disks:
2201 continue
2202
2203 for node_uuid in node_uuids:
2204 self.lu.LogInfo("Checking disk/%d on %s", idx,
2205 self.cfg.GetNodeName(node_uuid))
2206 self.cfg.SetDiskID(dev, node_uuid)
2207
2208 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2209
2210 msg = result.fail_msg
2211 if msg or not result.payload:
2212 if not msg:
2213 msg = "disk not found"
2214 if not self._CheckDisksActivated(self.instance):
2215             extra_hint = ("\nDisks do not seem to be properly activated. Try"
2216 " running activate-disks on the instance before"
2217 " using replace-disks.")
2218 else:
2219 extra_hint = ""
2220 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2221 (idx, self.cfg.GetNodeName(node_uuid), msg,
2222 extra_hint))
2223
2224 def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2225 for idx, dev in enumerate(self.instance.disks):
2226 if idx not in self.disks:
2227 continue
2228
2229 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2230 (idx, self.cfg.GetNodeName(node_uuid)))
2231
2232 if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2233 on_primary, ldisk=ldisk):
2234 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2235 " replace disks for instance %s" %
2236 (self.cfg.GetNodeName(node_uuid),
2237 self.instance.name))
2238
2239 def _CreateNewStorage(self, node_uuid):
2240 """Create new storage on the primary or secondary node.
2241
2242 This is only used for same-node replaces, not for changing the
2243 secondary node, hence we don't want to modify the existing disk.
2244
2245 """
2246 iv_names = {}
2247
2248 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2249 for idx, dev in enumerate(disks):
2250 if idx not in self.disks:
2251 continue
2252
2253 self.lu.LogInfo("Adding storage on %s for disk/%d",
2254 self.cfg.GetNodeName(node_uuid), idx)
2255
2256 self.cfg.SetDiskID(dev, node_uuid)
2257
2258 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2259 names = _GenerateUniqueNames(self.lu, lv_names)
2260
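      # Each DRBD8 disk is backed by two LVs: a data volume of the disk's size
      # and a small metadata volume of fixed size (DRBD_META_SIZE).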
2261 (data_disk, meta_disk) = dev.children
2262 vg_data = data_disk.logical_id[0]
2263 lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2264 logical_id=(vg_data, names[0]),
2265 params=data_disk.params)
2266 vg_meta = meta_disk.logical_id[0]
2267 lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2268 size=constants.DRBD_META_SIZE,
2269 logical_id=(vg_meta, names[1]),
2270 params=meta_disk.params)
2271
2272 new_lvs = [lv_data, lv_meta]
2273 old_lvs = [child.Copy() for child in dev.children]
2274 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2275 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2276
2277 # we pass force_create=True to force the LVM creation
2278 for new_lv in new_lvs:
2279 try:
2280 _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2281 GetInstanceInfoText(self.instance), False,
2282 excl_stor)
2283 except errors.DeviceCreationError, e:
2284 raise errors.OpExecError("Can't create block device: %s" % e.message)
2285
2286 return iv_names
2287
2288 def _CheckDevices(self, node_uuid, iv_names):
2289 for name, (dev, _, _) in iv_names.iteritems():
2290 self.cfg.SetDiskID(dev, node_uuid)
2291
2292 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2293
2294 msg = result.fail_msg
2295 if msg or not result.payload:
2296 if not msg:
2297 msg = "disk not found"
2298 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2299 (name, msg))
2300
2301 if result.payload.is_degraded:
2302 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2303
2304 def _RemoveOldStorage(self, node_uuid, iv_names):
2305 for name, (_, old_lvs, _) in iv_names.iteritems():
2306 self.lu.LogInfo("Remove logical volumes for %s", name)
2307
2308 for lv in old_lvs:
2309 self.cfg.SetDiskID(lv, node_uuid)
2310
2311 msg = self.rpc.call_blockdev_remove(node_uuid, lv).fail_msg
2312 if msg:
2313 self.lu.LogWarning("Can't remove old LV: %s", msg,
2314 hint="remove unused LVs manually")
2315
2316 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2317 """Replace a disk on the primary or secondary for DRBD 8.
2318
2319 The algorithm for replace is quite complicated:
2320
2321 1. for each disk to be replaced:
2322
2323 1. create new LVs on the target node with unique names
2324 1. detach old LVs from the drbd device
2325 1. rename old LVs to name_replaced.<time_t>
2326 1. rename new LVs to old LVs
2327 1. attach the new LVs (with the old names now) to the drbd device
2328
2329 1. wait for sync across all devices
2330
2331 1. for each modified disk:
2332
2333       1. remove old LVs (which have the name name_replaced.<time_t>)
2334
2335 Failures are not very well handled.
2336
2337 """
2338 steps_total = 6
2339
2340     # Step: check device existence
2341 self.lu.LogStep(1, steps_total, "Check device existence")
2342 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2343 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2344
2345 # Step: check other node consistency
2346 self.lu.LogStep(2, steps_total, "Check peer consistency")
2347 self._CheckDisksConsistency(
2348 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2349 False)
2350
2351 # Step: create new storage
2352 self.lu.LogStep(3, steps_total, "Allocate new storage")
2353 iv_names = self._CreateNewStorage(self.target_node_uuid)
2354
2355 # Step: for each lv, detach+rename*2+attach
2356 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2357 for dev, old_lvs, new_lvs in iv_names.itervalues():
2358 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2359
2360 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid, dev,
2361 old_lvs)
2362 result.Raise("Can't detach drbd from local storage on node"
2363 " %s for device %s" %
2364 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2365 #dev.children = []
2366 #cfg.Update(instance)
2367
2368 # ok, we created the new LVs, so now we know we have the needed
2369 # storage; as such, we proceed on the target node to rename
2370 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2371 # using the assumption that logical_id == physical_id (which in
2372 # turn is the unique_id on that node)
2373
2374 # FIXME(iustin): use a better name for the replaced LVs
2375 temp_suffix = int(time.time())
2376 ren_fn = lambda d, suff: (d.physical_id[0],
2377 d.physical_id[1] + "_replaced-%s" % suff)
2378
2379 # Build the rename list based on what LVs exist on the node
2380 rename_old_to_new = []
2381 for to_ren in old_lvs:
2382 result = self.rpc.call_blockdev_find(self.target_node_uuid, to_ren)
2383 if not result.fail_msg and result.payload:
2384 # device exists
2385 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2386
2387 self.lu.LogInfo("Renaming the old LVs on the target node")
2388 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2389 rename_old_to_new)
2390 result.Raise("Can't rename old LVs on node %s" %
2391 self.cfg.GetNodeName(self.target_node_uuid))
2392
2393 # Now we rename the new LVs to the old LVs
2394 self.lu.LogInfo("Renaming the new LVs on the target node")
2395 rename_new_to_old = [(new, old.physical_id)
2396 for old, new in zip(old_lvs, new_lvs)]
2397 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2398 rename_new_to_old)
2399 result.Raise("Can't rename new LVs on node %s" %
2400 self.cfg.GetNodeName(self.target_node_uuid))
2401
2402       # Intermediate steps of in-memory modifications
2403 for old, new in zip(old_lvs, new_lvs):
2404 new.logical_id = old.logical_id
2405 self.cfg.SetDiskID(new, self.target_node_uuid)
2406
2407 # We need to modify old_lvs so that removal later removes the
2408 # right LVs, not the newly added ones; note that old_lvs is a
2409 # copy here
2410 for disk in old_lvs:
2411 disk.logical_id = ren_fn(disk, temp_suffix)
2412 self.cfg.SetDiskID(disk, self.target_node_uuid)
2413
2414 # Now that the new lvs have the old name, we can add them to the device
2415 self.lu.LogInfo("Adding new mirror component on %s",
2416 self.cfg.GetNodeName(self.target_node_uuid))
2417 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2418 (dev, self.instance), new_lvs)
2419 msg = result.fail_msg
2420 if msg:
2421 for new_lv in new_lvs:
2422 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2423 new_lv).fail_msg
2424 if msg2:
2425 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2426                                hint=("manually clean up the unused logical"
2427                                      " volumes"))
2428 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2429
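    # The last two steps (removing the old storage and waiting for sync) are
    # numbered dynamically, since their order depends on early_release.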
2430 cstep = itertools.count(5)
2431
2432 if self.early_release:
2433 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2434 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2435 # TODO: Check if releasing locks early still makes sense
2436 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2437 else:
2438 # Release all resource locks except those used by the instance
2439 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2440 keep=self.node_secondary_ip.keys())
2441
2442 # Release all node locks while waiting for sync
2443 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2444
2445 # TODO: Can the instance lock be downgraded here? Take the optional disk
2446 # shutdown in the caller into consideration.
2447
2448 # Wait for sync
2449     # This can fail as the old devices are degraded and WaitForSync
2450     # returns a combined result over all disks, so we don't check its value
2451 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2452 WaitForSync(self.lu, self.instance)
2453
2454 # Check all devices manually
2455 self._CheckDevices(self.instance.primary_node, iv_names)
2456
2457 # Step: remove old storage
2458 if not self.early_release:
2459 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2460 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2461
2462 def _ExecDrbd8Secondary(self, feedback_fn):
2463 """Replace the secondary node for DRBD 8.
2464
2465 The algorithm for replace is quite complicated:
2466 - for all disks of the instance:
2467 - create new LVs on the new node with same names
2468 - shutdown the drbd device on the old secondary
2469 - disconnect the drbd network on the primary
2470 - create the drbd device on the new secondary
2471 - network attach the drbd on the primary, using an artifice:
2472 the drbd code for Attach() will connect to the network if it
2473 finds a device which is connected to the good local disks but
2474 not network enabled
2475 - wait for sync across all devices
2476 - remove all disks from the old secondary
2477
2478 Failures are not very well handled.
2479
2480 """
2481 steps_total = 6
2482
2483 pnode = self.instance.primary_node
2484
2485     # Step: check device existence
2486 self.lu.LogStep(1, steps_total, "Check device existence")
2487 self._CheckDisksExistence([self.instance.primary_node])
2488 self._CheckVolumeGroup([self.instance.primary_node])
2489
2490 # Step: check other node consistency
2491 self.lu.LogStep(2, steps_total, "Check peer consistency")
2492 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2493
2494 # Step: create new storage
2495 self.lu.LogStep(3, steps_total, "Allocate new storage")
2496 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2497 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2498 self.new_node_uuid)
2499 for idx, dev in enumerate(disks):
2500 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2501 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2502 # we pass force_create=True to force LVM creation
2503 for new_lv in dev.children:
2504 try:
2505 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2506 new_lv, True, GetInstanceInfoText(self.instance),
2507 False, excl_stor)
2508 except errors.DeviceCreationError, e:
2509 raise errors.OpExecError("Can't create block device: %s" % e.message)
2510
2511     # Step 4: drbd minors and drbd setup changes
2512 # after this, we must manually remove the drbd minors on both the
2513 # error and the success paths
2514 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2515 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2516 for _ in self.instance.disks],
2517 self.instance.uuid)
2518 logging.debug("Allocated minors %r", minors)
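    # One minor is reserved on the new node for every instance disk; the error
    # paths below hand them back via ReleaseDRBDMinors.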
2519
2520 iv_names = {}
2521 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2522 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2523 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2524 # create new devices on new_node; note that we create two IDs:
2525 # one without port, so the drbd will be activated without
2526 # networking information on the new node at this stage, and one
2527       # with network, for the later activation in step 4
2528 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2529 if self.instance.primary_node == o_node1:
2530 p_minor = o_minor1
2531 else:
2532 assert self.instance.primary_node == o_node2, "Three-node instance?"
2533 p_minor = o_minor2
2534
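      # A DRBD8 logical_id is (node_A, node_B, port, minor_A, minor_B,
      # secret); the "alone" variant carries no port, so the device is first
      # brought up standalone, without networking.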
2535 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2536 p_minor, new_minor, o_secret)
2537 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2538 p_minor, new_minor, o_secret)
2539
2540 iv_names[idx] = (dev, dev.children, new_net_id)
2541 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2542 new_net_id)
2543 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2544 logical_id=new_alone_id,
2545 children=dev.children,
2546 size=dev.size,
2547 params={})
2548 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2549 self.cfg)
2550 try:
2551 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2552 anno_new_drbd,
2553 GetInstanceInfoText(self.instance), False,
2554 excl_stor)
2555 except errors.GenericError:
2556 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2557 raise
2558
2559 # We have new devices, shutdown the drbd on the old secondary
2560 for idx, dev in enumerate(self.instance.disks):
2561 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2562 self.cfg.SetDiskID(dev, self.target_node_uuid)
2563 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2564 (dev, self.instance)).fail_msg
2565 if msg:
2566         self.lu.LogWarning("Failed to shut down drbd for disk/%d on the"
2567                            " old node: %s" % (idx, msg),
2568 hint=("Please cleanup this device manually as"
2569 " soon as possible"))
2570
2571 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2572 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2573 self.instance.disks)[pnode]
2574
2575 msg = result.fail_msg
2576 if msg:
2577 # detaches didn't succeed (unlikely)
2578 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2579 raise errors.OpExecError("Can't detach the disks from the network on"
2580 " old node: %s" % (msg,))
2581
2582 # if we managed to detach at least one, we update all the disks of
2583 # the instance to point to the new secondary
2584 self.lu.LogInfo("Updating instance configuration")
2585 for dev, _, new_logical_id in iv_names.itervalues():
2586 dev.logical_id = new_logical_id
2587 self.cfg.SetDiskID(dev, self.instance.primary_node)
2588
2589 self.cfg.Update(self.instance, feedback_fn)
2590
2591 # Release all node locks (the configuration has been updated)
2592 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2593
2594 # and now perform the drbd attach
2595 self.lu.LogInfo("Attaching primary drbds to new secondary"
2596 " (standalone => connected)")
2597 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2598 self.new_node_uuid],
2599 self.node_secondary_ip,
2600 (self.instance.disks, self.instance),
2601 self.instance.name,
2602 False)
2603 for to_node, to_result in result.items():
2604 msg = to_result.fail_msg
2605 if msg:
2606 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2607 self.cfg.GetNodeName(to_node), msg,
2608 hint=("please do a gnt-instance info to see the"
2609 " status of disks"))
2610
2611 cstep = itertools.count(5)
2612
2613 if self.early_release:
2614 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2615 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2616 # TODO: Check if releasing locks early still makes sense
2617 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2618 else:
2619 # Release all resource locks except those used by the instance
2620 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2621 keep=self.node_secondary_ip.keys())
2622
2623 # TODO: Can the instance lock be downgraded here? Take the optional disk
2624 # shutdown in the caller into consideration.
2625
2626 # Wait for sync
2627     # This can fail as the old devices are degraded and WaitForSync
2628     # returns a combined result over all disks, so we don't check its value
2629 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2630 WaitForSync(self.lu, self.instance)
2631
2632 # Check all devices manually
2633 self._CheckDevices(self.instance.primary_node, iv_names)
2634
2635 # Step: remove old storage
2636 if not self.early_release:
2637 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2638 self._RemoveOldStorage(self.target_node_uuid, iv_names)