1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Logical units dealing with storage of instances."""
32
33 import itertools
34 import logging
35 import os
36 import time
37
38 from ganeti import compat
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import ht
42 from ganeti import locking
43 from ganeti.masterd import iallocator
44 from ganeti import objects
45 from ganeti import utils
46 import ganeti.rpc.node as rpc
47 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
48 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
49 AnnotateDiskParams, CheckIAllocatorOrNode, ExpandNodeUuidAndName, \
50 CheckNodeOnline, CheckInstanceNodeGroups, CheckInstanceState, \
51 IsExclusiveStorageEnabledNode, FindFaultyInstanceDisks, GetWantedNodes, \
52 CheckDiskTemplateEnabled
53 from ganeti.cmdlib.instance_utils import GetInstanceInfoText, \
54 CopyLockList, ReleaseLocks, CheckNodeVmCapable, \
55 BuildInstanceHookEnvByObject, CheckNodeNotDrained, CheckTargetNodeIPolicy
56
57 import ganeti.masterd.instance
58
59
60 _DISK_TEMPLATE_NAME_PREFIX = {
61 constants.DT_PLAIN: "",
62 constants.DT_RBD: ".rbd",
63 constants.DT_EXT: ".ext",
64 constants.DT_FILE: ".file",
65 constants.DT_SHARED_FILE: ".sharedfile",
66 }
67
68
69 def CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
70 excl_stor):
71 """Create a single block device on a given node.
72
73 This will not recurse over children of the device, so they must be
74 created in advance.
75
76 @param lu: the lu on whose behalf we execute
77 @param node_uuid: the node on which to create the device
78 @type instance: L{objects.Instance}
79 @param instance: the instance which owns the device
80 @type device: L{objects.Disk}
81 @param device: the device to create
82 @param info: the extra 'metadata' we should attach to the device
83 (this will be represented as an LVM tag)
84 @type force_open: boolean
85 # @param force_open: this parameter will be passed to the
86 # L{backend.BlockdevCreate} function where it specifies
87 # whether we run on primary or not, and it affects both
88 # the child assembly and the device's own Open() execution
89 @type excl_stor: boolean
90 @param excl_stor: Whether exclusive_storage is active for the node
91
92 """
93 result = lu.rpc.call_blockdev_create(node_uuid, (device, instance),
94 device.size, instance.name, force_open,
95 info, excl_stor)
96 result.Raise("Can't create block device %s on"
97 " node %s for instance %s" % (device,
98 lu.cfg.GetNodeName(node_uuid),
99 instance.name))
100
101
102 def _CreateBlockDevInner(lu, node_uuid, instance, device, force_create,
103 info, force_open, excl_stor):
104 """Create a tree of block devices on a given node.
105
106 If this device type has to be created on secondaries, create it and
107 all its children.
108
109 If not, just recurse to children keeping the same 'force' value.
110
111 @attention: The device has to be annotated already.
112
113 @param lu: the lu on whose behalf we execute
114 @param node_uuid: the node on which to create the device
115 @type instance: L{objects.Instance}
116 @param instance: the instance which owns the device
117 @type device: L{objects.Disk}
118 @param device: the device to create
119 @type force_create: boolean
120 @param force_create: whether to force creation of this device; this
121 # will be changed to True whenever we find a device which has
122 CreateOnSecondary() attribute
123 @param info: the extra 'metadata' we should attach to the device
124 (this will be represented as an LVM tag)
125 @type force_open: boolean
126 # @param force_open: this parameter will be passed to the
127 # L{backend.BlockdevCreate} function where it specifies
128 # whether we run on primary or not, and it affects both
129 # the child assembly and the device's own Open() execution
130 @type excl_stor: boolean
131 @param excl_stor: Whether exclusive_storage is active for the node
132
133 @return: list of created devices
134 """
135 created_devices = []
136 try:
137 if device.CreateOnSecondary():
138 force_create = True
139
140 if device.children:
141 for child in device.children:
142 devs = _CreateBlockDevInner(lu, node_uuid, instance, child,
143 force_create, info, force_open, excl_stor)
144 created_devices.extend(devs)
145
146 if not force_create:
147 return created_devices
148
149 CreateSingleBlockDev(lu, node_uuid, instance, device, info, force_open,
150 excl_stor)
151 # The device has been completely created, so there is no point in keeping
152 # its subdevices in the list. We just add the device itself instead.
153 created_devices = [(node_uuid, device)]
154 return created_devices
155
156 except errors.DeviceCreationError, e:
157 e.created_devices.extend(created_devices)
158 raise e
159 except errors.OpExecError, e:
160 raise errors.DeviceCreationError(str(e), created_devices)
161
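# Illustrative sketch (not part of the original flow; assumes `lu`, `node_uuid`,
# `instance`, an annotated `disk`, `info` and `excl_stor` are in scope as in the
# callers below): a caller that wants to roll back a partially created device
# tree can rely on the exception carrying the devices created so far:
#
#   try:
#     _CreateBlockDevInner(lu, node_uuid, instance, disk, False, info,
#                          False, excl_stor)
#   except errors.DeviceCreationError, e:
#     # e.created_devices holds (node_uuid, device) pairs to clean up
#     _UndoCreateDisks(lu, e.created_devices, instance)
#
# This is essentially the pattern used by CreateDisks() further down.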
162
163 def IsExclusiveStorageEnabledNodeUuid(cfg, node_uuid):
164 """Whether exclusive_storage is in effect for the given node.
165
166 @type cfg: L{config.ConfigWriter}
167 @param cfg: The cluster configuration
168 @type node_uuid: string
169 @param node_uuid: The node UUID
170 @rtype: bool
171 @return: The effective value of exclusive_storage
172 @raise errors.OpPrereqError: if no node exists with the given UUID
173
174 """
175 ni = cfg.GetNodeInfo(node_uuid)
176 if ni is None:
177 raise errors.OpPrereqError("Invalid node UUID %s" % node_uuid,
178 errors.ECODE_NOENT)
179 return IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node_uuid, instance, device, force_create, info,
183 force_open):
184 """Wrapper around L{_CreateBlockDevInner}.
185
186 This method annotates the root device first.
187
188 """
189 (disk,) = AnnotateDiskParams(instance, [device], lu.cfg)
190 excl_stor = IsExclusiveStorageEnabledNodeUuid(lu.cfg, node_uuid)
191 return _CreateBlockDevInner(lu, node_uuid, instance, disk, force_create, info,
192 force_open, excl_stor)
193
194
195 def _UndoCreateDisks(lu, disks_created, instance):
196 """Undo the work performed by L{CreateDisks}.
197
198 This function is called in case of an error to undo the work of
199 L{CreateDisks}.
200
201 @type lu: L{LogicalUnit}
202 @param lu: the logical unit on whose behalf we execute
203 @param disks_created: the result returned by L{CreateDisks}
204 @type instance: L{objects.Instance}
205 @param instance: the instance for which disks were created
206
207 """
208 for (node_uuid, disk) in disks_created:
209 result = lu.rpc.call_blockdev_remove(node_uuid, (disk, instance))
210 result.Warn("Failed to remove newly-created disk %s on node %s" %
211 (disk, lu.cfg.GetNodeName(node_uuid)), logging.warning)
212
213
214 def CreateDisks(lu, instance, to_skip=None, target_node_uuid=None, disks=None):
215 """Create all disks for an instance.
216
217 This abstracts away some work from AddInstance.
218
219 @type lu: L{LogicalUnit}
220 @param lu: the logical unit on whose behalf we execute
221 @type instance: L{objects.Instance}
222 @param instance: the instance whose disks we should create
223 @type to_skip: list
224 @param to_skip: list of indices to skip
225 @type target_node_uuid: string
226 @param target_node_uuid: if passed, overrides the target node for creation
227 @type disks: list of L{objects.Disk}
228 @param disks: the disks to create; if not specified, all the disks of the
229 instance are created
230 @return: information about the created disks, to be used to call
231 L{_UndoCreateDisks}
232 @raise errors.OpPrereqError: in case of error
233
234 """
235 info = GetInstanceInfoText(instance)
236 if target_node_uuid is None:
237 pnode_uuid = instance.primary_node
238 all_node_uuids = instance.all_nodes
239 else:
240 pnode_uuid = target_node_uuid
241 all_node_uuids = [pnode_uuid]
242
243 if disks is None:
244 disks = instance.disks
245
246 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), instance.disk_template)
247
248 if instance.disk_template in constants.DTS_FILEBASED:
249 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
250 result = lu.rpc.call_file_storage_dir_create(pnode_uuid, file_storage_dir)
251
252 result.Raise("Failed to create directory '%s' on"
253 " node %s" % (file_storage_dir,
254 lu.cfg.GetNodeName(pnode_uuid)))
255
256 disks_created = []
257 for idx, device in enumerate(disks):
258 if to_skip and idx in to_skip:
259 continue
260 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
261 for node_uuid in all_node_uuids:
262 f_create = node_uuid == pnode_uuid
263 try:
264 _CreateBlockDev(lu, node_uuid, instance, device, f_create, info,
265 f_create)
266 disks_created.append((node_uuid, device))
267 except errors.DeviceCreationError, e:
268 logging.warning("Creating disk %s for instance '%s' failed",
269 idx, instance.name)
270 disks_created.extend(e.created_devices)
271 _UndoCreateDisks(lu, disks_created, instance)
272 raise errors.OpExecError(e.message)
273 return disks_created
274
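# Typical usage sketch (hedged; `lu` and `instance` as in the callers of this
# module): create the disks, then wipe them, undoing the creation on failure:
#
#   disks_created = CreateDisks(lu, instance)
#   try:
#     WipeDisks(lu, instance)
#   except errors.OpExecError:
#     _UndoCreateDisks(lu, disks_created, instance)
#     raise
#
# WipeOrCleanupDisks() further down wraps this wipe-or-undo step.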
275
276 def ComputeDiskSizePerVG(disk_template, disks):
277 """Compute disk size requirements in the volume group
278
279 """
280 def _compute(disks, payload):
281 """Universal algorithm.
282
283 """
284 vgs = {}
285 for disk in disks:
286 vgs[disk[constants.IDISK_VG]] = \
287 vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
288
289 return vgs
290
291 # Required free disk space as a function of disk and swap space
292 req_size_dict = {
293 constants.DT_DISKLESS: {},
294 constants.DT_PLAIN: _compute(disks, 0),
295 # 128 MiB are added for DRBD metadata for each disk
296 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
297 constants.DT_FILE: {},
298 constants.DT_SHARED_FILE: {},
299 constants.DT_GLUSTER: {},
300 }
301
302 if disk_template not in req_size_dict:
303 raise errors.ProgrammerError("Disk template '%s' size requirement"
304 " is unknown" % disk_template)
305
306 return req_size_dict[disk_template]
307
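# Worked example (sketch): two disks in the same volume group
#
#   disks = [{constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 1024},
#            {constants.IDISK_VG: "xenvg", constants.IDISK_SIZE: 2048}]
#
#   ComputeDiskSizePerVG(constants.DT_PLAIN, disks) == {"xenvg": 3072}
#   ComputeDiskSizePerVG(constants.DT_DRBD8, disks) == {"xenvg": 3328}
#
# For DRBD8 each disk contributes an extra DRBD_META_SIZE (128 MiB) of
# metadata, hence 1024 + 128 + 2048 + 128 = 3328.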
308
309 def ComputeDisks(op, default_vg):
310 """Computes the instance disks.
311
312 @param op: The instance opcode
313 @param default_vg: The default_vg to assume
314
315 @return: The computed disks
316
317 """
318 disks = []
319 for disk in op.disks:
320 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
321 if mode not in constants.DISK_ACCESS_SET:
322 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
323 mode, errors.ECODE_INVAL)
324 size = disk.get(constants.IDISK_SIZE, None)
325 if size is None:
326 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
327 try:
328 size = int(size)
329 except (TypeError, ValueError):
330 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
331 errors.ECODE_INVAL)
332
333 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
334 if ext_provider and op.disk_template != constants.DT_EXT:
335 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
336 " disk template, not %s" %
337 (constants.IDISK_PROVIDER, constants.DT_EXT,
338 op.disk_template), errors.ECODE_INVAL)
339
340 data_vg = disk.get(constants.IDISK_VG, default_vg)
341 name = disk.get(constants.IDISK_NAME, None)
342 if name is not None and name.lower() == constants.VALUE_NONE:
343 name = None
344 new_disk = {
345 constants.IDISK_SIZE: size,
346 constants.IDISK_MODE: mode,
347 constants.IDISK_VG: data_vg,
348 constants.IDISK_NAME: name,
349 }
350
351 for key in [
352 constants.IDISK_METAVG,
353 constants.IDISK_ADOPT,
354 constants.IDISK_SPINDLES,
355 ]:
356 if key in disk:
357 new_disk[key] = disk[key]
358
359 # For extstorage, demand the `provider' option and add any
360 # additional parameters (ext-params) to the dict
361 if op.disk_template == constants.DT_EXT:
362 if ext_provider:
363 new_disk[constants.IDISK_PROVIDER] = ext_provider
364 for key in disk:
365 if key not in constants.IDISK_PARAMS:
366 new_disk[key] = disk[key]
367 else:
368 raise errors.OpPrereqError("Missing provider for template '%s'" %
369 constants.DT_EXT, errors.ECODE_INVAL)
370
371 disks.append(new_disk)
372
373 return disks
374
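# Example (sketch of the transformation performed above): with
# default_vg = "xenvg" and an opcode using disk_template = constants.DT_PLAIN
# that carries
#
#   op.disks = [{constants.IDISK_SIZE: "10240"}]
#
# ComputeDisks(op, "xenvg") returns
#
#   [{constants.IDISK_SIZE: 10240,
#     constants.IDISK_MODE: constants.DISK_RDWR,
#     constants.IDISK_VG: "xenvg",
#     constants.IDISK_NAME: None}]
#
# i.e. the size is converted to an integer and the missing mode/vg/name
# fields are filled with their defaults.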
375
376 def CheckRADOSFreeSpace():
377 """Compute disk size requirements inside the RADOS cluster.
378
379 """
380 # For the RADOS cluster we assume there is always enough space.
381 pass
382
383
384 def _GenerateDRBD8Branch(lu, primary_uuid, secondary_uuid, size, vgnames, names,
385 iv_name, p_minor, s_minor):
386 """Generate a drbd8 device complete with its children.
387
388 """
389 assert len(vgnames) == len(names) == 2
390 port = lu.cfg.AllocatePort()
391 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
392
393 dev_data = objects.Disk(dev_type=constants.DT_PLAIN, size=size,
394 logical_id=(vgnames[0], names[0]),
395 params={})
396 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
397 dev_meta = objects.Disk(dev_type=constants.DT_PLAIN,
398 size=constants.DRBD_META_SIZE,
399 logical_id=(vgnames[1], names[1]),
400 params={})
401 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
402 drbd_dev = objects.Disk(dev_type=constants.DT_DRBD8, size=size,
403 logical_id=(primary_uuid, secondary_uuid, port,
404 p_minor, s_minor,
405 shared_secret),
406 children=[dev_data, dev_meta],
407 iv_name=iv_name, params={})
408 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
409 return drbd_dev
410
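# The resulting device tree for one disk looks roughly like this (sketch):
#
#   DRBD8 disk, logical_id=(primary_uuid, secondary_uuid, port,
#                           p_minor, s_minor, shared_secret)
#     +- data LV (DT_PLAIN, size=size,           logical_id=(vgnames[0], names[0]))
#     +- meta LV (DT_PLAIN, size=DRBD_META_SIZE, logical_id=(vgnames[1], names[1]))
#
# All three objects get fresh UUIDs from the configuration.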
411
412 def GenerateDiskTemplate(
413 lu, template_name, instance_uuid, primary_node_uuid, secondary_node_uuids,
414 disk_info, file_storage_dir, file_driver, base_index,
415 feedback_fn, full_disk_params):
416 """Generate the entire disk layout for a given template type.
417
418 """
419 vgname = lu.cfg.GetVGName()
420 disk_count = len(disk_info)
421 disks = []
422
423 CheckDiskTemplateEnabled(lu.cfg.GetClusterInfo(), template_name)
424
425 if template_name == constants.DT_DISKLESS:
426 pass
427 elif template_name == constants.DT_DRBD8:
428 if len(secondary_node_uuids) != 1:
429 raise errors.ProgrammerError("Wrong template configuration")
430 remote_node_uuid = secondary_node_uuids[0]
431 minors = lu.cfg.AllocateDRBDMinor(
432 [primary_node_uuid, remote_node_uuid] * len(disk_info), instance_uuid)
433
434 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
435 full_disk_params)
436 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
437
438 names = []
439 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
440 for i in range(disk_count)]):
441 names.append(lv_prefix + "_data")
442 names.append(lv_prefix + "_meta")
443 for idx, disk in enumerate(disk_info):
444 disk_index = idx + base_index
445 data_vg = disk.get(constants.IDISK_VG, vgname)
446 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
447 disk_dev = _GenerateDRBD8Branch(lu, primary_node_uuid, remote_node_uuid,
448 disk[constants.IDISK_SIZE],
449 [data_vg, meta_vg],
450 names[idx * 2:idx * 2 + 2],
451 "disk/%d" % disk_index,
452 minors[idx * 2], minors[idx * 2 + 1])
453 disk_dev.mode = disk[constants.IDISK_MODE]
454 disk_dev.name = disk.get(constants.IDISK_NAME, None)
455 disks.append(disk_dev)
456 else:
457 if secondary_node_uuids:
458 raise errors.ProgrammerError("Wrong template configuration")
459
460 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
461 if name_prefix is None:
462 names = None
463 else:
464 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
465 (name_prefix, base_index + i)
466 for i in range(disk_count)])
467
468 if template_name == constants.DT_PLAIN:
469
470 def logical_id_fn(idx, _, disk):
471 vg = disk.get(constants.IDISK_VG, vgname)
472 return (vg, names[idx])
473
474 elif template_name == constants.DT_GLUSTER:
475 logical_id_fn = lambda _1, disk_index, _2: \
476 (file_driver, "ganeti/%s.%d" % (instance_uuid,
477 disk_index))
478
479 elif template_name in constants.DTS_FILEBASED: # Gluster handled above
480 logical_id_fn = \
481 lambda _, disk_index, disk: (file_driver,
482 "%s/%s" % (file_storage_dir,
483 names[idx]))
484 elif template_name == constants.DT_BLOCK:
485 logical_id_fn = \
486 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
487 disk[constants.IDISK_ADOPT])
488 elif template_name == constants.DT_RBD:
489 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
490 elif template_name == constants.DT_EXT:
491 def logical_id_fn(idx, _, disk):
492 provider = disk.get(constants.IDISK_PROVIDER, None)
493 if provider is None:
494 raise errors.ProgrammerError("Disk template is %s, but '%s' is"
495 " not found" % (constants.DT_EXT,
496 constants.IDISK_PROVIDER))
497 return (provider, names[idx])
498 else:
499 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
500
501 dev_type = template_name
502
503 for idx, disk in enumerate(disk_info):
504 params = {}
505 # Only for the Ext template add disk_info to params
506 if template_name == constants.DT_EXT:
507 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
508 for key in disk:
509 if key not in constants.IDISK_PARAMS:
510 params[key] = disk[key]
511 disk_index = idx + base_index
512 size = disk[constants.IDISK_SIZE]
513 feedback_fn("* disk %s, size %s" %
514 (disk_index, utils.FormatUnit(size, "h")))
515 disk_dev = objects.Disk(dev_type=dev_type, size=size,
516 logical_id=logical_id_fn(idx, disk_index, disk),
517 iv_name="disk/%d" % disk_index,
518 mode=disk[constants.IDISK_MODE],
519 params=params,
520 spindles=disk.get(constants.IDISK_SPINDLES))
521 disk_dev.name = disk.get(constants.IDISK_NAME, None)
522 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
523 disks.append(disk_dev)
524
525 return disks
526
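# Call sketch (hedged; `instance_uuid`, `pnode_uuid` and `feedback_fn` are
# placeholders that normally come from the instance-creation opcode): a single
# 10 GiB plain/LVM disk would be generated along these lines:
#
#   disks = GenerateDiskTemplate(lu, constants.DT_PLAIN, instance_uuid,
#                                pnode_uuid, [],
#                                [{constants.IDISK_SIZE: 10240,
#                                  constants.IDISK_MODE: constants.DISK_RDWR}],
#                                None, None, 0, feedback_fn, {})
#
# yielding one objects.Disk with iv_name "disk/0", a generated LV name in the
# cluster's default volume group and a fresh UUID.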
527
528 def CheckSpindlesExclusiveStorage(diskdict, es_flag, required):
529 """Check the presence of the spindle options with exclusive_storage.
530
531 @type diskdict: dict
532 @param diskdict: disk parameters
533 @type es_flag: bool
534 @param es_flag: the effective value of the exclusive_storage flag
535 @type required: bool
536 @param required: whether spindles are required or just optional
537 @raise errors.OpPrereqError: when spindles are given but should not be
538
539 """
540 if (not es_flag and constants.IDISK_SPINDLES in diskdict and
541 diskdict[constants.IDISK_SPINDLES] is not None):
542 raise errors.OpPrereqError("Spindles in instance disks cannot be specified"
543 " when exclusive storage is not active",
544 errors.ECODE_INVAL)
545 if (es_flag and required and (constants.IDISK_SPINDLES not in diskdict or
546 diskdict[constants.IDISK_SPINDLES] is None)):
547 raise errors.OpPrereqError("You must specify spindles in instance disks"
548 " when exclusive storage is active",
549 errors.ECODE_INVAL)
550
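# Summary of the checks above (sketch):
#
#   es_flag  spindles given  required   outcome
#   False    yes             (any)      OpPrereqError (spindles not allowed)
#   True     no              True       OpPrereqError (spindles required)
#   (any other combination)             no error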
551
552 class LUInstanceRecreateDisks(LogicalUnit):
553 """Recreate an instance's missing disks.
554
555 """
556 HPATH = "instance-recreate-disks"
557 HTYPE = constants.HTYPE_INSTANCE
558 REQ_BGL = False
559
560 _MODIFYABLE = compat.UniqueFrozenset([
561 constants.IDISK_SIZE,
562 constants.IDISK_MODE,
563 constants.IDISK_SPINDLES,
564 ])
565
566 # New or changed disk parameters may have different semantics
567 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
568 constants.IDISK_ADOPT,
569
570 # TODO: Implement support changing VG while recreating
571 constants.IDISK_VG,
572 constants.IDISK_METAVG,
573 constants.IDISK_PROVIDER,
574 constants.IDISK_NAME,
575 ]))
576
577 def _RunAllocator(self):
578 """Run the allocator based on input opcode.
579
580 """
581 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
582
583 # FIXME
584 # The allocator should actually run in "relocate" mode, but current
585 # allocators don't support relocating all the nodes of an instance at
586 # the same time. As a workaround we use "allocate" mode, but this is
587 # suboptimal for two reasons:
588 # - The instance name passed to the allocator is present in the list of
589 # existing instances, so there could be a conflict within the
590 # internal structures of the allocator. This doesn't happen with the
591 # current allocators, but it's a liability.
592 # - The allocator counts the resources used by the instance twice: once
593 # because the instance exists already, and once because it tries to
594 # allocate a new instance.
595 # The allocator could choose some of the nodes on which the instance is
596 # running, but that's not a problem. If the instance nodes are broken,
597 # they should already be marked as drained or offline, and hence
598 # skipped by the allocator. If instance disks have been lost for other
599 # reasons, then recreating the disks on the same nodes should be fine.
600 disk_template = self.instance.disk_template
601 spindle_use = be_full[constants.BE_SPINDLE_USE]
602 disks = [{
603 constants.IDISK_SIZE: d.size,
604 constants.IDISK_MODE: d.mode,
605 constants.IDISK_SPINDLES: d.spindles,
606 } for d in self.instance.disks]
607 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
608 disk_template=disk_template,
609 tags=list(self.instance.GetTags()),
610 os=self.instance.os,
611 nics=[{}],
612 vcpus=be_full[constants.BE_VCPUS],
613 memory=be_full[constants.BE_MAXMEM],
614 spindle_use=spindle_use,
615 disks=disks,
616 hypervisor=self.instance.hypervisor,
617 node_whitelist=None)
618 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
619
620 ial.Run(self.op.iallocator)
621
622 assert req.RequiredNodes() == len(self.instance.all_nodes)
623
624 if not ial.success:
625 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
626 " %s" % (self.op.iallocator, ial.info),
627 errors.ECODE_NORES)
628
629 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, ial.result)
630 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
631 self.op.instance_name, self.op.iallocator,
632 utils.CommaJoin(self.op.nodes))
633
634 def CheckArguments(self):
635 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
636 # Normalize and convert deprecated list of disk indices
637 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
638
639 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
640 if duplicates:
641 raise errors.OpPrereqError("Some disks have been specified more than"
642 " once: %s" % utils.CommaJoin(duplicates),
643 errors.ECODE_INVAL)
644
645 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
646 # when neither iallocator nor nodes are specified
647 if self.op.iallocator or self.op.nodes:
648 CheckIAllocatorOrNode(self, "iallocator", "nodes")
649
650 for (idx, params) in self.op.disks:
651 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
652 unsupported = frozenset(params.keys()) - self._MODIFYABLE
653 if unsupported:
654 raise errors.OpPrereqError("Parameters for disk %s try to change"
655 " unmodifyable parameter(s): %s" %
656 (idx, utils.CommaJoin(unsupported)),
657 errors.ECODE_INVAL)
658
659 def ExpandNames(self):
660 self._ExpandAndLockInstance()
661 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
662
663 if self.op.nodes:
664 (self.op.node_uuids, self.op.nodes) = GetWantedNodes(self, self.op.nodes)
665 self.needed_locks[locking.LEVEL_NODE] = list(self.op.node_uuids)
666 else:
667 self.needed_locks[locking.LEVEL_NODE] = []
668 if self.op.iallocator:
669 # iallocator will select a new node in the same group
670 self.needed_locks[locking.LEVEL_NODEGROUP] = []
671 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
672
673 self.needed_locks[locking.LEVEL_NODE_RES] = []
674
675 def DeclareLocks(self, level):
676 if level == locking.LEVEL_NODEGROUP:
677 assert self.op.iallocator is not None
678 assert not self.op.nodes
679 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
680 self.share_locks[locking.LEVEL_NODEGROUP] = 1
681 # Lock the primary group used by the instance optimistically; this
682 # requires going via the node before it's locked, requiring
683 # verification later on
684 self.needed_locks[locking.LEVEL_NODEGROUP] = \
685 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid, primary_only=True)
686
687 elif level == locking.LEVEL_NODE:
688 # If an allocator is used, then we lock all the nodes in the current
689 # instance group, as we don't know yet which ones will be selected;
690 # if we replace the nodes without using an allocator, locks are
691 # already declared in ExpandNames; otherwise, we need to lock all the
692 # instance nodes for disk re-creation
693 if self.op.iallocator:
694 assert not self.op.nodes
695 assert not self.needed_locks[locking.LEVEL_NODE]
696 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
697
698 # Lock member nodes of the group of the primary node
699 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
700 self.needed_locks[locking.LEVEL_NODE].extend(
701 self.cfg.GetNodeGroup(group_uuid).members)
702
703 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
704 elif not self.op.nodes:
705 self._LockInstancesNodes(primary_only=False)
706 elif level == locking.LEVEL_NODE_RES:
707 # Copy node locks
708 self.needed_locks[locking.LEVEL_NODE_RES] = \
709 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
710
711 def BuildHooksEnv(self):
712 """Build hooks env.
713
714 This runs on master, primary and secondary nodes of the instance.
715
716 """
717 return BuildInstanceHookEnvByObject(self, self.instance)
718
719 def BuildHooksNodes(self):
720 """Build hooks nodes.
721
722 """
723 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
724 return (nl, nl)
725
726 def CheckPrereq(self):
727 """Check prerequisites.
728
729 This checks that the instance is in the cluster and is not running.
730
731 """
732 instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
733 assert instance is not None, \
734 "Cannot retrieve locked instance %s" % self.op.instance_name
735 if self.op.node_uuids:
736 if len(self.op.node_uuids) != len(instance.all_nodes):
737 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
738 " %d replacement nodes were specified" %
739 (instance.name, len(instance.all_nodes),
740 len(self.op.node_uuids)),
741 errors.ECODE_INVAL)
742 assert instance.disk_template != constants.DT_DRBD8 or \
743 len(self.op.node_uuids) == 2
744 assert instance.disk_template != constants.DT_PLAIN or \
745 len(self.op.node_uuids) == 1
746 primary_node = self.op.node_uuids[0]
747 else:
748 primary_node = instance.primary_node
749 if not self.op.iallocator:
750 CheckNodeOnline(self, primary_node)
751
752 if instance.disk_template == constants.DT_DISKLESS:
753 raise errors.OpPrereqError("Instance '%s' has no disks" %
754 self.op.instance_name, errors.ECODE_INVAL)
755
756 # Verify if node group locks are still correct
757 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
758 if owned_groups:
759 # Node group locks are acquired only for the primary node (and only
760 # when the allocator is used)
761 CheckInstanceNodeGroups(self.cfg, instance.uuid, owned_groups,
762 primary_only=True)
763
764 # if we replace nodes *and* the old primary is offline, we don't
765 # check the instance state
766 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
767 if not ((self.op.iallocator or self.op.node_uuids) and old_pnode.offline):
768 CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
769 msg="cannot recreate disks")
770
771 if self.op.disks:
772 self.disks = dict(self.op.disks)
773 else:
774 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
775
776 maxidx = max(self.disks.keys())
777 if maxidx >= len(instance.disks):
778 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
779 errors.ECODE_INVAL)
780
781 if ((self.op.node_uuids or self.op.iallocator) and
782 sorted(self.disks.keys()) != range(len(instance.disks))):
783 raise errors.OpPrereqError("Can't recreate disks partially and"
784 " change the nodes at the same time",
785 errors.ECODE_INVAL)
786
787 self.instance = instance
788
789 if self.op.iallocator:
790 self._RunAllocator()
791 # Release unneeded node and node resource locks
792 ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.node_uuids)
793 ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.node_uuids)
794 ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
795
796 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
797
798 if self.op.node_uuids:
799 node_uuids = self.op.node_uuids
800 else:
801 node_uuids = instance.all_nodes
802 excl_stor = compat.any(
803 rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids).values()
804 )
805 for new_params in self.disks.values():
806 CheckSpindlesExclusiveStorage(new_params, excl_stor, False)
807
808 def Exec(self, feedback_fn):
809 """Recreate the disks.
810
811 """
812 assert (self.owned_locks(locking.LEVEL_NODE) ==
813 self.owned_locks(locking.LEVEL_NODE_RES))
814
815 to_skip = []
816 mods = [] # keeps track of needed changes
817
818 for idx, disk in enumerate(self.instance.disks):
819 try:
820 changes = self.disks[idx]
821 except KeyError:
822 # Disk should not be recreated
823 to_skip.append(idx)
824 continue
825
826 # update secondaries for disks, if needed
827 if self.op.node_uuids and disk.dev_type == constants.DT_DRBD8:
828 # need to update the nodes and minors
829 assert len(self.op.node_uuids) == 2
830 assert len(disk.logical_id) == 6 # otherwise disk internals
831 # have changed
832 (_, _, old_port, _, _, old_secret) = disk.logical_id
833 new_minors = self.cfg.AllocateDRBDMinor(self.op.node_uuids,
834 self.instance.uuid)
835 new_id = (self.op.node_uuids[0], self.op.node_uuids[1], old_port,
836 new_minors[0], new_minors[1], old_secret)
837 assert len(disk.logical_id) == len(new_id)
838 else:
839 new_id = None
840
841 mods.append((idx, new_id, changes))
842
843 # now that we have passed all asserts above, we can apply the mods
844 # in a single run (to avoid partial changes)
845 for idx, new_id, changes in mods:
846 disk = self.instance.disks[idx]
847 if new_id is not None:
848 assert disk.dev_type == constants.DT_DRBD8
849 disk.logical_id = new_id
850 if changes:
851 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
852 mode=changes.get(constants.IDISK_MODE, None),
853 spindles=changes.get(constants.IDISK_SPINDLES, None))
854
855 # change primary node, if needed
856 if self.op.node_uuids:
857 self.instance.primary_node = self.op.node_uuids[0]
858 self.LogWarning("Changing the instance's nodes, you will have to"
859 " remove any disks left on the older nodes manually")
860
861 if self.op.node_uuids:
862 self.cfg.Update(self.instance, feedback_fn)
863
864 # All touched nodes must be locked
865 mylocks = self.owned_locks(locking.LEVEL_NODE)
866 assert mylocks.issuperset(frozenset(self.instance.all_nodes))
867 new_disks = CreateDisks(self, self.instance, to_skip=to_skip)
868
869 # TODO: Release node locks before wiping, or explain why it's not possible
870 if self.cfg.GetClusterInfo().prealloc_wipe_disks:
871 wipedisks = [(idx, disk, 0)
872 for (idx, disk) in enumerate(self.instance.disks)
873 if idx not in to_skip]
874 WipeOrCleanupDisks(self, self.instance, disks=wipedisks,
875 cleanup=new_disks)
876
877
878 def _PerformNodeInfoCall(lu, node_uuids, vg):
879 """Prepares the input and performs a node info call.
880
881 @type lu: C{LogicalUnit}
882 @param lu: a logical unit from which we get configuration data
883 @type node_uuids: list of string
884 @param node_uuids: list of node UUIDs to perform the call for
885 @type vg: string
886 @param vg: the volume group's name
887
888 """
889 lvm_storage_units = [(constants.ST_LVM_VG, vg)]
890 storage_units = rpc.PrepareStorageUnitsForNodes(lu.cfg, lvm_storage_units,
891 node_uuids)
892 hvname = lu.cfg.GetHypervisorType()
893 hvparams = lu.cfg.GetClusterInfo().hvparams
894 nodeinfo = lu.rpc.call_node_info(node_uuids, storage_units,
895 [(hvname, hvparams[hvname])])
896 return nodeinfo
897
898
899 def _CheckVgCapacityForNode(node_name, node_info, vg, requested):
900 """Checks the vg capacity for a given node.
901
902 @type node_info: tuple (_, list of dicts, _)
903 @param node_info: the result of the node info call for one node
904 @type node_name: string
905 @param node_name: the name of the node
906 @type vg: string
907 @param vg: volume group name
908 @type requested: int
909 @param requested: the amount of disk in MiB to check for
910 @raise errors.OpPrereqError: if the node doesn't have enough disk,
911 or we cannot check the node
912
913 """
914 (_, space_info, _) = node_info
915 lvm_vg_info = utils.storage.LookupSpaceInfoByStorageType(
916 space_info, constants.ST_LVM_VG)
917 if not lvm_vg_info:
918 raise errors.OpPrereqError("Can't retrieve storage information for LVM")
919 vg_free = lvm_vg_info.get("storage_free", None)
920 if not isinstance(vg_free, int):
921 raise errors.OpPrereqError("Can't compute free disk space on node"
922 " %s for vg %s, result was '%s'" %
923 (node_name, vg, vg_free), errors.ECODE_ENVIRON)
924 if requested > vg_free:
925 raise errors.OpPrereqError("Not enough disk space on target node %s"
926 " vg %s: required %d MiB, available %d MiB" %
927 (node_name, vg, requested, vg_free),
928 errors.ECODE_NORES)
929
930
931 def _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, requested):
932 """Checks if nodes have enough free disk space in the specified VG.
933
934 This function checks if all given nodes have the needed amount of
935 free disk. In case any node has less disk or we cannot get the
936 information from the node, this function raises an OpPrereqError
937 exception.
938
939 @type lu: C{LogicalUnit}
940 @param lu: a logical unit from which we get configuration data
941 @type node_uuids: C{list}
942 @param node_uuids: the list of node UUIDs to check
943 @type vg: C{str}
944 @param vg: the volume group to check
945 @type requested: C{int}
946 @param requested: the amount of disk in MiB to check for
947 @raise errors.OpPrereqError: if the node doesn't have enough disk,
948 or we cannot check the node
949
950 """
951 nodeinfo = _PerformNodeInfoCall(lu, node_uuids, vg)
952 for node_uuid in node_uuids:
953 node_name = lu.cfg.GetNodeName(node_uuid)
954 info = nodeinfo[node_uuid]
955 info.Raise("Cannot get current information from node %s" % node_name,
956 prereq=True, ecode=errors.ECODE_ENVIRON)
957 _CheckVgCapacityForNode(node_name, info.payload, vg, requested)
958
959
960 def CheckNodesFreeDiskPerVG(lu, node_uuids, req_sizes):
961 """Checks if nodes have enough free disk space in all the VGs.
962
963 This function checks if all given nodes have the needed amount of
964 free disk. In case any node has less disk or we cannot get the
965 information from the node, this function raises an OpPrereqError
966 exception.
967
968 @type lu: C{LogicalUnit}
969 @param lu: a logical unit from which we get configuration data
970 @type node_uuids: C{list}
971 @param node_uuids: the list of node UUIDs to check
972 @type req_sizes: C{dict}
973 @param req_sizes: the hash of vg and corresponding amount of disk in
974 MiB to check for
975 @raise errors.OpPrereqError: if the node doesn't have enough disk,
976 or we cannot check the node
977
978 """
979 for vg, req_size in req_sizes.items():
980 _CheckNodesFreeDiskOnVG(lu, node_uuids, vg, req_size)
981
982
983 def _DiskSizeInBytesToMebibytes(lu, size):
984 """Converts a disk size in bytes to mebibytes.
985
986 Warns and rounds up if the size isn't an even multiple of 1 MiB.
987
988 """
989 (mib, remainder) = divmod(size, 1024 * 1024)
990
991 if remainder != 0:
992 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
993 " to not overwrite existing data (%s bytes will not be"
994 " wiped)", (1024 * 1024) - remainder)
995 mib += 1
996
997 return mib
998
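# Worked example (sketch): an exact multiple converts silently, anything else
# is rounded up with a warning:
#
#   _DiskSizeInBytesToMebibytes(lu, 5 * 1024 * 1024)      == 5
#   _DiskSizeInBytesToMebibytes(lu, 5 * 1024 * 1024 + 1)  == 6
#
# In the second case the warning reports that 1048575 bytes will not be wiped.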
999
1000 def _CalcEta(time_taken, written, total_size):
1001 """Calculates the ETA based on size written and total size.
1002
1003 @param time_taken: The time taken so far
1004 @param written: amount written so far
1005 @param total_size: The total size of data to be written
1006 @return: The remaining time in seconds
1007
1008 """
1009 avg_time = time_taken / float(written)
1010 return (total_size - written) * avg_time
1011
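# Worked example (sketch): after 30 seconds, 256 MiB of 1024 MiB written:
#
#   _CalcEta(30.0, 256, 1024) == (1024 - 256) * (30.0 / 256) == 90.0
#
# i.e. roughly 90 more seconds at the observed average rate.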
1012
1013 def WipeDisks(lu, instance, disks=None):
1014 """Wipes instance disks.
1015
1016 @type lu: L{LogicalUnit}
1017 @param lu: the logical unit on whose behalf we execute
1018 @type instance: L{objects.Instance}
1019 @param instance: the instance whose disks we should wipe
1020 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
1021 @param disks: Disk details; tuple contains disk index, disk object and the
1022 start offset
1023
1024 """
1025 node_uuid = instance.primary_node
1026 node_name = lu.cfg.GetNodeName(node_uuid)
1027
1028 if disks is None:
1029 disks = [(idx, disk, 0)
1030 for (idx, disk) in enumerate(instance.disks)]
1031
1032 logging.info("Pausing synchronization of disks of instance '%s'",
1033 instance.name)
1034 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1035 (map(compat.snd, disks),
1036 instance),
1037 True)
1038 result.Raise("Failed to pause disk synchronization on node '%s'" % node_name)
1039
1040 for idx, success in enumerate(result.payload):
1041 if not success:
1042 logging.warn("Pausing synchronization of disk %s of instance '%s'"
1043 " failed", idx, instance.name)
1044
1045 try:
1046 for (idx, device, offset) in disks:
1047 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
1048 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
1049 wipe_chunk_size = \
1050 int(min(constants.MAX_WIPE_CHUNK,
1051 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
1052
1053 size = device.size
1054 last_output = 0
1055 start_time = time.time()
1056
1057 if offset == 0:
1058 info_text = ""
1059 else:
1060 info_text = (" (from %s to %s)" %
1061 (utils.FormatUnit(offset, "h"),
1062 utils.FormatUnit(size, "h")))
1063
1064 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
1065
1066 logging.info("Wiping disk %d for instance %s on node %s using"
1067 " chunk size %s", idx, instance.name, node_name,
1068 wipe_chunk_size)
1069
1070 while offset < size:
1071 wipe_size = min(wipe_chunk_size, size - offset)
1072
1073 logging.debug("Wiping disk %d, offset %s, chunk %s",
1074 idx, offset, wipe_size)
1075
1076 result = lu.rpc.call_blockdev_wipe(node_uuid, (device, instance),
1077 offset, wipe_size)
1078 result.Raise("Could not wipe disk %d at offset %d for size %d" %
1079 (idx, offset, wipe_size))
1080
1081 now = time.time()
1082 offset += wipe_size
1083 if now - last_output >= 60:
1084 eta = _CalcEta(now - start_time, offset, size)
1085 lu.LogInfo(" - done: %.1f%% ETA: %s",
1086 offset / float(size) * 100, utils.FormatSeconds(eta))
1087 last_output = now
1088 finally:
1089 logging.info("Resuming synchronization of disks for instance '%s'",
1090 instance.name)
1091
1092 result = lu.rpc.call_blockdev_pause_resume_sync(node_uuid,
1093 (map(compat.snd, disks),
1094 instance),
1095 False)
1096
1097 if result.fail_msg:
1098 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
1099 node_name, result.fail_msg)
1100 else:
1101 for idx, success in enumerate(result.payload):
1102 if not success:
1103 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
1104 " failed", idx, instance.name)
1105
1106
1107 def WipeOrCleanupDisks(lu, instance, disks=None, cleanup=None):
1108 """Wrapper for L{WipeDisks} that handles errors.
1109
1110 @type lu: L{LogicalUnit}
1111 @param lu: the logical unit on whose behalf we execute
1112 @type instance: L{objects.Instance}
1113 @param instance: the instance whose disks we should wipe
1114 @param disks: see L{WipeDisks}
1115 @param cleanup: the result returned by L{CreateDisks}, used for cleanup in
1116 case of error
1117 @raise errors.OpPrereqError: in case of failure
1118
1119 """
1120 try:
1121 WipeDisks(lu, instance, disks=disks)
1122 except errors.OpExecError:
1123 logging.warning("Wiping disks for instance '%s' failed",
1124 instance.name)
1125 _UndoCreateDisks(lu, cleanup, instance)
1126 raise
1127
1128
1129 def ExpandCheckDisks(instance, disks):
1130 """Return the instance disks selected by the disks list
1131
1132 @type disks: list of L{objects.Disk} or None
1133 @param disks: selected disks
1134 @rtype: list of L{objects.Disk}
1135 @return: selected instance disks to act on
1136
1137 """
1138 if disks is None:
1139 return instance.disks
1140 else:
1141 if not set(disks).issubset(instance.disks):
1142 raise errors.ProgrammerError("Can only act on disks belonging to the"
1143 " target instance: expected a subset of %r,"
1144 " got %r" % (instance.disks, disks))
1145 return disks
1146
1147
1148 def WaitForSync(lu, instance, disks=None, oneshot=False):
1149 """Sleep and poll for an instance's disk to sync.
1150
1151 """
1152 if not instance.disks or disks is not None and not disks:
1153 return True
1154
1155 disks = ExpandCheckDisks(instance, disks)
1156
1157 if not oneshot:
1158 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1159
1160 node_uuid = instance.primary_node
1161 node_name = lu.cfg.GetNodeName(node_uuid)
1162
1163 # TODO: Convert to utils.Retry
1164
1165 retries = 0
1166 degr_retries = 10 # in seconds, as we sleep 1 second each time
1167 while True:
1168 max_time = 0
1169 done = True
1170 cumul_degraded = False
1171 rstats = lu.rpc.call_blockdev_getmirrorstatus(node_uuid, (disks, instance))
1172 msg = rstats.fail_msg
1173 if msg:
1174 lu.LogWarning("Can't get any data from node %s: %s", node_name, msg)
1175 retries += 1
1176 if retries >= 10:
1177 raise errors.RemoteError("Can't contact node %s for mirror data,"
1178 " aborting." % node_name)
1179 time.sleep(6)
1180 continue
1181 rstats = rstats.payload
1182 retries = 0
1183 for i, mstat in enumerate(rstats):
1184 if mstat is None:
1185 lu.LogWarning("Can't compute data for node %s/%s",
1186 node_name, disks[i].iv_name)
1187 continue
1188
1189 cumul_degraded = (cumul_degraded or
1190 (mstat.is_degraded and mstat.sync_percent is None))
1191 if mstat.sync_percent is not None:
1192 done = False
1193 if mstat.estimated_time is not None:
1194 rem_time = ("%s remaining (estimated)" %
1195 utils.FormatSeconds(mstat.estimated_time))
1196 max_time = mstat.estimated_time
1197 else:
1198 rem_time = "no time estimate"
1199 max_time = 5 # sleep at least a bit between retries
1200 lu.LogInfo("- device %s: %5.2f%% done, %s",
1201 disks[i].iv_name, mstat.sync_percent, rem_time)
1202
1203 # if we're done but degraded, let's do a few small retries, to
1204 # make sure we see a stable and not transient situation; therefore
1205 # we force restart of the loop
1206 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1207 logging.info("Degraded disks found, %d retries left", degr_retries)
1208 degr_retries -= 1
1209 time.sleep(1)
1210 continue
1211
1212 if done or oneshot:
1213 break
1214
1215 time.sleep(min(60, max_time))
1216
1217 if done:
1218 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1219
1220 return not cumul_degraded
1221
1222
1223 def ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1224 """Shutdown block devices of an instance.
1225
1226 This does the shutdown on all nodes of the instance.
1227
1228 If ignore_primary is true, errors on the primary node are
1229 ignored.
1230
1231 """
1232 all_result = True
1233
1234 if disks is None:
1235 # only mark instance disks as inactive if all disks are affected
1236 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1237 disks = ExpandCheckDisks(instance, disks)
1238
1239 for disk in disks:
1240 for node_uuid, top_disk in disk.ComputeNodeTree(instance.primary_node):
1241 result = lu.rpc.call_blockdev_shutdown(node_uuid, (top_disk, instance))
1242 msg = result.fail_msg
1243 if msg:
1244 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1245 disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1246 if ((node_uuid == instance.primary_node and not ignore_primary) or
1247 (node_uuid != instance.primary_node and not result.offline)):
1248 all_result = False
1249 return all_result
1250
1251
1252 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1253 """Shutdown block devices of an instance.
1254
1255 This function checks that the instance is not running before calling
1256 ShutdownInstanceDisks.
1257
1258 """
1259 CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1260 ShutdownInstanceDisks(lu, instance, disks=disks)
1261
1262
1263 def AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1264 ignore_size=False):
1265 """Prepare the block devices for an instance.
1266
1267 This sets up the block devices on all nodes.
1268
1269 @type lu: L{LogicalUnit}
1270 @param lu: the logical unit on whose behalf we execute
1271 @type instance: L{objects.Instance}
1272 @param instance: the instance whose disks we assemble
1273 @type disks: list of L{objects.Disk} or None
1274 @param disks: which disks to assemble (or all, if None)
1275 @type ignore_secondaries: boolean
1276 @param ignore_secondaries: if true, errors on secondary nodes
1277 won't result in an error return from the function
1278 @type ignore_size: boolean
1279 @param ignore_size: if true, the current known size of the disk
1280 will not be used during the disk activation, useful for cases
1281 when the size is wrong
1282 @return: False if the operation failed, otherwise a list of
1283 (host, instance_visible_name, node_visible_name)
1284 with the mapping from node devices to instance devices
1285
1286 """
1287 device_info = []
1288 disks_ok = True
1289
1290 if disks is None:
1291 # only mark instance disks as active if all disks are affected
1292 lu.cfg.MarkInstanceDisksActive(instance.uuid)
1293
1294 disks = ExpandCheckDisks(instance, disks)
1295
1296 # With the two-pass mechanism we try to reduce the window of
1297 # opportunity for the race condition of switching DRBD to primary
1298 # before handshaking has occurred, but we do not eliminate it
1299
1300 # The proper fix would be to wait (with some limits) until the
1301 # connection has been made and drbd transitions from WFConnection
1302 # into any other network-connected state (Connected, SyncTarget,
1303 # SyncSource, etc.)
1304
1305 # 1st pass, assemble on all nodes in secondary mode
1306 for idx, inst_disk in enumerate(disks):
1307 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1308 instance.primary_node):
1309 if ignore_size:
1310 node_disk = node_disk.Copy()
1311 node_disk.UnsetSize()
1312 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1313 instance, False, idx)
1314 msg = result.fail_msg
1315 if msg:
1316 is_offline_secondary = (node_uuid in instance.secondary_nodes and
1317 result.offline)
1318 lu.LogWarning("Could not prepare block device %s on node %s"
1319 " (is_primary=False, pass=1): %s",
1320 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1321 if not (ignore_secondaries or is_offline_secondary):
1322 disks_ok = False
1323
1324 # FIXME: race condition on drbd migration to primary
1325
1326 # 2nd pass, do only the primary node
1327 for idx, inst_disk in enumerate(disks):
1328 dev_path = None
1329
1330 for node_uuid, node_disk in inst_disk.ComputeNodeTree(
1331 instance.primary_node):
1332 if node_uuid != instance.primary_node:
1333 continue
1334 if ignore_size:
1335 node_disk = node_disk.Copy()
1336 node_disk.UnsetSize()
1337 result = lu.rpc.call_blockdev_assemble(node_uuid, (node_disk, instance),
1338 instance, True, idx)
1339 msg = result.fail_msg
1340 if msg:
1341 lu.LogWarning("Could not prepare block device %s on node %s"
1342 " (is_primary=True, pass=2): %s",
1343 inst_disk.iv_name, lu.cfg.GetNodeName(node_uuid), msg)
1344 disks_ok = False
1345 else:
1346 dev_path, _, __ = result.payload
1347
1348 device_info.append((lu.cfg.GetNodeName(instance.primary_node),
1349 inst_disk.iv_name, dev_path))
1350
1351 if not disks_ok:
1352 lu.cfg.MarkInstanceDisksInactive(instance.uuid)
1353
1354 return disks_ok, device_info
1355
1356
1357 def StartInstanceDisks(lu, instance, force):
1358 """Start the disks of an instance.
1359
1360 """
1361 disks_ok, _ = AssembleInstanceDisks(lu, instance,
1362 ignore_secondaries=force)
1363 if not disks_ok:
1364 ShutdownInstanceDisks(lu, instance)
1365 if force is not None and not force:
1366 lu.LogWarning("",
1367 hint=("If the message above refers to a secondary node,"
1368 " you can retry the operation using '--force'"))
1369 raise errors.OpExecError("Disk consistency error")
1370
1371
1372 class LUInstanceGrowDisk(LogicalUnit):
1373 """Grow a disk of an instance.
1374
1375 """
1376 HPATH = "disk-grow"
1377 HTYPE = constants.HTYPE_INSTANCE
1378 REQ_BGL = False
1379
1380 def ExpandNames(self):
1381 self._ExpandAndLockInstance()
1382 self.needed_locks[locking.LEVEL_NODE] = []
1383 self.needed_locks[locking.LEVEL_NODE_RES] = []
1384 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1385 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1386
1387 def DeclareLocks(self, level):
1388 if level == locking.LEVEL_NODE:
1389 self._LockInstancesNodes()
1390 elif level == locking.LEVEL_NODE_RES:
1391 # Copy node locks
1392 self.needed_locks[locking.LEVEL_NODE_RES] = \
1393 CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1394
1395 def BuildHooksEnv(self):
1396 """Build hooks env.
1397
1398 This runs on the master, the primary and all the secondaries.
1399
1400 """
1401 env = {
1402 "DISK": self.op.disk,
1403 "AMOUNT": self.op.amount,
1404 "ABSOLUTE": self.op.absolute,
1405 }
1406 env.update(BuildInstanceHookEnvByObject(self, self.instance))
1407 return env
1408
1409 def BuildHooksNodes(self):
1410 """Build hooks nodes.
1411
1412 """
1413 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1414 return (nl, nl)
1415
1416 def CheckPrereq(self):
1417 """Check prerequisites.
1418
1419 This checks that the instance is in the cluster.
1420
1421 """
1422 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1423 assert self.instance is not None, \
1424 "Cannot retrieve locked instance %s" % self.op.instance_name
1425 node_uuids = list(self.instance.all_nodes)
1426 for node_uuid in node_uuids:
1427 CheckNodeOnline(self, node_uuid)
1428 self.node_es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, node_uuids)
1429
1430 if self.instance.disk_template not in constants.DTS_GROWABLE:
1431 raise errors.OpPrereqError("Instance's disk layout does not support"
1432 " growing", errors.ECODE_INVAL)
1433
1434 self.disk = self.instance.FindDisk(self.op.disk)
1435
1436 if self.op.absolute:
1437 self.target = self.op.amount
1438 self.delta = self.target - self.disk.size
1439 if self.delta < 0:
1440 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1441 "current disk size (%s)" %
1442 (utils.FormatUnit(self.target, "h"),
1443 utils.FormatUnit(self.disk.size, "h")),
1444 errors.ECODE_STATE)
1445 else:
1446 self.delta = self.op.amount
1447 self.target = self.disk.size + self.delta
1448 if self.delta < 0:
1449 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1450 utils.FormatUnit(self.delta, "h"),
1451 errors.ECODE_INVAL)
1452
1453 self._CheckDiskSpace(node_uuids, self.disk.ComputeGrowth(self.delta))
1454
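# Example of the two opcode modes handled above (sketch): for a 10240 MiB
# disk, amount=2048 with absolute=False gives delta=2048 and target=12288,
# while amount=20480 with absolute=True gives delta=10240 and target=20480;
# a negative delta is rejected in both cases.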
1455 def _CheckDiskSpace(self, node_uuids, req_vgspace):
1456 template = self.instance.disk_template
1457 if (template not in (constants.DTS_NO_FREE_SPACE_CHECK) and
1458 not any(self.node_es_flags.values())):
1459 # TODO: check the free disk space for file-based templates, once that
1460 # feature is supported
1461 # With exclusive storage we need to do something smarter than just looking
1462 # at free space, which, in the end, is basically a dry run. So we rely on
1463 # the dry run performed in Exec() instead.
1464 CheckNodesFreeDiskPerVG(self, node_uuids, req_vgspace)
1465
1466 def Exec(self, feedback_fn):
1467 """Execute disk grow.
1468
1469 """
1470 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1471 assert (self.owned_locks(locking.LEVEL_NODE) ==
1472 self.owned_locks(locking.LEVEL_NODE_RES))
1473
1474 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1475
1476 disks_ok, _ = AssembleInstanceDisks(self, self.instance, disks=[self.disk])
1477 if not disks_ok:
1478 raise errors.OpExecError("Cannot activate block device to grow")
1479
1480 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1481 (self.op.disk, self.instance.name,
1482 utils.FormatUnit(self.delta, "h"),
1483 utils.FormatUnit(self.target, "h")))
1484
1485 # First run all grow ops in dry-run mode
1486 for node_uuid in self.instance.all_nodes:
1487 result = self.rpc.call_blockdev_grow(node_uuid,
1488 (self.disk, self.instance),
1489 self.delta, True, True,
1490 self.node_es_flags[node_uuid])
1491 result.Raise("Dry-run grow request failed to node %s" %
1492 self.cfg.GetNodeName(node_uuid))
1493
1494 if wipe_disks:
1495 # Get disk size from primary node for wiping
1496 result = self.rpc.call_blockdev_getdimensions(
1497 self.instance.primary_node, [([self.disk], self.instance)])
1498 result.Raise("Failed to retrieve disk size from node '%s'" %
1499 self.instance.primary_node)
1500
1501 (disk_dimensions, ) = result.payload
1502
1503 if disk_dimensions is None:
1504 raise errors.OpExecError("Failed to retrieve disk size from primary"
1505 " node '%s'" % self.instance.primary_node)
1506 (disk_size_in_bytes, _) = disk_dimensions
1507
1508 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1509
1510 assert old_disk_size >= self.disk.size, \
1511 ("Retrieved disk size too small (got %s, should be at least %s)" %
1512 (old_disk_size, self.disk.size))
1513 else:
1514 old_disk_size = None
1515
1516 # We know that (as far as we can test) operations across different
1517 # nodes will succeed, time to run it for real on the backing storage
1518 for node_uuid in self.instance.all_nodes:
1519 result = self.rpc.call_blockdev_grow(node_uuid,
1520 (self.disk, self.instance),
1521 self.delta, False, True,
1522 self.node_es_flags[node_uuid])
1523 result.Raise("Grow request failed to node %s" %
1524 self.cfg.GetNodeName(node_uuid))
1525
1526 # And now execute it for logical storage, on the primary node
1527 node_uuid = self.instance.primary_node
1528 result = self.rpc.call_blockdev_grow(node_uuid, (self.disk, self.instance),
1529 self.delta, False, False,
1530 self.node_es_flags[node_uuid])
1531 result.Raise("Grow request failed to node %s" %
1532 self.cfg.GetNodeName(node_uuid))
1533
1534 self.disk.RecordGrow(self.delta)
1535 self.cfg.Update(self.instance, feedback_fn)
1536
1537 # Changes have been recorded, release node lock
1538 ReleaseLocks(self, locking.LEVEL_NODE)
1539
1540 # Downgrade lock while waiting for sync
1541 self.glm.downgrade(locking.LEVEL_INSTANCE)
1542
1543 assert wipe_disks ^ (old_disk_size is None)
1544
1545 if wipe_disks:
1546 assert self.instance.disks[self.op.disk] == self.disk
1547
1548 # Wipe newly added disk space
1549 WipeDisks(self, self.instance,
1550 disks=[(self.op.disk, self.disk, old_disk_size)])
1551
1552 if self.op.wait_for_sync:
1553 disk_abort = not WaitForSync(self, self.instance, disks=[self.disk])
1554 if disk_abort:
1555 self.LogWarning("Disk syncing has not returned a good status; check"
1556 " the instance")
1557 if not self.instance.disks_active:
1558 _SafeShutdownInstanceDisks(self, self.instance, disks=[self.disk])
1559 elif not self.instance.disks_active:
1560 self.LogWarning("Not shutting down the disk even if the instance is"
1561 " not supposed to be running because no wait for"
1562 " sync mode was requested")
1563
1564 assert self.owned_locks(locking.LEVEL_NODE_RES)
1565 assert set([self.instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1566
1567
1568 class LUInstanceReplaceDisks(LogicalUnit):
1569 """Replace the disks of an instance.
1570
1571 """
1572 HPATH = "mirrors-replace"
1573 HTYPE = constants.HTYPE_INSTANCE
1574 REQ_BGL = False
1575
1576 def CheckArguments(self):
1577 """Check arguments.
1578
1579 """
1580 if self.op.mode == constants.REPLACE_DISK_CHG:
1581 if self.op.remote_node is None and self.op.iallocator is None:
1582 raise errors.OpPrereqError("When changing the secondary either an"
1583 " iallocator script must be used or the"
1584 " new node given", errors.ECODE_INVAL)
1585 else:
1586 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1587
1588 elif self.op.remote_node is not None or self.op.iallocator is not None:
1589 # Not replacing the secondary
1590 raise errors.OpPrereqError("The iallocator and new node options can"
1591 " only be used when changing the"
1592 " secondary node", errors.ECODE_INVAL)
1593
1594 def ExpandNames(self):
1595 self._ExpandAndLockInstance()
1596
1597 assert locking.LEVEL_NODE not in self.needed_locks
1598 assert locking.LEVEL_NODE_RES not in self.needed_locks
1599 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1600
1601 assert self.op.iallocator is None or self.op.remote_node is None, \
1602 "Conflicting options"
1603
1604 if self.op.remote_node is not None:
1605 (self.op.remote_node_uuid, self.op.remote_node) = \
1606 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
1607 self.op.remote_node)
1608
1609 # Warning: do not remove the locking of the new secondary here
1610 # unless DRBD8Dev.AddChildren is changed to work in parallel;
1611 # currently it doesn't since parallel invocations of
1612 # FindUnusedMinor will conflict
1613 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node_uuid]
1614 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1615 else:
1616 self.needed_locks[locking.LEVEL_NODE] = []
1617 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1618
1619 if self.op.iallocator is not None:
1620 # iallocator will select a new node in the same group
1621 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1622 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1623
1624 self.needed_locks[locking.LEVEL_NODE_RES] = []
1625
1626 self.replacer = TLReplaceDisks(self, self.op.instance_uuid,
1627 self.op.instance_name, self.op.mode,
1628 self.op.iallocator, self.op.remote_node_uuid,
1629 self.op.disks, self.op.early_release,
1630 self.op.ignore_ipolicy)
1631
1632 self.tasklets = [self.replacer]
1633
1634 def DeclareLocks(self, level):
1635 if level == locking.LEVEL_NODEGROUP:
1636 assert self.op.remote_node_uuid is None
1637 assert self.op.iallocator is not None
1638 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1639
1640 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1641 # Lock all groups used by instance optimistically; this requires going
1642 # via the node before it's locked, requiring verification later on
1643 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1644 self.cfg.GetInstanceNodeGroups(self.op.instance_uuid)
1645
1646 elif level == locking.LEVEL_NODE:
1647 if self.op.iallocator is not None:
1648 assert self.op.remote_node_uuid is None
1649 assert not self.needed_locks[locking.LEVEL_NODE]
1650 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1651
1652 # Lock member nodes of all locked groups
1653 self.needed_locks[locking.LEVEL_NODE] = \
1654 [node_uuid
1655 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1656 for node_uuid in self.cfg.GetNodeGroup(group_uuid).members]
1657 else:
1658 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1659
1660 self._LockInstancesNodes()
1661
1662 elif level == locking.LEVEL_NODE_RES:
1663 # Reuse node locks
1664 self.needed_locks[locking.LEVEL_NODE_RES] = \
1665 self.needed_locks[locking.LEVEL_NODE]
1666
1667 def BuildHooksEnv(self):
1668 """Build hooks env.
1669
1670 This runs on the master, the primary and all the secondaries.
1671
1672 """
1673 instance = self.replacer.instance
1674 env = {
1675 "MODE": self.op.mode,
1676 "NEW_SECONDARY": self.op.remote_node,
1677 "OLD_SECONDARY": self.cfg.GetNodeName(instance.secondary_nodes[0]),
1678 }
1679 env.update(BuildInstanceHookEnvByObject(self, instance))
1680 return env
1681
1682 def BuildHooksNodes(self):
1683 """Build hooks nodes.
1684
1685 """
1686 instance = self.replacer.instance
1687 nl = [
1688 self.cfg.GetMasterNode(),
1689 instance.primary_node,
1690 ]
1691 if self.op.remote_node_uuid is not None:
1692 nl.append(self.op.remote_node_uuid)
1693 return nl, nl
1694
1695 def CheckPrereq(self):
1696 """Check prerequisites.
1697
1698 """
1699 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1700 self.op.iallocator is None)
1701
1702 # Verify if node group locks are still correct
1703 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1704 if owned_groups:
1705 CheckInstanceNodeGroups(self.cfg, self.op.instance_uuid, owned_groups)
1706
1707 return LogicalUnit.CheckPrereq(self)
1708
1709
1710 class LUInstanceActivateDisks(NoHooksLU):
1711 """Bring up an instance's disks.
1712
1713 """
1714 REQ_BGL = False
1715
1716 def ExpandNames(self):
1717 self._ExpandAndLockInstance()
1718 self.needed_locks[locking.LEVEL_NODE] = []
1719 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1720
1721 def DeclareLocks(self, level):
1722 if level == locking.LEVEL_NODE:
1723 self._LockInstancesNodes()
1724
1725 def CheckPrereq(self):
1726 """Check prerequisites.
1727
1728 This checks that the instance is in the cluster.
1729
1730 """
1731 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1732 assert self.instance is not None, \
1733 "Cannot retrieve locked instance %s" % self.op.instance_name
1734 CheckNodeOnline(self, self.instance.primary_node)
1735
1736 def Exec(self, feedback_fn):
1737 """Activate the disks.
1738
1739 """
1740 disks_ok, disks_info = \
1741 AssembleInstanceDisks(self, self.instance,
1742 ignore_size=self.op.ignore_size)
1743 if not disks_ok:
1744 raise errors.OpExecError("Cannot activate block devices")
1745
1746 if self.op.wait_for_sync:
1747 if not WaitForSync(self, self.instance):
1748 self.cfg.MarkInstanceDisksInactive(self.instance.uuid)
1749 raise errors.OpExecError("Some disks of the instance are degraded!")
1750
1751 return disks_info
1752
1753
1754 class LUInstanceDeactivateDisks(NoHooksLU):
1755 """Shutdown an instance's disks.
1756
1757 """
1758 REQ_BGL = False
1759
1760 def ExpandNames(self):
1761 self._ExpandAndLockInstance()
1762 self.needed_locks[locking.LEVEL_NODE] = []
1763 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1764
1765 def DeclareLocks(self, level):
1766 if level == locking.LEVEL_NODE:
1767 self._LockInstancesNodes()
1768
1769 def CheckPrereq(self):
1770 """Check prerequisites.
1771
1772 This checks that the instance is in the cluster.
1773
1774 """
1775 self.instance = self.cfg.GetInstanceInfo(self.op.instance_uuid)
1776 assert self.instance is not None, \
1777 "Cannot retrieve locked instance %s" % self.op.instance_name
1778
1779 def Exec(self, feedback_fn):
1780     """Deactivate the disks.
1781
1782 """
1783 if self.op.force:
1784 ShutdownInstanceDisks(self, self.instance)
1785 else:
1786 _SafeShutdownInstanceDisks(self, self.instance)
1787
1788
1789 def _CheckDiskConsistencyInner(lu, instance, dev, node_uuid, on_primary,
1790 ldisk=False):
1791 """Check that mirrors are not degraded.
1792
1793 @attention: The device has to be annotated already.
1794
1795 The ldisk parameter, if True, will change the test from the
1796 is_degraded attribute (which represents overall non-ok status for
1797 the device(s)) to the ldisk (representing the local storage status).
1798
1799 """
1800 result = True
1801
1802 if on_primary or dev.AssembleOnSecondary():
1803 rstats = lu.rpc.call_blockdev_find(node_uuid, (dev, instance))
1804 msg = rstats.fail_msg
1805 if msg:
1806 lu.LogWarning("Can't find disk on node %s: %s",
1807 lu.cfg.GetNodeName(node_uuid), msg)
1808 result = False
1809 elif not rstats.payload:
1810 lu.LogWarning("Can't find disk on node %s", lu.cfg.GetNodeName(node_uuid))
1811 result = False
1812 else:
1813 if ldisk:
1814 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1815 else:
1816 result = result and not rstats.payload.is_degraded
1817
1818 if dev.children:
1819 for child in dev.children:
1820 result = result and _CheckDiskConsistencyInner(lu, instance, child,
1821 node_uuid, on_primary)
1822
1823 return result
1824
1825
1826 def CheckDiskConsistency(lu, instance, dev, node_uuid, on_primary, ldisk=False):
1827 """Wrapper around L{_CheckDiskConsistencyInner}.
1828
1829 """
1830 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1831 return _CheckDiskConsistencyInner(lu, instance, disk, node_uuid, on_primary,
1832 ldisk=ldisk)
1833
1834
1835 def _BlockdevFind(lu, node_uuid, dev, instance):
1836 """Wrapper around call_blockdev_find to annotate diskparams.
1837
1838 @param lu: A reference to the lu object
1839   @param node_uuid: The node to call out to
1840 @param dev: The device to find
1841 @param instance: The instance object the device belongs to
1842   @return: The result of the RPC call
1843
1844 """
1845 (disk,) = AnnotateDiskParams(instance, [dev], lu.cfg)
1846 return lu.rpc.call_blockdev_find(node_uuid, (disk, instance))
1847
1848
1849 def _GenerateUniqueNames(lu, exts):
1850   """Generate suitable LV names.
1851
1852   This will generate unique logical volume names for the given suffixes.
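
       For example, with exts of [".disk0_data", ".disk0_meta"] this would
       typically return names like "<unique-id>.disk0_data" and
       "<unique-id>.disk0_meta".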
1853
1854 """
1855 results = []
1856 for val in exts:
1857 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1858 results.append("%s%s" % (new_id, val))
1859 return results
1860
1861
1862 class TLReplaceDisks(Tasklet):
1863 """Replaces disks for an instance.
1864
1865 Note: Locking is not within the scope of this class.
1866
1867 """
1868 def __init__(self, lu, instance_uuid, instance_name, mode, iallocator_name,
1869 remote_node_uuid, disks, early_release, ignore_ipolicy):
1870 """Initializes this class.
1871
1872 """
1873 Tasklet.__init__(self, lu)
1874
1875 # Parameters
1876 self.instance_uuid = instance_uuid
1877 self.instance_name = instance_name
1878 self.mode = mode
1879 self.iallocator_name = iallocator_name
1880 self.remote_node_uuid = remote_node_uuid
1881 self.disks = disks
1882 self.early_release = early_release
1883 self.ignore_ipolicy = ignore_ipolicy
1884
1885 # Runtime data
1886 self.instance = None
1887 self.new_node_uuid = None
1888 self.target_node_uuid = None
1889 self.other_node_uuid = None
1890 self.remote_node_info = None
1891 self.node_secondary_ip = None
1892
1893 @staticmethod
1894 def _RunAllocator(lu, iallocator_name, instance_uuid,
1895 relocate_from_node_uuids):
1896 """Compute a new secondary node using an IAllocator.
1897
1898 """
1899 req = iallocator.IAReqRelocate(
1900 inst_uuid=instance_uuid,
1901 relocate_from_node_uuids=list(relocate_from_node_uuids))
1902 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1903
1904 ial.Run(iallocator_name)
1905
1906 if not ial.success:
1907 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1908 " %s" % (iallocator_name, ial.info),
1909 errors.ECODE_NORES)
1910
1911 remote_node_name = ial.result[0]
1912 remote_node = lu.cfg.GetNodeInfoByName(remote_node_name)
1913
1914 if remote_node is None:
1915 raise errors.OpPrereqError("Node %s not found in configuration" %
1916 remote_node_name, errors.ECODE_NOENT)
1917
1918 lu.LogInfo("Selected new secondary for instance '%s': %s",
1919 instance_uuid, remote_node_name)
1920
1921 return remote_node.uuid
1922
1923 def _FindFaultyDisks(self, node_uuid):
1924 """Wrapper for L{FindFaultyInstanceDisks}.
1925
1926 """
1927 return FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1928 node_uuid, True)
1929
1930 def _CheckDisksActivated(self, instance):
1931 """Checks if the instance disks are activated.
1932
1933     @param instance: The instance whose disks to check
1934 @return: True if they are activated, False otherwise
1935
1936 """
1937 node_uuids = instance.all_nodes
1938
1939 for idx, dev in enumerate(instance.disks):
1940 for node_uuid in node_uuids:
1941 self.lu.LogInfo("Checking disk/%d on %s", idx,
1942 self.cfg.GetNodeName(node_uuid))
1943
1944 result = _BlockdevFind(self, node_uuid, dev, instance)
1945
1946 if result.offline:
1947 continue
1948 elif result.fail_msg or not result.payload:
1949 return False
1950
1951 return True
1952
1953 def CheckPrereq(self):
1954 """Check prerequisites.
1955
1956 This checks that the instance is in the cluster.
1957
1958 """
1959 self.instance = self.cfg.GetInstanceInfo(self.instance_uuid)
1960 assert self.instance is not None, \
1961 "Cannot retrieve locked instance %s" % self.instance_name
1962
1963 if self.instance.disk_template != constants.DT_DRBD8:
1964 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1965 " instances", errors.ECODE_INVAL)
1966
1967 if len(self.instance.secondary_nodes) != 1:
1968 raise errors.OpPrereqError("The instance has a strange layout,"
1969 " expected one secondary but found %d" %
1970 len(self.instance.secondary_nodes),
1971 errors.ECODE_FAULT)
1972
1973 secondary_node_uuid = self.instance.secondary_nodes[0]
1974
1975 if self.iallocator_name is None:
1976 remote_node_uuid = self.remote_node_uuid
1977 else:
1978 remote_node_uuid = self._RunAllocator(self.lu, self.iallocator_name,
1979 self.instance.uuid,
1980 self.instance.secondary_nodes)
1981
1982 if remote_node_uuid is None:
1983 self.remote_node_info = None
1984 else:
1985 assert remote_node_uuid in self.lu.owned_locks(locking.LEVEL_NODE), \
1986 "Remote node '%s' is not locked" % remote_node_uuid
1987
1988 self.remote_node_info = self.cfg.GetNodeInfo(remote_node_uuid)
1989 assert self.remote_node_info is not None, \
1990 "Cannot retrieve locked node %s" % remote_node_uuid
1991
1992 if remote_node_uuid == self.instance.primary_node:
1993 raise errors.OpPrereqError("The specified node is the primary node of"
1994 " the instance", errors.ECODE_INVAL)
1995
1996 if remote_node_uuid == secondary_node_uuid:
1997 raise errors.OpPrereqError("The specified node is already the"
1998 " secondary node of the instance",
1999 errors.ECODE_INVAL)
2000
2001 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
2002 constants.REPLACE_DISK_CHG):
2003 raise errors.OpPrereqError("Cannot specify disks to be replaced",
2004 errors.ECODE_INVAL)
2005
2006 if self.mode == constants.REPLACE_DISK_AUTO:
2007 if not self._CheckDisksActivated(self.instance):
2008 raise errors.OpPrereqError("Please run activate-disks on instance %s"
2009 " first" % self.instance_name,
2010 errors.ECODE_STATE)
2011 faulty_primary = self._FindFaultyDisks(self.instance.primary_node)
2012 faulty_secondary = self._FindFaultyDisks(secondary_node_uuid)
2013
2014 if faulty_primary and faulty_secondary:
2015 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
2016 " one node and can not be repaired"
2017 " automatically" % self.instance_name,
2018 errors.ECODE_STATE)
2019
2020 if faulty_primary:
2021 self.disks = faulty_primary
2022 self.target_node_uuid = self.instance.primary_node
2023 self.other_node_uuid = secondary_node_uuid
2024 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2025 elif faulty_secondary:
2026 self.disks = faulty_secondary
2027 self.target_node_uuid = secondary_node_uuid
2028 self.other_node_uuid = self.instance.primary_node
2029 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2030 else:
2031 self.disks = []
2032 check_nodes = []
2033
2034 else:
2035 # Non-automatic modes
2036 if self.mode == constants.REPLACE_DISK_PRI:
2037 self.target_node_uuid = self.instance.primary_node
2038 self.other_node_uuid = secondary_node_uuid
2039 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2040
2041 elif self.mode == constants.REPLACE_DISK_SEC:
2042 self.target_node_uuid = secondary_node_uuid
2043 self.other_node_uuid = self.instance.primary_node
2044 check_nodes = [self.target_node_uuid, self.other_node_uuid]
2045
2046 elif self.mode == constants.REPLACE_DISK_CHG:
2047 self.new_node_uuid = remote_node_uuid
2048 self.other_node_uuid = self.instance.primary_node
2049 self.target_node_uuid = secondary_node_uuid
2050 check_nodes = [self.new_node_uuid, self.other_node_uuid]
2051
2052 CheckNodeNotDrained(self.lu, remote_node_uuid)
2053 CheckNodeVmCapable(self.lu, remote_node_uuid)
2054
2055 old_node_info = self.cfg.GetNodeInfo(secondary_node_uuid)
2056 assert old_node_info is not None
2057 if old_node_info.offline and not self.early_release:
2058 # doesn't make sense to delay the release
2059 self.early_release = True
2060 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
2061 " early-release mode", secondary_node_uuid)
2062
2063 else:
2064 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
2065 self.mode)
2066
2067 # If not specified all disks should be replaced
2068 if not self.disks:
2069 self.disks = range(len(self.instance.disks))
2070
2071     # TODO: This is ugly, but right now we can't distinguish between an
2072     # internally submitted opcode and an external one. We should fix that.
2073 if self.remote_node_info:
2074 # We change the node, lets verify it still meets instance policy
2075 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
2076 cluster = self.cfg.GetClusterInfo()
2077 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2078 new_group_info)
2079 CheckTargetNodeIPolicy(self.lu, ipolicy, self.instance,
2080 self.remote_node_info, self.cfg,
2081 ignore=self.ignore_ipolicy)
2082
2083 for node_uuid in check_nodes:
2084 CheckNodeOnline(self.lu, node_uuid)
2085
2086 touched_nodes = frozenset(node_uuid for node_uuid in [self.new_node_uuid,
2087 self.other_node_uuid,
2088 self.target_node_uuid]
2089 if node_uuid is not None)
2090
2091 # Release unneeded node and node resource locks
2092 ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
2093 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
2094 ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
2095
2096 # Release any owned node group
2097 ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
2098
2099 # Check whether disks are valid
2100 for disk_idx in self.disks:
2101 self.instance.FindDisk(disk_idx)
2102
2103 # Get secondary node IP addresses
2104 self.node_secondary_ip = dict((uuid, node.secondary_ip) for (uuid, node)
2105 in self.cfg.GetMultiNodeInfo(touched_nodes))
2106
2107 def Exec(self, feedback_fn):
2108 """Execute disk replacement.
2109
2110 This dispatches the disk replacement to the appropriate handler.
2111
2112 """
2113 if __debug__:
2114 # Verify owned locks before starting operation
2115 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
2116 assert set(owned_nodes) == set(self.node_secondary_ip), \
2117 ("Incorrect node locks, owning %s, expected %s" %
2118 (owned_nodes, self.node_secondary_ip.keys()))
2119 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
2120 self.lu.owned_locks(locking.LEVEL_NODE_RES))
2121 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2122
2123 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
2124 assert list(owned_instances) == [self.instance_name], \
2125 "Instance '%s' not locked" % self.instance_name
2126
2127 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
2128 "Should not own any node group lock at this point"
2129
2130 if not self.disks:
2131 feedback_fn("No disks need replacement for instance '%s'" %
2132 self.instance.name)
2133 return
2134
2135 feedback_fn("Replacing disk(s) %s for instance '%s'" %
2136 (utils.CommaJoin(self.disks), self.instance.name))
2137 feedback_fn("Current primary node: %s" %
2138 self.cfg.GetNodeName(self.instance.primary_node))
2139 feedback_fn("Current secondary node: %s" %
2140 utils.CommaJoin(self.cfg.GetNodeNames(
2141 self.instance.secondary_nodes)))
2142
2143 activate_disks = not self.instance.disks_active
2144
2145 # Activate the instance disks if we're replacing them on a down instance
2146 if activate_disks:
2147 StartInstanceDisks(self.lu, self.instance, True)
2148
2149 try:
2150 # Should we replace the secondary node?
2151 if self.new_node_uuid is not None:
2152 fn = self._ExecDrbd8Secondary
2153 else:
2154 fn = self._ExecDrbd8DiskOnly
2155
2156 result = fn(feedback_fn)
2157 finally:
2158 # Deactivate the instance disks if we're replacing them on a
2159 # down instance
2160 if activate_disks:
2161 _SafeShutdownInstanceDisks(self.lu, self.instance)
2162
2163 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2164
2165 if __debug__:
2166 # Verify owned locks
2167 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2168 nodes = frozenset(self.node_secondary_ip)
2169 assert ((self.early_release and not owned_nodes) or
2170 (not self.early_release and not (set(owned_nodes) - nodes))), \
2171 ("Not owning the correct locks, early_release=%s, owned=%r,"
2172 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2173
2174 return result
2175
2176 def _CheckVolumeGroup(self, node_uuids):
2177 self.lu.LogInfo("Checking volume groups")
2178
2179 vgname = self.cfg.GetVGName()
2180
2181 # Make sure volume group exists on all involved nodes
2182 results = self.rpc.call_vg_list(node_uuids)
2183 if not results:
2184 raise errors.OpExecError("Can't list volume groups on the nodes")
2185
2186 for node_uuid in node_uuids:
2187 res = results[node_uuid]
2188 res.Raise("Error checking node %s" % self.cfg.GetNodeName(node_uuid))
2189 if vgname not in res.payload:
2190 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2191 (vgname, self.cfg.GetNodeName(node_uuid)))
2192
2193 def _CheckDisksExistence(self, node_uuids):
2194 # Check disk existence
2195 for idx, dev in enumerate(self.instance.disks):
2196 if idx not in self.disks:
2197 continue
2198
2199 for node_uuid in node_uuids:
2200 self.lu.LogInfo("Checking disk/%d on %s", idx,
2201 self.cfg.GetNodeName(node_uuid))
2202
2203 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2204
2205 msg = result.fail_msg
2206 if msg or not result.payload:
2207 if not msg:
2208 msg = "disk not found"
2209 if not self._CheckDisksActivated(self.instance):
2210           extra_hint = ("\nDisks do not seem to be properly activated. Try"
2211 " running activate-disks on the instance before"
2212 " using replace-disks.")
2213 else:
2214 extra_hint = ""
2215 raise errors.OpExecError("Can't find disk/%d on node %s: %s%s" %
2216 (idx, self.cfg.GetNodeName(node_uuid), msg,
2217 extra_hint))
2218
2219 def _CheckDisksConsistency(self, node_uuid, on_primary, ldisk):
2220 for idx, dev in enumerate(self.instance.disks):
2221 if idx not in self.disks:
2222 continue
2223
2224 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2225 (idx, self.cfg.GetNodeName(node_uuid)))
2226
2227 if not CheckDiskConsistency(self.lu, self.instance, dev, node_uuid,
2228 on_primary, ldisk=ldisk):
2229 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2230 " replace disks for instance %s" %
2231 (self.cfg.GetNodeName(node_uuid),
2232 self.instance.name))
2233
2234 def _CreateNewStorage(self, node_uuid):
2235 """Create new storage on the primary or secondary node.
2236
2237 This is only used for same-node replaces, not for changing the
2238 secondary node, hence we don't want to modify the existing disk.
2239
2240 """
2241 iv_names = {}
2242
2243 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2244 for idx, dev in enumerate(disks):
2245 if idx not in self.disks:
2246 continue
2247
2248 self.lu.LogInfo("Adding storage on %s for disk/%d",
2249 self.cfg.GetNodeName(node_uuid), idx)
2250
2251 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2252 names = _GenerateUniqueNames(self.lu, lv_names)
2253
2254 (data_disk, meta_disk) = dev.children
2255 vg_data = data_disk.logical_id[0]
2256 lv_data = objects.Disk(dev_type=constants.DT_PLAIN, size=dev.size,
2257 logical_id=(vg_data, names[0]),
2258 params=data_disk.params)
2259 vg_meta = meta_disk.logical_id[0]
2260 lv_meta = objects.Disk(dev_type=constants.DT_PLAIN,
2261 size=constants.DRBD_META_SIZE,
2262 logical_id=(vg_meta, names[1]),
2263 params=meta_disk.params)
2264
2265 new_lvs = [lv_data, lv_meta]
2266 old_lvs = [child.Copy() for child in dev.children]
2267 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2268 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg, node_uuid)
2269
2270 # we pass force_create=True to force the LVM creation
2271 for new_lv in new_lvs:
2272 try:
2273 _CreateBlockDevInner(self.lu, node_uuid, self.instance, new_lv, True,
2274 GetInstanceInfoText(self.instance), False,
2275 excl_stor)
2276 except errors.DeviceCreationError, e:
2277 raise errors.OpExecError("Can't create block device: %s" % e.message)
2278
2279 return iv_names
2280
2281 def _CheckDevices(self, node_uuid, iv_names):
2282 for name, (dev, _, _) in iv_names.iteritems():
2283 result = _BlockdevFind(self, node_uuid, dev, self.instance)
2284
2285 msg = result.fail_msg
2286 if msg or not result.payload:
2287 if not msg:
2288 msg = "disk not found"
2289 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2290 (name, msg))
2291
2292 if result.payload.is_degraded:
2293 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2294
2295 def _RemoveOldStorage(self, node_uuid, iv_names):
2296 for name, (_, old_lvs, _) in iv_names.iteritems():
2297 self.lu.LogInfo("Remove logical volumes for %s", name)
2298
2299 for lv in old_lvs:
2300 msg = self.rpc.call_blockdev_remove(node_uuid, (lv, self.instance)) \
2301 .fail_msg
2302 if msg:
2303 self.lu.LogWarning("Can't remove old LV: %s", msg,
2304 hint="remove unused LVs manually")
2305
2306 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2307 """Replace a disk on the primary or secondary for DRBD 8.
2308
2309 The algorithm for replace is quite complicated:
2310
2311 1. for each disk to be replaced:
2312
2313 1. create new LVs on the target node with unique names
2314 1. detach old LVs from the drbd device
2315 1. rename old LVs to name_replaced.<time_t>
2316 1. rename new LVs to old LVs
2317 1. attach the new LVs (with the old names now) to the drbd device
2318
2319 1. wait for sync across all devices
2320
2321 1. for each modified disk:
2322
2323       1. remove old LVs (which have the name name_replaced.<time_t>)
2324
2325 Failures are not very well handled.
2326
2327 """
2328 steps_total = 6
2329
2330 # Step: check device activation
2331 self.lu.LogStep(1, steps_total, "Check device existence")
2332 self._CheckDisksExistence([self.other_node_uuid, self.target_node_uuid])
2333 self._CheckVolumeGroup([self.target_node_uuid, self.other_node_uuid])
2334
2335 # Step: check other node consistency
2336 self.lu.LogStep(2, steps_total, "Check peer consistency")
2337 self._CheckDisksConsistency(
2338 self.other_node_uuid, self.other_node_uuid == self.instance.primary_node,
2339 False)
2340
2341 # Step: create new storage
2342 self.lu.LogStep(3, steps_total, "Allocate new storage")
2343 iv_names = self._CreateNewStorage(self.target_node_uuid)
2344
2345 # Step: for each lv, detach+rename*2+attach
2346 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2347 for dev, old_lvs, new_lvs in iv_names.itervalues():
2348 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2349
2350 result = self.rpc.call_blockdev_removechildren(self.target_node_uuid,
2351 (dev, self.instance),
2352 (old_lvs, self.instance))
2353 result.Raise("Can't detach drbd from local storage on node"
2354 " %s for device %s" %
2355 (self.cfg.GetNodeName(self.target_node_uuid), dev.iv_name))
2356 #dev.children = []
2357 #cfg.Update(instance)
2358
2359 # ok, we created the new LVs, so now we know we have the needed
2360 # storage; as such, we proceed on the target node to rename
2361       # old_lv to old_lv_replaced-<time_t> and new_lv to old_lv; we rename LVs
2362 # using the assumption that logical_id == unique_id on that node
2363
2364 # FIXME(iustin): use a better name for the replaced LVs
2365 temp_suffix = int(time.time())
2366 ren_fn = lambda d, suff: (d.logical_id[0],
2367 d.logical_id[1] + "_replaced-%s" % suff)
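           # e.g. ("xenvg", "<id>.disk0_data") would be renamed to
           # ("xenvg", "<id>.disk0_data_replaced-1400000000")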
2368
2369 # Build the rename list based on what LVs exist on the node
2370 rename_old_to_new = []
2371 for to_ren in old_lvs:
2372 result = self.rpc.call_blockdev_find(self.target_node_uuid,
2373 (to_ren, self.instance))
2374 if not result.fail_msg and result.payload:
2375 # device exists
2376 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2377
2378 self.lu.LogInfo("Renaming the old LVs on the target node")
2379 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2380 rename_old_to_new)
2381 result.Raise("Can't rename old LVs on node %s" %
2382 self.cfg.GetNodeName(self.target_node_uuid))
2383
2384 # Now we rename the new LVs to the old LVs
2385 self.lu.LogInfo("Renaming the new LVs on the target node")
2386 rename_new_to_old = [(new, old.logical_id)
2387 for old, new in zip(old_lvs, new_lvs)]
2388 result = self.rpc.call_blockdev_rename(self.target_node_uuid,
2389 rename_new_to_old)
2390 result.Raise("Can't rename new LVs on node %s" %
2391 self.cfg.GetNodeName(self.target_node_uuid))
2392
2393 # Intermediate steps of in memory modifications
2394 for old, new in zip(old_lvs, new_lvs):
2395 new.logical_id = old.logical_id
2396
2397 # We need to modify old_lvs so that removal later removes the
2398 # right LVs, not the newly added ones; note that old_lvs is a
2399 # copy here
2400 for disk in old_lvs:
2401 disk.logical_id = ren_fn(disk, temp_suffix)
2402
2403 # Now that the new lvs have the old name, we can add them to the device
2404 self.lu.LogInfo("Adding new mirror component on %s",
2405 self.cfg.GetNodeName(self.target_node_uuid))
2406 result = self.rpc.call_blockdev_addchildren(self.target_node_uuid,
2407 (dev, self.instance),
2408 (new_lvs, self.instance))
2409 msg = result.fail_msg
2410 if msg:
2411 for new_lv in new_lvs:
2412 msg2 = self.rpc.call_blockdev_remove(self.target_node_uuid,
2413 (new_lv, self.instance)).fail_msg
2414 if msg2:
2415 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2416                              hint=("manually clean up the unused logical"
2417                                    " volumes"))
2418 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2419
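     # the two remaining steps are "remove old storage" and "sync devices";
     # with early_release the removal happens before the sync, otherwise after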
2420 cstep = itertools.count(5)
2421
2422 if self.early_release:
2423 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2424 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2425 # TODO: Check if releasing locks early still makes sense
2426 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2427 else:
2428 # Release all resource locks except those used by the instance
2429 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2430 keep=self.node_secondary_ip.keys())
2431
2432 # Release all node locks while waiting for sync
2433 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2434
2435 # TODO: Can the instance lock be downgraded here? Take the optional disk
2436 # shutdown in the caller into consideration.
2437
2438 # Wait for sync
2439 # This can fail as the old devices are degraded and _WaitForSync
2440 # does a combined result over all disks, so we don't check its return value
2441 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2442 WaitForSync(self.lu, self.instance)
2443
2444 # Check all devices manually
2445 self._CheckDevices(self.instance.primary_node, iv_names)
2446
2447 # Step: remove old storage
2448 if not self.early_release:
2449 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2450 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2451
2452 def _ExecDrbd8Secondary(self, feedback_fn):
2453 """Replace the secondary node for DRBD 8.
2454
2455 The algorithm for replace is quite complicated:
2456 - for all disks of the instance:
2457       - create new LVs on the new node with the same names
2458       - shut down the drbd device on the old secondary
2459 - disconnect the drbd network on the primary
2460 - create the drbd device on the new secondary
2461 - network attach the drbd on the primary, using an artifice:
2462 the drbd code for Attach() will connect to the network if it
2463 finds a device which is connected to the good local disks but
2464 not network enabled
2465 - wait for sync across all devices
2466 - remove all disks from the old secondary
2467
2468 Failures are not very well handled.
2469
2470 """
2471 steps_total = 6
2472
2473 pnode = self.instance.primary_node
2474
2475 # Step: check device activation
2476 self.lu.LogStep(1, steps_total, "Check device existence")
2477 self._CheckDisksExistence([self.instance.primary_node])
2478 self._CheckVolumeGroup([self.instance.primary_node])
2479
2480 # Step: check other node consistency
2481 self.lu.LogStep(2, steps_total, "Check peer consistency")
2482 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2483
2484 # Step: create new storage
2485 self.lu.LogStep(3, steps_total, "Allocate new storage")
2486 disks = AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2487 excl_stor = IsExclusiveStorageEnabledNodeUuid(self.lu.cfg,
2488 self.new_node_uuid)
2489 for idx, dev in enumerate(disks):
2490 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2491 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2492 # we pass force_create=True to force LVM creation
2493 for new_lv in dev.children:
2494 try:
2495 _CreateBlockDevInner(self.lu, self.new_node_uuid, self.instance,
2496 new_lv, True, GetInstanceInfoText(self.instance),
2497 False, excl_stor)
2498 except errors.DeviceCreationError, e:
2499 raise errors.OpExecError("Can't create block device: %s" % e.message)
2500
2501     # Step 4: drbd minors and drbd setup changes
2502 # after this, we must manually remove the drbd minors on both the
2503 # error and the success paths
2504 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2505 minors = self.cfg.AllocateDRBDMinor([self.new_node_uuid
2506 for _ in self.instance.disks],
2507 self.instance.uuid)
2508 logging.debug("Allocated minors %r", minors)
2509
2510 iv_names = {}
2511 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2512       self.lu.LogInfo("Activating a new drbd on %s for disk/%d" %
2513 (self.cfg.GetNodeName(self.new_node_uuid), idx))
2514 # create new devices on new_node; note that we create two IDs:
2515 # one without port, so the drbd will be activated without
2516 # networking information on the new node at this stage, and one
2517 # with network, for the latter activation in step 4
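           # the existing DRBD8 logical_id is
           # (nodeA, nodeB, port, minorA, minorB, secret)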
2518 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2519 if self.instance.primary_node == o_node1:
2520 p_minor = o_minor1
2521 else:
2522 assert self.instance.primary_node == o_node2, "Three-node instance?"
2523 p_minor = o_minor2
2524
2525 new_alone_id = (self.instance.primary_node, self.new_node_uuid, None,
2526 p_minor, new_minor, o_secret)
2527 new_net_id = (self.instance.primary_node, self.new_node_uuid, o_port,
2528 p_minor, new_minor, o_secret)
2529
2530 iv_names[idx] = (dev, dev.children, new_net_id)
2531 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2532 new_net_id)
2533 new_drbd = objects.Disk(dev_type=constants.DT_DRBD8,
2534 logical_id=new_alone_id,
2535 children=dev.children,
2536 size=dev.size,
2537 params={})
2538 (anno_new_drbd,) = AnnotateDiskParams(self.instance, [new_drbd],
2539 self.cfg)
2540 try:
2541 CreateSingleBlockDev(self.lu, self.new_node_uuid, self.instance,
2542 anno_new_drbd,
2543 GetInstanceInfoText(self.instance), False,
2544 excl_stor)
2545 except errors.GenericError:
2546 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2547 raise
2548
2549 # We have new devices, shutdown the drbd on the old secondary
2550 for idx, dev in enumerate(self.instance.disks):
2551 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2552 msg = self.rpc.call_blockdev_shutdown(self.target_node_uuid,
2553 (dev, self.instance)).fail_msg
2554 if msg:
2555         self.lu.LogWarning("Failed to shut down drbd for disk/%d on old"
2556                            " node: %s" % (idx, msg),
2557 hint=("Please cleanup this device manually as"
2558 " soon as possible"))
2559
2560 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2561 result = self.rpc.call_drbd_disconnect_net(
2562 [pnode], (self.instance.disks, self.instance))[pnode]
2563
2564 msg = result.fail_msg
2565 if msg:
2566 # detaches didn't succeed (unlikely)
2567 self.cfg.ReleaseDRBDMinors(self.instance.uuid)
2568 raise errors.OpExecError("Can't detach the disks from the network on"
2569 " old node: %s" % (msg,))
2570
2571 # if we managed to detach at least one, we update all the disks of
2572 # the instance to point to the new secondary
2573 self.lu.LogInfo("Updating instance configuration")
2574 for dev, _, new_logical_id in iv_names.itervalues():
2575 dev.logical_id = new_logical_id
2576
2577 self.cfg.Update(self.instance, feedback_fn)
2578
2579 # Release all node locks (the configuration has been updated)
2580 ReleaseLocks(self.lu, locking.LEVEL_NODE)
2581
2582 # and now perform the drbd attach
2583 self.lu.LogInfo("Attaching primary drbds to new secondary"
2584 " (standalone => connected)")
2585 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2586 self.new_node_uuid],
2587 (self.instance.disks, self.instance),
2588 self.instance.name,
2589 False)
2590 for to_node, to_result in result.items():
2591 msg = to_result.fail_msg
2592 if msg:
2593 raise errors.OpExecError(
2594 "Can't attach drbd disks on node %s: %s (please do a gnt-instance "
2595 "info %s to see the status of disks)" %
2596 (self.cfg.GetNodeName(to_node), msg, self.instance.name))
2597
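     # as in _ExecDrbd8DiskOnly, the order of the two remaining steps depends
     # on whether early_release was requested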
2598 cstep = itertools.count(5)
2599
2600 if self.early_release:
2601 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2602 self._RemoveOldStorage(self.target_node_uuid, iv_names)
2603 # TODO: Check if releasing locks early still makes sense
2604 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2605 else:
2606 # Release all resource locks except those used by the instance
2607 ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2608 keep=self.node_secondary_ip.keys())
2609
2610 # TODO: Can the instance lock be downgraded here? Take the optional disk
2611 # shutdown in the caller into consideration.
2612
2613 # Wait for sync
2614 # This can fail as the old devices are degraded and _WaitForSync
2615 # does a combined result over all disks, so we don't check its return value
2616 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2617 WaitForSync(self.lu, self.instance)
2618
2619 # Check all devices manually
2620 self._CheckDevices(self.instance.primary_node, iv_names)
2621
2622 # Step: remove old storage
2623 if not self.early_release:
2624 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2625 self._RemoveOldStorage(self.target_node_uuid, iv_names)