Fix typo in secondary
[ganeti-github.git] / lib / cmdlib / instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Logical units dealing with storage of instances."""
32
33 import itertools
34 import logging
35 import os
36 import time
37
38 from ganeti import compat
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ht
42 from ganeti import locking
43 from ganeti.masterd import iallocator
44 from ganeti import objects
45 from ganeti import utils
46 from ganeti import rpc
47 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
48 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
49 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
50 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
51 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
52 CheckDiskTemplateEnabled
53 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
54 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
55 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
56
57 import ganeti.masterd.instance
58
59
60 _DISK_TEMPLATE_NAME_PREFIX = {
61 constants.DT_PLAIN: "",
62 constants.DT_RBD: ".rbd",
63 constants.DT_EXT: ".ext",
64 constants.DT_FILE: ".file",
65 constants.DT_SHARED_FILE: ".sharedfile",
66 }
67
68
69 def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
70 excl_stor):
71 """Create a single block device on a given node.
72
73 This will not recurse over children of the device, so they must be
74 created in advance.
75
76 @param lu: the lu on whose behalf we execute
77 @param node_uuid: the node on which to create the device
78 @type instance: L{objects.Instance}
79 @param instance: the instance which owns the device
80 @type device: L{objects.Disk}
81 @param device: the device to create
82 @param info: the extra 'metadata' we should attach to the device
83 (this will be represented as a LVM tag)
84 @type force_open: boolean
85 @param force_open: this parameter will be passes to the
86 L{backend.BlockdevCreate} function where it specifies
87 whether we run on primary or not, and it affects both
88 the child assembly and the device own Open() execution
89 @type excl_stor: boolean
90 @param excl_stor: Whether exclusive_storage is active for the node
91
92 """
93 result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
94 device.size, instance.name, force_open,
95 info, excl_stor)
96 result.Raise("Can't create block device %s on"
97 " node %s for instance %s" % (device,
98 lu.cfg.GetNodeName(node_uuid),
99 instance.name))
100
101
def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
                         info, force_open, excl_stor):
  """Create a tree of block devices on a given node.

  If this device type has to be created on secondaries, create it and
  all its children.

  If not, just recurse to children keeping the same 'force' value.

  @attention: The device has to be annotated already.

  @param lu: the lu on whose behalf we execute
  @param node_uuid: the node on which to create the device
  @type instance: L{objects.Instance}
  @param instance: the instance which owns the device
  @type device: L{objects.Disk}
  @param device: the device to create
  @type force_create: boolean
  @param force_create: whether to force creation of this device; this
      will be change to True whenever we find a device which has
      CreateOnSecondary() attribute
  @param info: the extra 'metadata' we should attach to the device
      (this will be represented as a LVM tag)
  @type force_open: boolean
  @param force_open: this parameter will be passed to the
      L{backend.BlockdevCreate} function where it specifies
      whether we run on primary or not, and it affects both
      the child assembly and the device own Open() execution
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active for the node

  @rtype: list of (node UUID, L{objects.Disk}) pairs
  @return: list of created devices
  """
  created_devices = []
  try:
    if device.CreateOnSecondary():
      force_create = True

    # Children must exist before the device itself can be created, so
    # recurse bottom-up; each recursive call extends the undo list.
    if device.children:
      for child in device.children:
        devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
                                    force_create, info, force_open, excl_stor)
        created_devices.extend(devs)

    if not force_create:
      return created_devices

    CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
                         excl_stor)
    # The device has been completely created, so there is no point in keeping
    # its subdevices in the list. We just add the device itself instead.
    created_devices = [(node_uuid, device)]
    return created_devices

  except errors.DeviceCreationError, e:
    # A nested call already failed: add what we created at this level so the
    # caller can clean up everything, then re-raise the same exception.
    e.created_devices.extend(created_devices)
    raise e
  except errors.OpExecError, e:
    # Creation at this level failed: wrap the error, attaching the list of
    # devices created so far so the caller can undo them.
    raise errors.DeviceCreationError(str(e), created_devices)
161
162
def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node_uuid: string
  @param node_uuid: The node UUID
  @rtype: bool
  @return: The effective value of exclusive_storage
  @raise errors.OpPrereqError: if no node exists with the given name

  """
  node_info = cfg.GetNodeInfo(node_uuid)
  if node_info is None:
    # An unknown UUID means the caller passed something bogus
    raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
                               errors.ECODE_NOENT)
  return IsExclusiveStorageEnabledNode(cfg, node_info)
180
181
def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
                    force_open):
  """Wrapper around L{_CreateBlockDevInner}.

  Annotates the root device with the instance's disk parameters and
  determines the node's exclusive_storage setting before delegating the
  actual creation work.

  """
  annotated_disks = AnnotateDiskParams(instance, [device], lu.cfg)
  excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
  return _CreateBlockDevInner(lu, node_uuid, instance, annotated_disks[0],
                              force_create, info, force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created, instance):
196 """Undo the work performed by L{CreateDisks}.
197
198 This function is called in case of an error to undo the work of
199 L{CreateDisks}.
200
201 @type lu: L{LogicalUnit}
202 @param lu: the logical unit on whose behalf we execute
203 @param disks_created: the result returned by L{CreateDisks}
204 @type instance: L{objects.Instance}
205 @param instance: the instance for which disks were created
206
207 """
208 for (node_uuid, disk) in disks_created:
209 result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
210 result.Warn("Failed to remove newly-created disk %s on node %s" %
211 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
212
213
def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
  """Create all disks for an instance.

  This abstracts away some work from AddInstance.

  On failure, any disks created so far (including those attached to the
  failing exception) are removed again via L{_UndoCreateDisks} before the
  error is re-raised.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should create
  @type to_skip: list
  @param to_skip: list of indices to skip
  @type target_node_uuid: string
  @param target_node_uuid: if passed, overrides the target node for creation
  @type disks: list of {objects.Disk}
  @param disks: the disks to create; if not specified, all the disks of the
      instance are created
  @return: information about the created disks, to be used to call
      L{_UndoCreateDisks}
  @raise errors.OpPrereqError: in case of error

  """
  info = GetInstanceInfoText(instance)
  if target_node_uuid is None:
    pnode_uuid = instance.primary_node
    all_node_uuids = instance.all_nodes
  else:
    # When a target node is given, create only there (e.g. for moves/imports)
    pnode_uuid = target_node_uuid
    all_node_uuids = [pnode_uuid]

  if disks is None:
    disks = instance.disks

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)

  if instance.disk_template in constants.DTS_FILEBASED:
    # File-based disks live in a common directory that must exist first
    file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
    result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)

    result.Raise("Failed to create directory '%s' on"
                 " node %s" % (file_storage_dir,
                               lu.cfg.GetNodeName(pnode_uuid)))

  disks_created = []
  for idx, device in enumerate(disks):
    if to_skip and idx in to_skip:
      continue
    logging.info("Creating disk %s for instance '%s'", idx, instance.name)
    for node_uuid in all_node_uuids:
      # Devices are fully opened/created only on the primary node
      f_create = node_uuid == pnode_uuid
      try:
        _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
                        f_create)
        disks_created.append((node_uuid, device))
      except errors.DeviceCreationError, e:
        logging.warning("Creating disk %s for instance '%s' failed",
                        idx, instance.name)
        # Include partially-created sub-devices reported by the exception,
        # then roll back everything created so far
        disks_created.extend(e.created_devices)
        _UndoCreateDisks(lu, disks_created, instance)
        raise errors.OpExecError(e.message)
  return disks_created
274
275
def ComputeDiskSizePerVG(disk_template, disks):
  """Compute disk size requirements in the volume group

  @type disk_template: string
  @param disk_template: the instance's disk template
  @type disks: list of dicts
  @param disks: disk definitions, each containing at least
      C{constants.IDISK_VG} and C{constants.IDISK_SIZE}
  @rtype: dict
  @return: mapping of volume group name to total required size in MiB
  @raise errors.ProgrammerError: if the disk template is unknown

  """
  def _compute(disks, payload):
    """Universal algorithm.

    """
    vgs = {}
    for disk in disks:
      vg_name = disk[constants.IDISK_VG]
      # Bug fix: the previous code did vgs.get(constants.IDISK_VG, 0), i.e.
      # it looked up the constant key string itself (which is never a key of
      # "vgs"), so sizes of multiple disks in the same VG were never
      # accumulated; we must look up by the actual VG name.
      vgs[vg_name] = vgs.get(vg_name, 0) + disk[constants.IDISK_SIZE] + payload

    return vgs

  # Required free disk space as a function of disk and swap space
  req_size_dict = {
    constants.DT_DISKLESS: {},
    constants.DT_PLAIN: _compute(disks, 0),
    # 128 MB are added for drbd metadata for each disk
    constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
    constants.DT_FILE: {},
    constants.DT_SHARED_FILE: {},
    }

  if disk_template not in req_size_dict:
    raise errors.ProgrammerError("Disk template '%s' size requirement"
                                 " is unknown" % disk_template)

  return req_size_dict[disk_template]
306
307
def ComputeDisks(op, default_vg):
  """Computes the instance disks.

  Validates each disk definition in the opcode and normalizes it into a
  dictionary of IDISK_* parameters.

  @param op: The instance opcode
  @param default_vg: The default_vg to assume

  @return: The computed disks

  """
  disks = []
  for raw in op.disks:
    access_mode = raw.get(constants.IDISK_MODE, constants.DISK_RDWR)
    if access_mode not in constants.DISK_ACCESS_SET:
      raise errors.OpPrereqError("Invalid disk access mode '%s'" %
                                 access_mode, errors.ECODE_INVAL)
    raw_size = raw.get(constants.IDISK_SIZE, None)
    if raw_size is None:
      raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
    try:
      size = int(raw_size)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid disk size '%s'" % raw_size,
                                 errors.ECODE_INVAL)

    ext_provider = raw.get(constants.IDISK_PROVIDER, None)
    if ext_provider and op.disk_template != constants.DT_EXT:
      # A provider only makes sense with the "ext" template
      raise errors.OpPrereqError("The '%s' option is only valid for the %s"
                                 " disk template, not %s" %
                                 (constants.IDISK_PROVIDER, constants.DT_EXT,
                                  op.disk_template), errors.ECODE_INVAL)

    name = raw.get(constants.IDISK_NAME, None)
    if name is not None and name.lower() == constants.VALUE_NONE:
      # The literal string "none" means "no name"
      name = None

    new_disk = {
      constants.IDISK_SIZE: size,
      constants.IDISK_MODE: access_mode,
      constants.IDISK_VG: raw.get(constants.IDISK_VG, default_vg),
      constants.IDISK_NAME: name,
      }

    # Copy over the optional parameters, if given
    for opt_key in (constants.IDISK_METAVG,
                    constants.IDISK_ADOPT,
                    constants.IDISK_SPINDLES):
      if opt_key in raw:
        new_disk[opt_key] = raw[opt_key]

    # For extstorage, demand the `provider' option and add any
    # additional parameters (ext-params) to the dict
    if op.disk_template == constants.DT_EXT:
      if not ext_provider:
        raise errors.OpPrereqError("Missing provider for template '%s'" %
                                   constants.DT_EXT, errors.ECODE_INVAL)
      new_disk[constants.IDISK_PROVIDER] = ext_provider
      for opt_key in raw:
        if opt_key not in constants.IDISK_PARAMS:
          new_disk[opt_key] = raw[opt_key]

    disks.append(new_disk)

  return disks
373
374
def CheckRADOSFreeSpace():
  """Compute disk size requirements inside the RADOS cluster.

  This is intentionally a no-op: for the RADOS cluster we assume there is
  always enough space.

  """
  return None
381
382
def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
                         iv_name, p_minor, s_minor):
  """Generate a drbd8 device complete with its children.

  Builds the data and metadata LVs first, then wraps them in the DRBD8
  device whose logical_id ties together the two nodes, the network port,
  the minors and the shared secret.

  """
  assert len(vgnames) == len(names) == 2
  port = lu.cfg.AllocatePort()
  shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())

  data_lv = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
                         logical_id=(vgnames[0], names[0]),
                         params={})
  data_lv.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  meta_lv = objects.Disk(dev_type=constants.DT_PLAIN,
                         size=constants.DRBD_META_SIZE,
                         logical_id=(vgnames[1], names[1]),
                         params={})
  meta_lv.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  drbd_disk = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
                           logical_id=(primary_uuid, secondary_uuid, port,
                                       p_minor, s_minor, shared_secret),
                           children=[data_lv, meta_lv],
                           iv_name=iv_name, params={})
  drbd_disk.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
  return drbd_disk
409
410
def GenerateDiskTemplate(
  lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
  disk_info, file_storage_dir, file_driver, base_index,
  feedback_fn, full_disk_params):
  """Generate the entire disk layout for a given template type.

  @param lu: the logical unit on whose behalf we execute
  @type template_name: string
  @param template_name: the disk template
  @param instance_uuid: the UUID of the instance owning the disks
  @param primary_node_uuid: the primary node's UUID
  @type secondary_node_uuids: list of string
  @param secondary_node_uuids: the secondary nodes (only used for DRBD8,
      must be empty for all other templates)
  @type disk_info: list of dicts
  @param disk_info: the IDISK_* parameters of each disk
  @param file_storage_dir: directory for file-based disks
  @param file_driver: driver for file-based disks
  @type base_index: int
  @param base_index: index of the first disk to generate
  @param feedback_fn: function used to report progress
  @param full_disk_params: complete disk parameters, used to compute the
      DRBD defaults
  @rtype: list of L{objects.Disk}
  @return: the generated disks
  @raise errors.ProgrammerError: on inconsistent arguments or unknown
      disk template

  """
  vgname = lu.cfg.GetVGName()
  disk_count = len(disk_info)
  disks = []

  CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)

  if template_name == constants.DT_DISKLESS:
    pass
  elif template_name == constants.DT_DRBD8:
    if len(secondary_node_uuids) != 1:
      raise errors.ProgrammerError("Wrong template configuration")
    remote_node_uuid = secondary_node_uuids[0]
    minors = lu.cfg.AllocateDRBDMinor(
      [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)

    (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
                                                       full_disk_params)
    drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]

    # Two LV names (data + meta) per disk
    names = []
    for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
                                               for i in range(disk_count)]):
      names.append(lv_prefix + "_data")
      names.append(lv_prefix + "_meta")
    for idx, disk in enumerate(disk_info):
      disk_index = idx + base_index
      data_vg = disk.get(constants.IDISK_VG, vgname)
      meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
      disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
                                      disk[constants.IDISK_SIZE],
                                      [data_vg, meta_vg],
                                      names[idx * 2:idx * 2 + 2],
                                      "disk/%d" % disk_index,
                                      minors[idx * 2], minors[idx * 2 + 1])
      disk_dev.mode = disk[constants.IDISK_MODE]
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disks.append(disk_dev)
  else:
    if secondary_node_uuids:
      raise errors.ProgrammerError("Wrong template configuration")

    name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
    if name_prefix is None:
      names = None
    else:
      names = _GenerateUniqueNames(lu, ["%s.disk%s" %
                                        (name_prefix, base_index + i)
                                        for i in range(disk_count)])

    # logical_id_fn(idx, disk_index, disk) computes the logical_id for the
    # idx-th disk of this batch
    if template_name == constants.DT_PLAIN:

      def logical_id_fn(idx, _, disk):
        vg = disk.get(constants.IDISK_VG, vgname)
        return (vg, names[idx])

    elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
      # Bug fix: this used to be a lambda ignoring its index argument and
      # referencing the free variable "idx" from the enumeration loop below,
      # which only worked by accident of late binding; use the argument.
      def logical_id_fn(idx, _, disk):
        return (file_driver, "%s/%s" % (file_storage_dir, names[idx]))

    elif template_name == constants.DT_BLOCK:
      logical_id_fn = \
        lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
                                       disk[constants.IDISK_ADOPT])
    elif template_name == constants.DT_RBD:
      logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
    elif template_name == constants.DT_EXT:
      def logical_id_fn(idx, _, disk):
        provider = disk.get(constants.IDISK_PROVIDER, None)
        if provider is None:
          raise errors.ProgrammerError("Disk template is %s, but '%s' is"
                                       " not found", constants.DT_EXT,
                                       constants.IDISK_PROVIDER)
        return (provider, names[idx])
    else:
      raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)

    dev_type = template_name

    for idx, disk in enumerate(disk_info):
      params = {}
      # Only for the Ext template add disk_info to params
      if template_name == constants.DT_EXT:
        params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
        for key in disk:
          if key not in constants.IDISK_PARAMS:
            params[key] = disk[key]
      disk_index = idx + base_index
      size = disk[constants.IDISK_SIZE]
      feedback_fn("* disk %s, size %s" %
                  (disk_index, utils.FormatUnit(size, "h")))
      disk_dev = objects.Disk(dev_type=dev_type, size=size,
                              logical_id=logical_id_fn(idx, disk_index, disk),
                              iv_name="disk/%d" % disk_index,
                              mode=disk[constants.IDISK_MODE],
                              params=params,
                              spindles=disk.get(constants.IDISK_SPINDLES))
      disk_dev.name = disk.get(constants.IDISK_NAME, None)
      disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
      disks.append(disk_dev)

  return disks
520
521
def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
  """Check the presence of the spindle options with exclusive_storage.

  @type diskdict: dict
  @param diskdict: disk parameters
  @type es_flag: bool
  @param es_flag: the effective value of the exclusive_storage flag
  @type required: bool
  @param required: whether spindles are required or just optional
  @raise errors.OpPrereqError when spindles are given and they should not

  """
  # Spindles are considered "given" only when present and not None
  spindles_given = diskdict.get(constants.IDISK_SPINDLES) is not None
  if spindles_given and not es_flag:
    raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
                               " when exclusive storage is not active",
                               errors.ECODE_INVAL)
  if es_flag and required and not spindles_given:
    raise errors.OpPrereqError("You must specify spindles in instance disks"
                               " when exclusive storage is active",
                               errors.ECODE_INVAL)
544
545
class LUInstanceRecreateDisks(LogicalUnit):
  """Recreate an instance's missing disks.

  Optionally the disks can be recreated on a different set of nodes
  (given explicitly or chosen by an iallocator), and selected disk
  parameters (see L{_MODIFYABLE}) can be changed at the same time.

  """
  HPATH = "instance-recreate-disks"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  # Disk parameters that may be changed while recreating
  _MODIFYABLE = compat.UniqueFrozenset([
    constants.IDISK_SIZE,
    constants.IDISK_MODE,
    constants.IDISK_SPINDLES,
    ])

  # New or changed disk parameters may have different semantics
  assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
    constants.IDISK_ADOPT,

    # TODO: Implement support changing VG while recreating
    constants.IDISK_VG,
    constants.IDISK_METAVG,
    constants.IDISK_PROVIDER,
    constants.IDISK_NAME,
    ]))

  def _RunAllocator(self):
    """Run the allocator based on input opcode.

    On success this fills in C{self.op.node_uuids} and C{self.op.nodes}
    with the nodes chosen by the iallocator.

    @raise errors.OpPrereqError: if the allocator cannot compute nodes

    """
    be_full = self.cfg.GetClusterInfo().FillBE(self.instance)

    # FIXME
    # The allocator should actually run in "relocate" mode, but current
    # allocators don't support relocating all the nodes of an instance at
    # the same time. As a workaround we use "allocate" mode, but this is
    # suboptimal for two reasons:
    # - The instance name passed to the allocator is present in the list of
    #   existing instances, so there could be a conflict within the
    #   internal structures of the allocator. This doesn't happen with the
    #   current allocators, but it's a liability.
    # - The allocator counts the resources used by the instance twice: once
    #   because the instance exists already, and once because it tries to
    #   allocate a new instance.
    # The allocator could choose some of the nodes on which the instance is
    # running, but that's not a problem. If the instance nodes are broken,
    # they should be already be marked as drained or offline, and hence
    # skipped by the allocator. If instance disks have been lost for other
    # reasons, then recreating the disks on the same nodes should be fine.
    disk_template = self.instance.disk_template
    spindle_use = be_full[constants.BE_SPINDLE_USE]
    disks = [{
      constants.IDISK_SIZE: d.size,
      constants.IDISK_MODE: d.mode,
      constants.IDISK_SPINDLES: d.spindles,
      } for d in self.instance.disks]
    req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
                                        disk_template=disk_template,
                                        tags=list(self.instance.GetTags()),
                                        os=self.instance.os,
                                        nics=[{}],
                                        vcpus=be_full[constants.BE_VCPUS],
                                        memory=be_full[constants.BE_MAXMEM],
                                        spindle_use=spindle_use,
                                        disks=disks,
                                        hypervisor=self.instance.hypervisor,
                                        node_whitelist=None)
    ial = iallocator.IAllocator(self.cfg, self.rpc, req)

    ial.Run(self.op.iallocator)

    assert req.RequiredNodes() == len(self.instance.all_nodes)

    if not ial.success:
      raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
                                 " %s" % (self.op.iallocator, ial.info),
                                 errors.ECODE_NORES)

    (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
    self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
                 self.op.instance_name, self.op.iallocator,
                 utils.CommaJoin(self.op.nodes))

  def CheckArguments(self):
    """Check and normalize the opcode's arguments.

    Converts the deprecated plain list of disk indices to the
    (index, params) form and verifies that only modifiable disk
    parameters are requested.

    """
    if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
      # Normalize and convert deprecated list of disk indices
      self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]

    duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
    if duplicates:
      raise errors.OpPrereqError("Some disks have been specified more than"
                                 " once: %s" % utils.CommaJoin(duplicates),
                                 errors.ECODE_INVAL)

    # We don't want _CheckIAllocatorOrNode selecting the default iallocator
    # when neither iallocator nor nodes are specified
    if self.op.iallocator or self.op.nodes:
      CheckIAllocatorOrNode(self, "iallocator", "nodes")

    for (idx, params) in self.op.disks:
      utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
      unsupported = frozenset(params.keys()) - self._MODIFYABLE
      if unsupported:
        raise errors.OpPrereqError("Parameters for disk %s try to change"
                                   " unmodifyable parameter(s): %s" %
                                   (idx, utils.CommaJoin(unsupported)),
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    """Declare the needed locks, depending on the node-selection mode.

    """
    self._ExpandAndLockInstance()
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND

    if self.op.nodes:
      # Explicit node list: lock exactly those nodes
      (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
      self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      if self.op.iallocator:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

  def DeclareLocks(self, level):
    """Compute per-level locks once the previous levels are acquired.

    """
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.iallocator is not None
      assert not self.op.nodes
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]
      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock the primary group used by the instance optimistically; this
      # requires going via the node before it's locked, requiring
      # verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)

    elif level == locking.LEVEL_NODE:
      # If an allocator is used, then we lock all the nodes in the current
      # instance group, as we don't know yet which ones will be selected;
      # if we replace the nodes without using an allocator, locks are
      # already declared in ExpandNames; otherwise, we need to lock all the
      # instance nodes for disk re-creation
      if self.op.iallocator:
        assert not self.op.nodes
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1

        # Lock member nodes of the group of the primary node
        for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
          self.needed_locks[locking.LEVEL_NODE].extend(
            self.cfg.GetNodeGroup(group_uuid).members)

        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
      elif not self.op.nodes:
        self._LockInstancesNodes(primary_only=False)
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on master, primary and secondary nodes of the instance.

    """
    return BuildInstanceHookEnvByObject(self, self.instance)

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster and is not running.
    It also validates the replacement nodes (explicit or via iallocator),
    the disk indices, and the spindles/exclusive-storage combination.

    """
    instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    if self.op.node_uuids:
      if len(self.op.node_uuids) != len(instance.all_nodes):
        raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
                                   " %d replacement nodes were specified" %
                                   (instance.name, len(instance.all_nodes),
                                    len(self.op.node_uuids)),
                                   errors.ECODE_INVAL)
      assert instance.disk_template != constants.DT_DRBD8 or \
             len(self.op.node_uuids) == 2
      assert instance.disk_template != constants.DT_PLAIN or \
             len(self.op.node_uuids) == 1
      primary_node = self.op.node_uuids[0]
    else:
      primary_node = instance.primary_node
    if not self.op.iallocator:
      CheckNodeOnline(self, primary_node)

    if instance.disk_template == constants.DT_DISKLESS:
      raise errors.OpPrereqError("Instance '%s' has no disks" %
                                 self.op.instance_name, errors.ECODE_INVAL)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      # Node group locks are acquired only for the primary node (and only
      # when the allocator is used)
      CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
                              primary_only=True)

    # if we replace nodes *and* the old primary is offline, we don't
    # check the instance state
    old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
    if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
      CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
                         msg="cannot recreate disks")

    if self.op.disks:
      self.disks = dict(self.op.disks)
    else:
      # No explicit selection: recreate all disks with unchanged parameters
      self.disks = dict((idx, {}) for idx in range(len(instance.disks)))

    maxidx = max(self.disks.keys())
    if maxidx >= len(instance.disks):
      raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
                                 errors.ECODE_INVAL)

    if ((self.op.node_uuids or self.op.iallocator) and
        sorted(self.disks.keys()) != range(len(instance.disks))):
      raise errors.OpPrereqError("Can't recreate disks partially and"
                                 " change the nodes at the same time",
                                 errors.ECODE_INVAL)

    self.instance = instance

    if self.op.iallocator:
      self._RunAllocator()
      # Release unneeded node and node resource locks
      ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
      ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)

      assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

    if self.op.node_uuids:
      node_uuids = self.op.node_uuids
    else:
      node_uuids = instance.all_nodes
    # Spindles may only be specified when exclusive storage is active on
    # at least one of the target nodes
    excl_stor = compat.any(
      rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
      )
    for new_params in self.disks.values():
      CheckSpindlesExclusiveStorage(new_params, excl_stor, False)

  def Exec(self, feedback_fn):
    """Recreate the disks.

    First computes all config changes (new DRBD ids, disk parameter
    updates), then applies them in one pass before physically creating
    the disks, so a failed assertion cannot leave a half-updated config.

    """
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    to_skip = []
    mods = [] # keeps track of needed changes

    for idx, disk in enumerate(self.instance.disks):
      try:
        changes = self.disks[idx]
      except KeyError:
        # Disk should not be recreated
        to_skip.append(idx)
        continue

      # update secondaries for disks, if needed
      if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
        # need to update the nodes and minors
        assert len(self.op.node_uuids) == 2
        assert len(disk.logical_id) == 6 # otherwise disk internals
                                         # have changed
        (_, _, old_port, _, _, old_secret) = disk.logical_id
        new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
                                                self.instance.uuid)
        new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
                  new_minors[0], new_minors[1], old_secret)
        assert len(disk.logical_id) == len(new_id)
      else:
        new_id = None

      mods.append((idx, new_id, changes))

    # now that we have passed all asserts above, we can apply the mods
    # in a single run (to avoid partial changes)
    for idx, new_id, changes in mods:
      disk = self.instance.disks[idx]
      if new_id is not None:
        assert disk.dev_type == constants.DT_DRBD8
        disk.logical_id = new_id
      if changes:
        disk.Update(size=changes.get(constants.IDISK_SIZE, None),
                    mode=changes.get(constants.IDISK_MODE, None),
                    spindles=changes.get(constants.IDISK_SPINDLES, None))

    # change primary node, if needed
    if self.op.node_uuids:
      self.instance.primary_node = self.op.node_uuids[0]
      self.LogWarning("Changing the instance's nodes, you will have to"
                      " remove any disks left on the older nodes manually")

    if self.op.node_uuids:
      self.cfg.Update(self.instance, feedback_fn)

    # All touched nodes must be locked
    mylocks = self.owned_locks(locking.LEVEL_NODE)
    assert mylocks.issuperset(frozenset(self.instance.all_nodes))
    new_disks = CreateDisks(self, self.instance, to_skip=to_skip)

    # TODO: Release node locks before wiping, or explain why it's not possible
    if self.cfg.GetClusterInfo().prealloc_wipe_disks:
      wipedisks = [(idx, disk, 0)
                   for (idx, disk) in enumerate(self.instance.disks)
                   if idx not in to_skip]
      WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
                         cleanup=new_disks)
870
871
def _PerformNodeInfoCall(lu, node_uuids, vg):
  """Queries node information (storage and hypervisor) for the given nodes.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: list of string
  @param node_uuids: list of node UUIDs to perform the call for
  @type vg: string
  @param vg: the volume group's name
  @return: the result of the node info RPC call

  """
  # Only the given LVM volume group is queried for storage information
  storage_units = rpc.PrepareStorageUnitsForNodes(
      lu.cfg, [(constants.ST_LVM_VG, vg)], node_uuids)
  hv_name = lu.cfg.GetHypervisorType()
  hv_specs = [(hv_name, lu.cfg.GetClusterInfo().hvparams[hv_name])]
  return lu.rpc.call_node_info(node_uuids, storage_units, hv_specs)
891
892
def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
  """Checks the vg capacity for a given node.

  @type node_name: string
  @param node_name: the name of the node
  @type node_info: tuple (_, list of dicts, _)
  @param node_info: the result of the node info call for one node
  @type vg: string
  @param vg: volume group name
  @type requested: int
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  (_, space_info, _) = node_info
  lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
      space_info, constants.ST_LVM_VG)
  if not lvm_vg_info:
    # Pass an explicit error code, consistent with the other raises below
    raise errors.OpPrereqError("Can't retrieve storage information for LVM",
                               errors.ECODE_ENVIRON)
  vg_free = lvm_vg_info.get("storage_free", None)
  if not isinstance(vg_free, int):
    raise errors.OpPrereqError("Can't compute free disk space on node"
                               " %s for vg %s, result was '%s'" %
                               (node_name, vg, vg_free), errors.ECODE_ENVIRON)
  if requested > vg_free:
    raise errors.OpPrereqError("Not enough disk space on target node %s"
                               " vg %s: required %d MiB, available %d MiB" %
                               (node_name, vg, requested, vg_free),
                               errors.ECODE_NORES)
923
924
def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
  """Checks if nodes have enough free disk space in the specified VG.

  Every given node must report at least the requested amount of free
  disk in the given volume group; a shortage, or a failure to query a
  node, raises an exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type vg: C{str}
  @param vg: the volume group to check
  @type requested: C{int}
  @param requested: the amount of disk in MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  node_results = _PerformNodeInfoCall(lu, node_uuids, vg)
  for uuid in node_uuids:
    name = lu.cfg.GetNodeName(uuid)
    node_result = node_results[uuid]
    node_result.Raise("Cannot get current information from node %s" % name,
                      prereq=True, ecode=errors.ECODE_ENVIRON)
    _CheckVgCapacityForNode(name, node_result.payload, vg, requested)
952
953
def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
  """Checks if nodes have enough free disk space in all the VGs.

  Each volume group listed in C{req_sizes} is verified on every given
  node; any shortage (or failure to query a node) is reported by the
  per-VG helper via an exception.

  @type lu: C{LogicalUnit}
  @param lu: a logical unit from which we get configuration data
  @type node_uuids: C{list}
  @param node_uuids: the list of node UUIDs to check
  @type req_sizes: C{dict}
  @param req_sizes: the hash of vg and corresponding amount of disk in
      MiB to check for
  @raise errors.OpPrereqError: if the node doesn't have enough disk,
      or we cannot check the node

  """
  for vg_name, vg_request in req_sizes.items():
    _CheckNodesFreeDiskOnVG(lu, node_uuids, vg_name, vg_request)
975
976
977 def _DiskSizeInBytesToMebibytes(lu, size):
978 """Converts a disk size in bytes to mebibytes.
979
980 Warns and rounds up if the size isn't an even multiple of 1 MiB.
981
982 """
983 (mib, remainder) = divmod(size, 1024 * 1024)
984
985 if remainder != 0:
986 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
987 " to not overwrite existing data (%s bytes will not be"
988 " wiped)", (1024 * 1024) - remainder)
989 mib += 1
990
991 return mib
992
993
994 def _CalcEta(time_taken, written, total_size):
995 """Calculates the ETA based on size written and total size.
996
997 @param time_taken: The time taken so far
998 @param written: amount written so far
999 @param total_size: The total size of data to be written
1000 @return: The remaining time in seconds
1001
1002 """
1003 avg_time = time_taken / float(written)
1004 return (total_size - written) * avg_time
1005
1006
def WipeDisks(lu, instance, disks=None):
  """Wipes instance disks.

  Pauses disk synchronization on the primary node, wipes each
  requested disk in chunks, and always resumes synchronization
  afterwards (even on error, via the C{finally} clause).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @type disks: None or list of tuple of (number, L{objects.Disk}, number)
  @param disks: Disk details; tuple contains disk index, disk object and the
    start offset

  """
  # All wipe RPCs go to the primary node
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  if disks is None:
    # Default: wipe every disk of the instance from offset 0
    disks = [(idx, disk, 0)
             for (idx, disk) in enumerate(instance.disks)]

  logging.info("Pausing synchronization of disks of instance '%s'",
               instance.name)
  result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                  (map(compat.snd, disks),
                                                   instance),
                                                  True)
  result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)

  # Per-disk pause failures are only logged, not fatal
  for idx, success in enumerate(result.payload):
    if not success:
      logging.warn("Pausing synchronization of disk %s of instance '%s'"
                   " failed", idx, instance.name)

  try:
    for (idx, device, offset) in disks:
      # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
      # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
      wipe_chunk_size = \
        int(min(constants.MAX_WIPE_CHUNK,
                device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))

      size = device.size
      last_output = 0
      start_time = time.time()

      # Describe the wiped range when starting from a non-zero offset
      if offset == 0:
        info_text = ""
      else:
        info_text = (" (from %s to %s)" %
                     (utils.FormatUnit(offset, "h"),
                      utils.FormatUnit(size, "h")))

      lu.LogInfo("* Wiping disk %s%s", idx, info_text)

      logging.info("Wiping disk %d for instance %s on node %s using"
                   " chunk size %s", idx, instance.name, node_name,
                   wipe_chunk_size)

      while offset < size:
        wipe_size = min(wipe_chunk_size, size - offset)

        logging.debug("Wiping disk %d, offset %s, chunk %s",
                      idx, offset, wipe_size)

        result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
                                           offset, wipe_size)
        result.Raise("Could not wipe disk %d at offset %d for size %d" %
                     (idx, offset, wipe_size))

        now = time.time()
        offset += wipe_size
        # Report progress at most once per minute
        if now - last_output >= 60:
          eta = _CalcEta(now - start_time, offset, size)
          lu.LogInfo(" - done: %.1f%% ETA: %s",
                     offset / float(size) * 100, utils.FormatSeconds(eta))
          last_output = now
  finally:
    # Always try to resume synchronization, even if wiping failed
    logging.info("Resuming synchronization of disks for instance '%s'",
                 instance.name)

    result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
                                                    (map(compat.snd, disks),
                                                     instance),
                                                    False)

    if result.fail_msg:
      lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
                    node_name, result.fail_msg)
    else:
      for idx, success in enumerate(result.payload):
        if not success:
          lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
                        " failed", idx, instance.name)
1099
1100
def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
  """Wrapper for L{WipeDisks} that handles errors.

  On wipe failure, the freshly-created disks passed in C{cleanup} are
  removed before the error is propagated.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should wipe
  @param disks: see L{WipeDisks}
  @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
      case of error
  @raise errors.OpPrereqError: in case of failure

  """
  try:
    WipeDisks(lu, instance, disks=disks)
  except errors.OpExecError:
    logging.warning("Wiping disks for instance '%s' failed",
                    instance.name)
    # Undo the disk creation before re-raising the original error
    _UndoCreateDisks(lu, cleanup, instance)
    raise
1121
1122
def ExpandCheckDisks(instance, disks):
  """Return the instance disks selected by the disks list

  @type disks: list of L{objects.Disk} or None
  @param disks: selected disks
  @rtype: list of L{objects.Disk}
  @return: selected instance disks to act on

  """
  if disks is None:
    # No selection given: act on all of the instance's disks
    return instance.disks
  if set(disks) - set(instance.disks):
    raise errors.ProgrammerError("Can only act on disks belonging to the"
                                 " target instance: expected a subset of %r,"
                                 " got %r" % (instance.disks, disks))
  return disks
1140
1141
def WaitForSync(lu, instance, disks=None, oneshot=False):
  """Sleep and poll for an instance's disk to sync.

  Polls the mirror status on the primary node until all selected disks
  are in sync (or, with C{oneshot}, checks only once).

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we wait for
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to wait for (or all, if None)
  @type oneshot: boolean
  @param oneshot: if True, only poll once instead of looping until done
  @rtype: boolean
  @return: True if the disks are not degraded, False otherwise

  """
  # Nothing to wait for if there are no (selected) disks
  if not instance.disks or disks is not None and not disks:
    return True

  disks = ExpandCheckDisks(instance, disks)

  if not oneshot:
    lu.LogInfo("Waiting for instance %s to sync disks", instance.name)

  # Mirror status is always queried on the primary node
  node_uuid = instance.primary_node
  node_name = lu.cfg.GetNodeName(node_uuid)

  # TODO: Convert to utils.Retry

  retries = 0
  degr_retries = 10 # in seconds, as we sleep 1 second each time
  while True:
    max_time = 0
    done = True
    cumul_degraded = False
    rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
    msg = rstats.fail_msg
    if msg:
      # RPC failure: retry up to 10 times with a 6 second pause
      lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
      retries += 1
      if retries >= 10:
        raise errors.RemoteError("Can't contact node %s for mirror data,"
                                 " aborting." % node_name)
      time.sleep(6)
      continue
    rstats = rstats.payload
    retries = 0
    for i, mstat in enumerate(rstats):
      if mstat is None:
        lu.LogWarning("Can't compute data for node %s/%s",
                      node_name, disks[i].iv_name)
        continue

      # A disk with no sync percentage but degraded counts as degraded overall
      cumul_degraded = (cumul_degraded or
                        (mstat.is_degraded and mstat.sync_percent is None))
      if mstat.sync_percent is not None:
        done = False
        if mstat.estimated_time is not None:
          rem_time = ("%s remaining (estimated)" %
                      utils.FormatSeconds(mstat.estimated_time))
          max_time = mstat.estimated_time
        else:
          rem_time = "no time estimate"
          max_time = 5 # sleep at least a bit between retries
        lu.LogInfo("- device %s: %5.2f%% done, %s",
                   disks[i].iv_name, mstat.sync_percent, rem_time)

    # if we're done but degraded, let's do a few small retries, to
    # make sure we see a stable and not transient situation; therefore
    # we force restart of the loop
    if (done or oneshot) and cumul_degraded and degr_retries > 0:
      logging.info("Degraded disks found, %d retries left", degr_retries)
      degr_retries -= 1
      time.sleep(1)
      continue

    if done or oneshot:
      break

    time.sleep(min(60, max_time))

  if done:
    lu.LogInfo("Instance %s's disks are in sync", instance.name)

  return not cumul_degraded
1215
1216
def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
  """Shutdown block devices of an instance.

  The shutdown is performed on all nodes the instance's disks live on.

  A failed shutdown makes the result False, except when it happened on
  the primary node while C{ignore_primary} is set, or on an offline
  secondary node.

  """
  if disks is None:
    # only mark instance disks as inactive if all disks are affected
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)
  disks = ExpandCheckDisks(instance, disks)

  success = True
  for disk in disks:
    for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
      result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
      msg = result.fail_msg
      if not msg:
        continue
      lu.LogWarning("Could not shutdown block device %s on node %s: %s",
                    disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
      on_primary = node_uuid == instance.primary_node
      if (on_primary and not ignore_primary) or \
         (not on_primary and not result.offline):
        success = False
  return success
1244
1245
def _SafeShutdownInstanceDisks(lu, instance, disks=None):
  """Shutdown block devices of an instance.

  This function checks that the instance is down (via
  L{CheckInstanceState}, which raises otherwise) before calling
  L{ShutdownInstanceDisks}.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should shut down
  @param disks: see L{ShutdownInstanceDisks}

  """
  CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
  ShutdownInstanceDisks(lu, instance, disks=disks)
1255
1256
def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
                          ignore_size=False):
  """Prepare the block devices for an instance.

  This sets up the block devices on all nodes.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type instance: L{objects.Instance}
  @param instance: the instance for whose disks we assemble
  @type disks: list of L{objects.Disk} or None
  @param disks: which disks to assemble (or all, if None)
  @type ignore_secondaries: boolean
  @param ignore_secondaries: if true, errors on secondary nodes
      won't result in an error return from the function
  @type ignore_size: boolean
  @param ignore_size: if true, the current known size of the disk
      will not be used during the disk activation, useful for cases
      when the size is wrong
  @return: a tuple of (disks_ok, device_info); disks_ok is False if the
      operation failed, and device_info is a list of
      (host, instance_visible_name, node_visible_name)
      with the mapping from node devices to instance devices

  """
  device_info = []
  disks_ok = True

  if disks is None:
    # only mark instance disks as active if all disks are affected
    lu.cfg.MarkInstanceDisksActive(instance.uuid)

  disks = ExpandCheckDisks(instance, disks)

  # With the two passes mechanism we try to reduce the window of
  # opportunity for the race condition of switching DRBD to primary
  # before handshaking occured, but we do not eliminate it

  # The proper fix would be to wait (with some limits) until the
  # connection has been made and drbd transitions from WFConnection
  # into any other network-connected state (Connected, SyncTarget,
  # SyncSource, etc.)

  # 1st pass, assemble on all nodes in secondary mode
  for idx, inst_disk in enumerate(disks):
    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if ignore_size:
        # drop the recorded size so the backend re-detects it
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance, False, idx)
      msg = result.fail_msg
      if msg:
        # failures on offline secondaries are tolerated
        is_offline_secondary = (node_uuid in instance.secondary_nodes and
                                result.offline)
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=False, pass=1): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        if not (ignore_secondaries or is_offline_secondary):
          disks_ok = False

  # FIXME: race condition on drbd migration to primary

  # 2nd pass, do only the primary node
  for idx, inst_disk in enumerate(disks):
    dev_path = None

    for node_uuid, node_disk in inst_disk.ComputeNodeTree(
                                  instance.primary_node):
      if node_uuid != instance.primary_node:
        continue
      if ignore_size:
        node_disk = node_disk.Copy()
        node_disk.UnsetSize()
      result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
                                             instance, True, idx)
      msg = result.fail_msg
      if msg:
        lu.LogWarning("Could not prepare block device %s on node %s"
                      " (is_primary=True, pass=2): %s",
                      inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
        disks_ok = False
      else:
        dev_path, _, __ = result.payload

    device_info.append((lu.cfg.GetNodeName(instance.primary_node),
                        inst_disk.iv_name, dev_path))

  # leave the disks configured for the primary node
  # this is a workaround that would be fixed better by
  # improving the logical/physical id handling
  if not disks_ok:
    lu.cfg.MarkInstanceDisksInactive(instance.uuid)

  return disks_ok, device_info
1349
1350
def StartInstanceDisks(lu, instance, force):
  """Start the disks of an instance.

  @raise errors.OpExecError: if any disk could not be assembled

  """
  assembled, _ = AssembleInstanceDisks(lu, instance,
                                       ignore_secondaries=force)
  if assembled:
    return
  # Roll back any devices that did come up before failing
  ShutdownInstanceDisks(lu, instance)
  if force is not None and not force:
    lu.LogWarning("",
                  hint=("If the message above refers to a secondary node,"
                        " you can retry the operation using '--force'"))
  raise errors.OpExecError("Disk consistency error")
1364
1365
class LUInstanceGrowDisk(LogicalUnit):
  """Grow a disk of an instance.

  """
  HPATH = "disk-grow"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are filled in later by DeclareLocks/_LockInstancesNodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.needed_locks[locking.LEVEL_NODE_RES] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
    self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()
    elif level == locking.LEVEL_NODE_RES:
      # Copy node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        CopyLockList(self.needed_locks[locking.LEVEL_NODE])

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    env = {
      "DISK": self.op.disk,
      "AMOUNT": self.op.amount,
      "ABSOLUTE": self.op.absolute,
      }
    env.update(BuildInstanceHookEnvByObject(self, self.instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
    return (nl, nl)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    node_uuids = list(self.instance.all_nodes)
    for node_uuid in node_uuids:
      CheckNodeOnline(self, node_uuid)
    self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)

    if self.instance.disk_template not in constants.DTS_GROWABLE:
      raise errors.OpPrereqError("Instance's disk layout does not support"
                                 " growing", errors.ECODE_INVAL)

    self.disk = self.instance.FindDisk(self.op.disk)

    if self.op.absolute:
      # absolute: the requested amount is the new total size
      self.target = self.op.amount
      self.delta = self.target - self.disk.size
      if self.delta < 0:
        raise errors.OpPrereqError("Requested size (%s) is smaller than "
                                   "current disk size (%s)" %
                                   (utils.FormatUnit(self.target, "h"),
                                    utils.FormatUnit(self.disk.size, "h")),
                                   errors.ECODE_STATE)
    else:
      # relative: the requested amount is added to the current size
      self.delta = self.op.amount
      self.target = self.disk.size + self.delta
      if self.delta < 0:
        raise errors.OpPrereqError("Requested increment (%s) is negative" %
                                   utils.FormatUnit(self.delta, "h"),
                                   errors.ECODE_INVAL)

    self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))

  def _CheckDiskSpace(self, node_uuids, req_vgspace):
    """Verify the requested growth fits in the nodes' volume groups.

    """
    template = self.instance.disk_template
    if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
        not any(self.node_es_flags.values())):
      # TODO: check the free disk space for file, when that feature will be
      # supported
      # With exclusive storage we need to do something smarter than just looking
      # at free space, which, in the end, is basically a dry run. So we rely on
      # the dry run performed in Exec() instead.
      CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)

  def Exec(self, feedback_fn):
    """Execute disk grow.

    Order of operations: dry-run grow on all nodes, optionally record
    the current size for wiping, real grow of backing then logical
    storage, config update, then wipe and/or wait for sync.

    """
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
    assert (self.owned_locks(locking.LEVEL_NODE) ==
            self.owned_locks(locking.LEVEL_NODE_RES))

    wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks

    disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block device to grow")

    feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
                (self.op.disk, self.instance.name,
                 utils.FormatUnit(self.delta, "h"),
                 utils.FormatUnit(self.target, "h")))

    # First run all grow ops in dry-run mode
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, True, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Dry-run grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    if wipe_disks:
      # Get disk size from primary node for wiping
      result = self.rpc.call_blockdev_getdimensions(
        self.instance.primary_node, [([self.disk], self.instance)])
      result.Raise("Failed to retrieve disk size from node '%s'" %
                   self.instance.primary_node)

      (disk_dimensions, ) = result.payload

      if disk_dimensions is None:
        raise errors.OpExecError("Failed to retrieve disk size from primary"
                                 " node '%s'" % self.instance.primary_node)
      (disk_size_in_bytes, _) = disk_dimensions

      old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)

      assert old_disk_size >= self.disk.size, \
        ("Retrieved disk size too small (got %s, should be at least %s)" %
         (old_disk_size, self.disk.size))
    else:
      old_disk_size = None

    # We know that (as far as we can test) operations across different
    # nodes will succeed, time to run it for real on the backing storage
    for node_uuid in self.instance.all_nodes:
      result = self.rpc.call_blockdev_grow(node_uuid,
                                           (self.disk, self.instance),
                                           self.delta, False, True,
                                           self.node_es_flags[node_uuid])
      result.Raise("Grow request failed to node %s" %
                   self.cfg.GetNodeName(node_uuid))

    # And now execute it for logical storage, on the primary node
    node_uuid = self.instance.primary_node
    result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
                                         self.delta, False, False,
                                         self.node_es_flags[node_uuid])
    result.Raise("Grow request failed to node %s" %
                 self.cfg.GetNodeName(node_uuid))

    self.disk.RecordGrow(self.delta)
    self.cfg.Update(self.instance, feedback_fn)

    # Changes have been recorded, release node lock
    ReleaseLocks(self, locking.LEVEL_NODE)

    # Downgrade lock while waiting for sync
    self.glm.downgrade(locking.LEVEL_INSTANCE)

    assert wipe_disks ^ (old_disk_size is None)

    if wipe_disks:
      assert self.instance.disks[self.op.disk] == self.disk

      # Wipe newly added disk space
      WipeDisks(self, self.instance,
                disks=[(self.op.disk, self.disk, old_disk_size)])

    if self.op.wait_for_sync:
      disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
      if disk_abort:
        self.LogWarning("Disk syncing has not returned a good status; check"
                        " the instance")
      if not self.instance.disks_active:
        _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
    elif not self.instance.disks_active:
      self.LogWarning("Not shutting down the disk even if the instance is"
                      " not supposed to be running because no wait for"
                      " sync mode was requested")

    assert self.owned_locks(locking.LEVEL_NODE_RES)
    assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1560
1561
class LUInstanceReplaceDisks(LogicalUnit):
  """Replace the disks of an instance.

  The actual work is delegated to a L{TLReplaceDisks} tasklet; this LU
  only deals with argument checking and locking.

  """
  HPATH = "mirrors-replace"
  HTYPE = constants.HTYPE_INSTANCE
  REQ_BGL = False

  def CheckArguments(self):
    """Check arguments.

    """
    if self.op.mode == constants.REPLACE_DISK_CHG:
      # Changing the secondary requires either an iallocator or a target node
      if self.op.remote_node is None and self.op.iallocator is None:
        raise errors.OpPrereqError("When changing the secondary either an"
                                   " iallocator script must be used or the"
                                   " new node given", errors.ECODE_INVAL)
      else:
        CheckIAllocatorOrNode(self, "iallocator", "remote_node")

    elif self.op.remote_node is not None or self.op.iallocator is not None:
      # Not replacing the secondary
      raise errors.OpPrereqError("The iallocator and new node options can"
                                 " only be used when changing the"
                                 " secondary node", errors.ECODE_INVAL)

  def ExpandNames(self):
    self._ExpandAndLockInstance()

    assert locking.LEVEL_NODE not in self.needed_locks
    assert locking.LEVEL_NODE_RES not in self.needed_locks
    assert locking.LEVEL_NODEGROUP not in self.needed_locks

    assert self.op.iallocator is None or self.op.remote_node is None, \
      "Conflicting options"

    if self.op.remote_node is not None:
      (self.op.remote_node_uuid, self.op.remote_node) = \
        ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
                              self.op.remote_node)

      # Warning: do not remove the locking of the new secondary here
      # unless DRBD8Dev.AddChildren is changed to work in parallel;
      # currently it doesn't since parallel invocations of
      # FindUnusedMinor will conflict
      self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
    else:
      self.needed_locks[locking.LEVEL_NODE] = []
      self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

      if self.op.iallocator is not None:
        # iallocator will select a new node in the same group
        self.needed_locks[locking.LEVEL_NODEGROUP] = []
        self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET

    self.needed_locks[locking.LEVEL_NODE_RES] = []

    self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
                                   self.op.instance_name, self.op.mode,
                                   self.op.iallocator, self.op.remote_node_uuid,
                                   self.op.disks, self.op.early_release,
                                   self.op.ignore_ipolicy)

    self.tasklets = [self.replacer]

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODEGROUP:
      assert self.op.remote_node_uuid is None
      assert self.op.iallocator is not None
      assert not self.needed_locks[locking.LEVEL_NODEGROUP]

      self.share_locks[locking.LEVEL_NODEGROUP] = 1
      # Lock all groups used by instance optimistically; this requires going
      # via the node before it's locked, requiring verification later on
      self.needed_locks[locking.LEVEL_NODEGROUP] = \
        self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)

    elif level == locking.LEVEL_NODE:
      if self.op.iallocator is not None:
        assert self.op.remote_node_uuid is None
        assert not self.needed_locks[locking.LEVEL_NODE]
        assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)

        # Lock member nodes of all locked groups
        self.needed_locks[locking.LEVEL_NODE] = \
          [node_uuid
           for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
           for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
      else:
        assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)

        self._LockInstancesNodes()

    elif level == locking.LEVEL_NODE_RES:
      # Reuse node locks
      self.needed_locks[locking.LEVEL_NODE_RES] = \
        self.needed_locks[locking.LEVEL_NODE]

  def BuildHooksEnv(self):
    """Build hooks env.

    This runs on the master, the primary and all the secondaries.

    """
    instance = self.replacer.instance
    env = {
      "MODE": self.op.mode,
      "NEW_SECONDARY": self.op.remote_node,
      "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
      }
    env.update(BuildInstanceHookEnvByObject(self, instance))
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    instance = self.replacer.instance
    nl = [
      self.cfg.GetMasterNode(),
      instance.primary_node,
      ]
    if self.op.remote_node_uuid is not None:
      nl.append(self.op.remote_node_uuid)
    return nl, nl

  def CheckPrereq(self):
    """Check prerequisites.

    """
    assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
            self.op.iallocator is None)

    # Verify if node group locks are still correct
    owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
    if owned_groups:
      CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)

    # The tasklet performs the instance-specific checks
    return LogicalUnit.CheckPrereq(self)
1702
1703
class LUInstanceActivateDisks(NoHooksLU):
  """Bring up an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are filled in by DeclareLocks/_LockInstancesNodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name
    CheckNodeOnline(self, self.instance.primary_node)

  def Exec(self, feedback_fn):
    """Activate the disks.

    @return: the per-disk device information, see
        L{AssembleInstanceDisks}

    """
    disks_ok, disks_info = \
      AssembleInstanceDisks(self, self.instance,
                            ignore_size=self.op.ignore_size)
    if not disks_ok:
      raise errors.OpExecError("Cannot activate block devices")

    if self.op.wait_for_sync:
      if not WaitForSync(self, self.instance):
        # Degraded disks: mark inactive again before failing
        self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
        raise errors.OpExecError("Some disks of the instance are degraded!")

    return disks_info
1746
1747
class LUInstanceDeactivateDisks(NoHooksLU):
  """Shutdown an instance's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self._ExpandAndLockInstance()
    # Node locks are filled in by DeclareLocks/_LockInstancesNodes
    self.needed_locks[locking.LEVEL_NODE] = []
    self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      self._LockInstancesNodes()

  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.op.instance_name

  def Exec(self, feedback_fn):
    """Deactivate the disks

    """
    if self.op.force:
      # forced: skip the instance-is-down check
      ShutdownInstanceDisks(self, self.instance)
    else:
      _SafeShutdownInstanceDisks(self, self.instance)
1781
1782
def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
                               ldisk=False):
  """Check that mirrors are not degraded.

  @attention: The device has to be annotated already.

  The ldisk parameter, if True, will change the test from the
  is_degraded attribute (which represents overall non-ok status for
  the device(s)) to the ldisk (representing the local storage status).

  @rtype: boolean
  @return: True if the device (and all its children) look consistent

  """
  result = True

  if on_primary or dev.AssembleOnSecondary():
    rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
    msg = rstats.fail_msg
    if msg:
      lu.LogWarning("Can't find disk on node %s: %s",
                    lu.cfg.GetNodeName(node_uuid), msg)
      result = False
    elif not rstats.payload:
      lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
      result = False
    else:
      if ldisk:
        result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
      else:
        result = result and not rstats.payload.is_degraded

  if dev.children:
    for child in dev.children:
      # NOTE(review): 'ldisk' is not propagated here, so child devices are
      # always checked via is_degraded — confirm this is intended
      result = result and _CheckDiskConsistencyInner(lu, instance, child,
                                                     node_uuid, on_primary)

  return result
1818
1819
def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
  """Wrapper around L{_CheckDiskConsistencyInner}.

  Annotates the disk's parameters before delegating the actual check.

  """
  (anno_disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return _CheckDiskConsistencyInner(lu, instance, anno_disk, node_uuid,
                                    on_primary, ldisk=ldisk)
1827
1828
def _BlockdevFind(lu, node_uuid, dev, instance):
  """Wrapper around call_blockdev_find to annotate diskparams.

  @param lu: A reference to the lu object
  @param node_uuid: The node to call out
  @param dev: The device to find
  @param instance: The instance object the device belongs to
  @returns The result of the rpc call

  """
  (anno_disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
  return lu.rpc.call_blockdev_find(node_uuid, (anno_disk, instance))
1841
1842
1843 def _GenerateUniqueNames(lu, exts):
1844 """Generate a suitable LV name.
1845
1846 This will generate a logical volume name for the given instance.
1847
1848 """
1849 results = []
1850 for val in exts:
1851 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1852 results.append("%s%s" % (new_id, val))
1853 return results
1854
1855
1856 class TLReplaceDisks(Tasklet):
1857 """Replaces disks for an instance.
1858
1859 Note: Locking is not within the scope of this class.
1860
1861 """
1862 def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1863 remote_node_uuid, disks, early_release, ignore_ipolicy):
1864 """Initializes this class.
1865
1866 """
1867 Tasklet.__init__(self, lu)
1868
1869 # Parameters
1870 self.instance_uuid = instance_uuid
1871 self.instance_name = instance_name
1872 self.mode = mode
1873 self.iallocator_name = iallocator_name
1874 self.remote_node_uuid = remote_node_uuid
1875 self.disks = disks
1876 self.early_release = early_release
1877 self.ignore_ipolicy = ignore_ipolicy
1878
1879 # Runtime data
1880 self.instance = None
1881 self.new_node_uuid = None
1882 self.target_node_uuid = None
1883 self.other_node_uuid = None
1884 self.remote_node_info = None
1885 self.node_secondary_ip = None
1886
1887 @staticmethod
1888 def _RunAllocator(lu, iallocator_name, instance_uuid,
1889 relocate_from_node_uuids):
1890 """Compute a new secondary node using an IAllocator.
1891
1892 """
1893 req = iallocator.IAReqRelocate(
1894 inst_uuid=instance_uuid,
1895 relocate_from_node_uuids=list(relocate_from_node_uuids))
1896 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1897
1898 ial.Run(iallocator_name)
1899
1900 if not ial.success:
1901 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1902 " %s" % (iallocator_name, ial.info),
1903 errors.ECODE_NORES)
1904
1905 remote_node_name = ial.result[0]
1906 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1907
1908 if remote_node is None:
1909 raise errors.OpPrereqError("Node %s not found in configuration" %
1910 remote_node_name, errors.ECODE_NOENT)
1911
1912 lu.LogInfo("Selected new secondary for instance '%s': %s",
1913 instance_uuid, remote_node_name)
1914
1915 return remote_node.uuid
1916
1917 def _FindFaultyDisks(self, node_uuid):
1918 """Wrapper for L{FindFaultyInstanceDisks}.
1919
1920 """
1921 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1922 node_uuid, True)
1923
1924 def _CheckDisksActivated(self, instance):
1925 """Checks if the instance disks are activated.
1926
1927 @param instance: The instance to check disks
1928 @return: True if they are activated, False otherwise
1929
1930 """
1931 node_uuids = instance.all_nodes
1932
1933 for idx, dev in enumerate(instance.disks):
1934 for node_uuid in node_uuids:
1935 self.lu.LogInfo("Checking disk/%d on %s", idx,
1936 self.cfg.GetNodeName(node_uuid))
1937
1938 result = _BlockdevFind(self, node_uuid, dev, instance)
1939
1940 if result.offline:
1941 continue
1942 elif result.fail_msg or not result.payload:
1943 return False
1944
1945 return True
1946
  def CheckPrereq(self):
    """Check prerequisites.

    This checks that the instance is in the cluster.

    """
    self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
    assert self.instance is not None, \
      "Cannot retrieve locked instance %s" % self.instance_name

    # Disk replacement is only implemented for DRBD8-backed instances
    if self.instance.disk_template != constants.DT_DRBD8:
      raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
                                 " instances", errors.ECODE_INVAL)

    if len(self.instance.secondary_nodes) != 1:
      raise errors.OpPrereqError("The instance has a strange layout,"
                                 " expected one secondary but found %d" %
                                 len(self.instance.secondary_nodes),
                                 errors.ECODE_FAULT)

    secondary_node_uuid = self.instance.secondary_nodes[0]

    # Determine the replacement secondary: either given explicitly by the
    # caller or computed via the configured iallocator
    if self.iallocator_name is None:
      remote_node_uuid = self.remote_node_uuid
    else:
      remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
                                            self.instance.uuid,
                                            self.instance.secondary_nodes)

    if remote_node_uuid is None:
      self.remote_node_info = None
    else:
      assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
        "Remote node '%s' is not locked" % remote_node_uuid

      self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
      assert self.remote_node_info is not None, \
        "Cannot retrieve locked node %s" % remote_node_uuid

    if remote_node_uuid == self.instance.primary_node:
      raise errors.OpPrereqError("The specified node is the primary node of"
                                 " the instance", errors.ECODE_INVAL)

    if remote_node_uuid == secondary_node_uuid:
      raise errors.OpPrereqError("The specified node is already the"
                                 " secondary node of the instance",
                                 errors.ECODE_INVAL)

    # Auto and change modes operate on all (faulty) disks, so an explicit
    # disk list is not allowed for them
    if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
                                    constants.REPLACE_DISK_CHG):
      raise errors.OpPrereqError("Cannot specify disks to be replaced",
                                 errors.ECODE_INVAL)

    if self.mode == constants.REPLACE_DISK_AUTO:
      if not self._CheckDisksActivated(self.instance):
        raise errors.OpPrereqError("Please run activate-disks on instance %s"
                                   " first" % self.instance_name,
                                   errors.ECODE_STATE)
      faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
      faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)

      if faulty_primary and faulty_secondary:
        raise errors.OpPrereqError("Instance %s has faulty disks on more than"
                                   " one node and can not be repaired"
                                   " automatically" % self.instance_name,
                                   errors.ECODE_STATE)

      if faulty_primary:
        self.disks = faulty_primary
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      elif faulty_secondary:
        self.disks = faulty_secondary
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]
      else:
        # Nothing faulty found; Exec will be a no-op
        self.disks = []
        check_nodes = []

    else:
      # Non-automatic modes
      if self.mode == constants.REPLACE_DISK_PRI:
        self.target_node_uuid = self.instance.primary_node
        self.other_node_uuid = secondary_node_uuid
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_SEC:
        self.target_node_uuid = secondary_node_uuid
        self.other_node_uuid = self.instance.primary_node
        check_nodes = [self.target_node_uuid, self.other_node_uuid]

      elif self.mode == constants.REPLACE_DISK_CHG:
        self.new_node_uuid = remote_node_uuid
        self.other_node_uuid = self.instance.primary_node
        self.target_node_uuid = secondary_node_uuid
        check_nodes = [self.new_node_uuid, self.other_node_uuid]

        CheckNodeNotDrained(self.lu, remote_node_uuid)
        CheckNodeVmCapable(self.lu, remote_node_uuid)

        old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
        assert old_node_info is not None
        if old_node_info.offline and not self.early_release:
          # doesn't make sense to delay the release
          self.early_release = True
          self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
                          " early-release mode", secondary_node_uuid)

      else:
        raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
                                     self.mode)

    # If not specified all disks should be replaced
    if not self.disks:
      self.disks = range(len(self.instance.disks))

    # TODO: This is ugly, but right now we can't distinguish between internal
    # submitted opcode and external one. We should fix that.
    if self.remote_node_info:
      # We change the node, lets verify it still meets instance policy
      new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
      cluster = self.cfg.GetClusterInfo()
      ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                              new_group_info)
      CheckTargetNodeIPolicy(self, ipolicy, self.instance,
                             self.remote_node_info, self.cfg,
                             ignore=self.ignore_ipolicy)

    for node_uuid in check_nodes:
      CheckNodeOnline(self.lu, node_uuid)

    touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
                                                          self.other_node_uuid,
                                                          self.target_node_uuid]
                              if node_uuid is not None)

    # Release unneeded node and node resource locks
    ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
    ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)

    # Release any owned node group
    ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)

    # Check whether disks are valid
    for disk_idx in self.disks:
      self.instance.FindDisk(disk_idx)

    # Get secondary node IP addresses
    self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
                                  in self.cfg.GetMultiNodeInfo(touched_nodes))
2100
  def Exec(self, feedback_fn):
    """Execute disk replacement.

    This dispatches the disk replacement to the appropriate handler.

    """
    if __debug__:
      # Verify owned locks before starting operation
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
      assert set(owned_nodes) == set(self.node_secondary_ip), \
        ("Incorrect node locks, owning %s, expected %s" %
         (owned_nodes, self.node_secondary_ip.keys()))
      assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
              self.lu.owned_locks(locking.LEVEL_NODE_RES))
      assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)

      owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
      assert list(owned_instances) == [self.instance_name], \
        "Instance '%s' not locked" % self.instance_name

      assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
        "Should not own any node group lock at this point"

    # CheckPrereq may have determined there is nothing to do
    if not self.disks:
      feedback_fn("No disks need replacement for instance '%s'" %
                  self.instance.name)
      return

    feedback_fn("Replacing disk(s) %s for instance '%s'" %
                (utils.CommaJoin(self.disks), self.instance.name))
    feedback_fn("Current primary node: %s" %
                self.cfg.GetNodeName(self.instance.primary_node))
    feedback_fn("Current secondary node: %s" %
                utils.CommaJoin(self.cfg.GetNodeNames(
                                  self.instance.secondary_nodes)))

    activate_disks = not self.instance.disks_active

    # Activate the instance disks if we're replacing them on a down instance
    if activate_disks:
      StartInstanceDisks(self.lu, self.instance, True)

    try:
      # Should we replace the secondary node?
      if self.new_node_uuid is not None:
        fn = self._ExecDrbd8Secondary
      else:
        fn = self._ExecDrbd8DiskOnly

      result = fn(feedback_fn)
    finally:
      # Deactivate the instance disks if we're replacing them on a
      # down instance
      if activate_disks:
        _SafeShutdownInstanceDisks(self.lu, self.instance)

    # The handlers are expected to have released all node locks
    assert not self.lu.owned_locks(locking.LEVEL_NODE)

    if __debug__:
      # Verify owned locks: with early release no resource locks remain,
      # otherwise only those of the touched nodes may still be held
      owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
      nodes = frozenset(self.node_secondary_ip)
      assert ((self.early_release and not owned_nodes) or
              (not self.early_release and not (set(owned_nodes) - nodes))), \
        ("Not owning the correct locks, early_release=%s, owned=%r,"
         " nodes=%r" % (self.early_release, owned_nodes, nodes))

    return result
2169
2170 def _CheckVolumeGroup(self, node_uuids):
2171 self.lu.LogInfo("Checking volume groups")
2172
2173 vgname = self.cfg.GetVGName()
2174
2175 # Make sure volume group exists on all involved nodes
2176 results = self.rpc.call_vg_list(node_uuids)
2177 if not results:
2178 raise errors.OpExecError("Can't list volume groups on the nodes")
2179
2180 for node_uuid in node_uuids:
2181 res = results[node_uuid]
2182 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2183 if vgname not in res.payload:
2184 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2185 (vgname, self.cfg.GetNodeName(node_uuid)))
2186
2187 def _CheckDisksExistence(self, node_uuids):
2188 # Check disk existence
2189 for idx, dev in enumerate(self.instance.disks):
2190 if idx not in self.disks:
2191 continue
2192
2193 for node_uuid in node_uuids:
2194 self.lu.LogInfo("Checking disk/%d on %s", idx,
2195 self.cfg.GetNodeName(node_uuid))
2196
2197 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2198
2199 msg = result.fail_msg
2200 if msg or not result.payload:
2201 if not msg:
2202 msg = "disk not found"
2203 if not self._CheckDisksActivated(self.instance):
2204 extra_hint = ("\nDisks seem to be not properly activated. Try"
2205 " running activate-disks on the instance before"
2206 " using replace-disks.")
2207 else:
2208 extra_hint = ""
2209 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2210 (idx, self.cfg.GetNodeName(node_uuid), msg,
2211 extra_hint))
2212
2213 def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2214 for idx, dev in enumerate(self.instance.disks):
2215 if idx not in self.disks:
2216 continue
2217
2218 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2219 (idx, self.cfg.GetNodeName(node_uuid)))
2220
2221 if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2222 on_primary, ldisk=ldisk):
2223 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2224 " replace disks for instance %s" %
2225 (self.cfg.GetNodeName(node_uuid),
2226 self.instance.name))
2227
  def _CreateNewStorage(self, node_uuid):
    """Create new storage on the primary or secondary node.

    This is only used for same-node replaces, not for changing the
    secondary node, hence we don't want to modify the existing disk.

    @param node_uuid: the node on which the replacement LVs are created
    @return: dict mapping each DRBD device's iv_name to a
        (device, old_lvs, new_lvs) tuple

    """
    iv_names = {}

    # Work on annotated copies of the disks so per-disk parameters are set
    disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
    for idx, dev in enumerate(disks):
      if idx not in self.disks:
        continue

      self.lu.LogInfo("Adding storage on %s for disk/%d",
                      self.cfg.GetNodeName(node_uuid), idx)

      # One data and one metadata LV per replaced disk, with fresh
      # unique names
      lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
      names = _GenerateUniqueNames(self.lu, lv_names)

      (data_disk, meta_disk) = dev.children
      vg_data = data_disk.logical_id[0]
      lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
                             logical_id=(vg_data, names[0]),
                             params=data_disk.params)
      vg_meta = meta_disk.logical_id[0]
      lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
                             size=constants.DRBD_META_SIZE,
                             logical_id=(vg_meta, names[1]),
                             params=meta_disk.params)

      new_lvs = [lv_data, lv_meta]
      # Keep copies of the old children so later renames don't affect them
      old_lvs = [child.Copy() for child in dev.children]
      iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
      excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)

      # we pass force_create=True to force the LVM creation
      for new_lv in new_lvs:
        try:
          _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
                               GetInstanceInfoText(self.instance), False,
                               excl_stor)
        except errors.DeviceCreationError, e:
          raise errors.OpExecError("Can't create block device: %s" % e.message)

    return iv_names
2274
2275 def _CheckDevices(self, node_uuid, iv_names):
2276 for name, (dev, _, _) in iv_names.iteritems():
2277 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2278
2279 msg = result.fail_msg
2280 if msg or not result.payload:
2281 if not msg:
2282 msg = "disk not found"
2283 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2284 (name, msg))
2285
2286 if result.payload.is_degraded:
2287 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2288
2289 def _RemoveOldStorage(self, node_uuid, iv_names):
2290 for name, (_, old_lvs, _) in iv_names.iteritems():
2291 self.lu.LogInfo("Remove logical volumes for %s", name)
2292
2293 for lv in old_lvs:
2294 msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
2295 .fail_msg
2296 if msg:
2297 self.lu.LogWarning("Can't remove old LV: %s", msg,
2298 hint="remove unused LVs manually")
2299
2300 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2301 """Replace a disk on the primary or secondary for DRBD 8.
2302
2303 The algorithm for replace is quite complicated:
2304
2305 1. for each disk to be replaced:
2306
2307 1. create new LVs on the target node with unique names
2308 1. detach old LVs from the drbd device
2309 1. rename old LVs to name_replaced.<time_t>
2310 1. rename new LVs to old LVs
2311 1. attach the new LVs (with the old names now) to the drbd device
2312
2313 1. wait for sync across all devices
2314
2315 1. for each modified disk:
2316
2317 1. remove old LVs (which have the name name_replaces.<time_t>)
2318
2319 Failures are not very well handled.
2320
2321 """
2322 steps_total = 6
2323
2324 # Step: check device activation
2325 self.lu.LogStep(1, steps_total, "Check device existence")
2326 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2327 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2328
2329 # Step: check other node consistency
2330 self.lu.LogStep(2, steps_total, "Check peer consistency")
2331 self._CheckDisksConsistency(
2332 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2333 False)
2334
2335 # Step: create new storage
2336 self.lu.LogStep(3, steps_total, "Allocate new storage")
2337 iv_names = self._CreateNewStorage(self.target_node_uuid)
2338
2339 # Step: for each lv, detach+rename*2+attach
2340 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2341 for dev, old_lvs, new_lvs in iv_names.itervalues():
2342 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2343
2344 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
2345 (dev, self.instance),
2346 (old_lvs, self.instance))
2347 result.Raise("Can't detach drbd from local storage on node"
2348 " %s for device %s" %
2349 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2350 #dev.children = []
2351 #cfg.Update(instance)
2352
2353 # ok, we created the new LVs, so now we know we have the needed
2354 # storage; as such, we proceed on the target node to rename
2355 # old_lv to _old, and new_lv to old_lv; note that we rename LVs
2356 # using the assumption that logical_id == unique_id on that node
2357
2358 # FIXME(iustin): use a better name for the replaced LVs
2359 temp_suffix = int(time.time())
2360 ren_fn = lambda d, suff: (d.logical_id[0],
2361 d.logical_id[1] + "_replaced-%s" % suff)
2362
2363 # Build the rename list based on what LVs exist on the node
2364 rename_old_to_new = []
2365 for to_ren in old_lvs:
2366 result = self.rpc.call_blockdev_find(self.target_node_uuid,
2367 (to_ren, self.instance))
2368 if not result.fail_msg and result.payload:
2369 # device exists
2370 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2371
2372 self.lu.LogInfo("Renaming the old LVs on the target node")
2373 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2374 rename_old_to_new)
2375 result.Raise("Can't rename old LVs on node %s" %
2376 self.cfg.GetNodeName(self.target_node_uuid))
2377
2378 # Now we rename the new LVs to the old LVs
2379 self.lu.LogInfo("Renaming the new LVs on the target node")
2380 rename_new_to_old = [(new, old.logical_id)
2381 for old, new in zip(old_lvs, new_lvs)]
2382 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2383 rename_new_to_old)
2384 result.Raise("Can't rename new LVs on node %s" %
2385 self.cfg.GetNodeName(self.target_node_uuid))
2386
2387 # Intermediate steps of in memory modifications
2388 for old, new in zip(old_lvs, new_lvs):
2389 new.logical_id = old.logical_id
2390
2391 # We need to modify old_lvs so that removal later removes the
2392 # right LVs, not the newly added ones; note that old_lvs is a
2393 # copy here
2394 for disk in old_lvs:
2395 disk.logical_id = ren_fn(disk, temp_suffix)
2396
2397 # Now that the new lvs have the old name, we can add them to the device
2398 self.lu.LogInfo("Adding new mirror component on %s",
2399 self.cfg.GetNodeName(self.target_node_uuid))
2400 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2401 (dev, self.instance),
2402 (new_lvs, self.instance))
2403 msg = result.fail_msg
2404 if msg:
2405 for new_lv in new_lvs:
2406 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2407 (new_lv, self.instance)).fail_msg
2408 if msg2:
2409 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2410 hint=("cleanup manually the unused logical"
2411 "volumes"))
2412 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2413
2414 cstep = itertools.count(5)
2415
2416 if self.early_release:
2417 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2418 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2419 # TODO: Check if releasing locks early still makes sense
2420 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2421 else:
2422 # Release all resource locks except those used by the instance
2423 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2424 keep=self.node_secondary_ip.keys())
2425
2426 # Release all node locks while waiting for sync
2427 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2428
2429 # TODO: Can the instance lock be downgraded here? Take the optional disk
2430 # shutdown in the caller into consideration.
2431
2432 # Wait for sync
2433 # This can fail as the old devices are degraded and _WaitForSync
2434 # does a combined result over all disks, so we don't check its return value
2435 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2436 WaitForSync(self.lu, self.instance)
2437
2438 # Check all devices manually
2439 self._CheckDevices(self.instance.primary_node, iv_names)
2440
2441 # Step: remove old storage
2442 if not self.early_release:
2443 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2444 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2445
2446 def _ExecDrbd8Secondary(self, feedback_fn):
2447 """Replace the secondary node for DRBD 8.
2448
2449 The algorithm for replace is quite complicated:
2450 - for all disks of the instance:
2451 - create new LVs on the new node with same names
2452 - shutdown the drbd device on the old secondary
2453 - disconnect the drbd network on the primary
2454 - create the drbd device on the new secondary
2455 - network attach the drbd on the primary, using an artifice:
2456 the drbd code for Attach() will connect to the network if it
2457 finds a device which is connected to the good local disks but
2458 not network enabled
2459 - wait for sync across all devices
2460 - remove all disks from the old secondary
2461
2462 Failures are not very well handled.
2463
2464 """
2465 steps_total = 6
2466
2467 pnode = self.instance.primary_node
2468
2469 # Step: check device activation
2470 self.lu.LogStep(1, steps_total, "Check device existence")
2471 self._CheckDisksExistence([self.instance.primary_node])
2472 self._CheckVolumeGroup([self.instance.primary_node])
2473
2474 # Step: check other node consistency
2475 self.lu.LogStep(2, steps_total, "Check peer consistency")
2476 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2477
2478 # Step: create new storage
2479 self.lu.LogStep(3, steps_total, "Allocate new storage")
2480 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2481 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2482 self.new_node_uuid)
2483 for idx, dev in enumerate(disks):
2484 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2485 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2486 # we pass force_create=True to force LVM creation
2487 for new_lv in dev.children:
2488 try:
2489 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2490 new_lv, True, GetInstanceInfoText(self.instance),
2491 False, excl_stor)
2492 except errors.DeviceCreationError, e:
2493 raise errors.OpExecError("Can't create block device: %s" % e.message)
2494
2495 # Step 4: dbrd minors and drbd setups changes
2496 # after this, we must manually remove the drbd minors on both the
2497 # error and the success paths
2498 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2499 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2500 for _ in self.instance.disks],
2501 self.instance.uuid)
2502 logging.debug("Allocated minors %r", minors)
2503
2504 iv_names = {}
2505 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2506 self.lu.LogInfo("activating a new drbd on %s for disk/%d" %
2507 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2508 # create new devices on new_node; note that we create two IDs:
2509 # one without port, so the drbd will be activated without
2510 # networking information on the new node at this stage, and one
2511 # with network, for the latter activation in step 4
2512 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2513 if self.instance.primary_node == o_node1:
2514 p_minor = o_minor1
2515 else:
2516 assert self.instance.primary_node == o_node2, "Three-node instance?"
2517 p_minor = o_minor2
2518
2519 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2520 p_minor, new_minor, o_secret)
2521 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2522 p_minor, new_minor, o_secret)
2523
2524 iv_names[idx] = (dev, dev.children, new_net_id)
2525 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2526 new_net_id)
2527 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2528 logical_id=new_alone_id,
2529 children=dev.children,
2530 size=dev.size,
2531 params={})
2532 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2533 self.cfg)
2534 try:
2535 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2536 anno_new_drbd,
2537 GetInstanceInfoText(self.instance), False,
2538 excl_stor)
2539 except errors.GenericError:
2540 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2541 raise
2542
2543 # We have new devices, shutdown the drbd on the old secondary
2544 for idx, dev in enumerate(self.instance.disks):
2545 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2546 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2547 (dev, self.instance)).fail_msg
2548 if msg:
2549 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2550 "node: %s" % (idx, msg),
2551 hint=("Please cleanup this device manually as"
2552 " soon as possible"))
2553
2554 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2555 result = self.rpc.call_drbd_disconnect_net(
2556 [pnode], (self.instance.disks, self.instance))[pnode]
2557
2558 msg = result.fail_msg
2559 if msg:
2560 # detaches didn't succeed (unlikely)
2561 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2562 raise errors.OpExecError("Can't detach the disks from the network on"
2563 " old node: %s" % (msg,))
2564
2565 # if we managed to detach at least one, we update all the disks of
2566 # the instance to point to the new secondary
2567 self.lu.LogInfo("Updating instance configuration")
2568 for dev, _, new_logical_id in iv_names.itervalues():
2569 dev.logical_id = new_logical_id
2570
2571 self.cfg.Update(self.instance, feedback_fn)
2572
2573 # Release all node locks (the configuration has been updated)
2574 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2575
2576 # and now perform the drbd attach
2577 self.lu.LogInfo("Attaching primary drbds to new secondary"
2578 " (standalone => connected)")
2579 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2580 self.new_node_uuid],
2581 (self.instance.disks, self.instance),
2582 self.instance.name,
2583 False)
2584 for to_node, to_result in result.items():
2585 msg = to_result.fail_msg
2586 if msg:
2587 raise errors.OpExecError(
2588 "Can't attach drbd disks on node %s: %s (please do a gnt-instance "
2589 "info %s to see the status of disks)" %
2590 (self.cfg.GetNodeName(to_node), msg, self.instance.name))
2591
2592 cstep = itertools.count(5)
2593
2594 if self.early_release:
2595 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2596 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2597 # TODO: Check if releasing locks early still makes sense
2598 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2599 else:
2600 # Release all resource locks except those used by the instance
2601 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2602 keep=self.node_secondary_ip.keys())
2603
2604 # TODO: Can the instance lock be downgraded here? Take the optional disk
2605 # shutdown in the caller into consideration.
2606
2607 # Wait for sync
2608 # This can fail as the old devices are degraded and _WaitForSync
2609 # does a combined result over all disks, so we don't check its return value
2610 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2611 WaitForSync(self.lu, self.instance)
2612
2613 # Check all devices manually
2614 self._CheckDevices(self.instance.primary_node, iv_names)
2615
2616 # Step: remove old storage
2617 if not self.early_release:
2618 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2619 self._RemoveOldStorage(self.target_node_uuid, iv_names)