lib/cmdlib/instance_storage.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with storage of instances."""
23
24 import itertools
25 import logging
26 import os
27 import time
28
29 from ganeti import compat
30 from ganeti import constants
31 from ganeti import errors
32 from ganeti import ht
33 from ganeti import locking
34 from ganeti.masterd import iallocator
35 from ganeti import objects
36 from ganeti import utils
37 from ganeti import opcodes
38 from ganeti import rpc
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, Tasklet
40 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_NOT_RUNNING, \
41 _AnnotateDiskParams, _CheckIAllocatorOrNode, _ExpandNodeName, \
42 _CheckNodeOnline, _CheckInstanceNodeGroups, _CheckInstanceState, \
43 _IsExclusiveStorageEnabledNode, _FindFaultyInstanceDisks
44 from ganeti.cmdlib.instance_utils import _GetInstanceInfoText, \
45 _CopyLockList, _ReleaseLocks, _CheckNodeVmCapable, \
46 _BuildInstanceHookEnvByObject, _CheckNodeNotDrained, _CheckTargetNodeIPolicy
47
48 import ganeti.masterd.instance
49
50
51 _DISK_TEMPLATE_NAME_PREFIX = {
52 constants.DT_PLAIN: "",
53 constants.DT_RBD: ".rbd",
54 constants.DT_EXT: ".ext",
55 }
56
57
58 _DISK_TEMPLATE_DEVICE_TYPE = {
59 constants.DT_PLAIN: constants.LD_LV,
60 constants.DT_FILE: constants.LD_FILE,
61 constants.DT_SHARED_FILE: constants.LD_FILE,
62 constants.DT_BLOCK: constants.LD_BLOCKDEV,
63 constants.DT_RBD: constants.LD_RBD,
64 constants.DT_EXT: constants.LD_EXT,
65 }
66
67
68 def _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
69 excl_stor):
70 """Create a single block device on a given node.
71
72 This will not recurse over children of the device, so they must be
73 created in advance.
74
75 @param lu: the lu on whose behalf we execute
76 @param node: the node on which to create the device
77 @type instance: L{objects.Instance}
78 @param instance: the instance which owns the device
79 @type device: L{objects.Disk}
80 @param device: the device to create
81 @param info: the extra 'metadata' we should attach to the device
82 (this will be represented as a LVM tag)
83 @type force_open: boolean
84   @param force_open: this parameter will be passed to the
85       L{backend.BlockdevCreate} function where it specifies
86       whether we run on primary or not, and it affects both
87       the child assembly and the device's own Open() execution
88 @type excl_stor: boolean
89 @param excl_stor: Whether exclusive_storage is active for the node
90
91 """
92 lu.cfg.SetDiskID(device, node)
93 result = lu.rpc.call_blockdev_create(node, device, device.size,
94 instance.name, force_open, info,
95 excl_stor)
96 result.Raise("Can't create block device %s on"
97 " node %s for instance %s" % (device, node, instance.name))
98 if device.physical_id is None:
99 device.physical_id = result.payload
100
101
102 def _CreateBlockDevInner(lu, node, instance, device, force_create,
103 info, force_open, excl_stor):
104 """Create a tree of block devices on a given node.
105
106 If this device type has to be created on secondaries, create it and
107 all its children.
108
109 If not, just recurse to children keeping the same 'force' value.
110
111 @attention: The device has to be annotated already.
112
113 @param lu: the lu on whose behalf we execute
114 @param node: the node on which to create the device
115 @type instance: L{objects.Instance}
116 @param instance: the instance which owns the device
117 @type device: L{objects.Disk}
118 @param device: the device to create
119 @type force_create: boolean
120 @param force_create: whether to force creation of this device; this
121       will be changed to True whenever we find a device which has
122 CreateOnSecondary() attribute
123 @param info: the extra 'metadata' we should attach to the device
124 (this will be represented as a LVM tag)
125 @type force_open: boolean
126   @param force_open: this parameter will be passed to the
127       L{backend.BlockdevCreate} function where it specifies
128       whether we run on primary or not, and it affects both
129       the child assembly and the device's own Open() execution
130 @type excl_stor: boolean
131 @param excl_stor: Whether exclusive_storage is active for the node
132
133 @return: list of created devices
134 """
135 created_devices = []
136 try:
137 if device.CreateOnSecondary():
138 force_create = True
139
140 if device.children:
141 for child in device.children:
142 devs = _CreateBlockDevInner(lu, node, instance, child, force_create,
143 info, force_open, excl_stor)
144 created_devices.extend(devs)
145
146 if not force_create:
147 return created_devices
148
149 _CreateSingleBlockDev(lu, node, instance, device, info, force_open,
150 excl_stor)
151 # The device has been completely created, so there is no point in keeping
152 # its subdevices in the list. We just add the device itself instead.
153 created_devices = [(node, device)]
154 return created_devices
155
156 except errors.DeviceCreationError, e:
157 e.created_devices.extend(created_devices)
158 raise e
159 except errors.OpExecError, e:
160 raise errors.DeviceCreationError(str(e), created_devices)
161
162
163 def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
164 """Whether exclusive_storage is in effect for the given node.
165
166 @type cfg: L{config.ConfigWriter}
167 @param cfg: The cluster configuration
168 @type nodename: string
169 @param nodename: The node
170 @rtype: bool
171 @return: The effective value of exclusive_storage
172 @raise errors.OpPrereqError: if no node exists with the given name
173
174 """
175 ni = cfg.GetNodeInfo(nodename)
176 if ni is None:
177 raise errors.OpPrereqError("Invalid node name %s" % nodename,
178 errors.ECODE_NOENT)
179 return _IsExclusiveStorageEnabledNode(cfg, ni)
180
181
182 def _CreateBlockDev(lu, node, instance, device, force_create, info,
183 force_open):
184 """Wrapper around L{_CreateBlockDevInner}.
185
186 This method annotates the root device first.
187
188 """
189 (disk,) = _AnnotateDiskParams(instance, [device], lu.cfg)
190 excl_stor = _IsExclusiveStorageEnabledNodeName(lu.cfg, node)
191 return _CreateBlockDevInner(lu, node, instance, disk, force_create, info,
192 force_open, excl_stor)
193
194
195 def _CreateDisks(lu, instance, to_skip=None, target_node=None):
196 """Create all disks for an instance.
197
198 This abstracts away some work from AddInstance.
199
200 @type lu: L{LogicalUnit}
201 @param lu: the logical unit on whose behalf we execute
202 @type instance: L{objects.Instance}
203 @param instance: the instance whose disks we should create
204 @type to_skip: list
205 @param to_skip: list of indices to skip
206 @type target_node: string
207 @param target_node: if passed, overrides the target node for creation
208   @raise errors.OpExecError: if a disk could not be created; in that case
209       the disks created so far are removed again where possible
210
211 """
212 info = _GetInstanceInfoText(instance)
213 if target_node is None:
214 pnode = instance.primary_node
215 all_nodes = instance.all_nodes
216 else:
217 pnode = target_node
218 all_nodes = [pnode]
219
220 if instance.disk_template in constants.DTS_FILEBASED:
221 file_storage_dir = os.path.dirname(instance.disks[0].logical_id[1])
222 result = lu.rpc.call_file_storage_dir_create(pnode, file_storage_dir)
223
224 result.Raise("Failed to create directory '%s' on"
225 " node %s" % (file_storage_dir, pnode))
226
227 disks_created = []
228 # Note: this needs to be kept in sync with adding of disks in
229 # LUInstanceSetParams
230 for idx, device in enumerate(instance.disks):
231 if to_skip and idx in to_skip:
232 continue
233 logging.info("Creating disk %s for instance '%s'", idx, instance.name)
234 #HARDCODE
235 for node in all_nodes:
236 f_create = node == pnode
237 try:
238 _CreateBlockDev(lu, node, instance, device, f_create, info, f_create)
239 disks_created.append((node, device))
240 except errors.OpExecError:
241 logging.warning("Creating disk %s for instance '%s' failed",
242 idx, instance.name)
243 except errors.DeviceCreationError, e:
244 logging.warning("Creating disk %s for instance '%s' failed",
245 idx, instance.name)
246 disks_created.extend(e.created_devices)
247 for (node, disk) in disks_created:
248 lu.cfg.SetDiskID(disk, node)
249 result = lu.rpc.call_blockdev_remove(node, disk)
250 if result.fail_msg:
251 logging.warning("Failed to remove newly-created disk %s on node %s:"
252 " %s", device, node, result.fail_msg)
253 raise errors.OpExecError(e.message)
254
255
256 def _ComputeDiskSizePerVG(disk_template, disks):
257 """Compute disk size requirements in the volume group
258
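  The result maps each volume group name to the total space (in MiB) needed
  in that group. Example (illustrative): two DRBD8 disks of 1024 MiB and
  512 MiB, both in volume group "xenvg" (a placeholder name), yield
  {"xenvg": 1024 + 512 + 2 * constants.DRBD_META_SIZE}.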
259 """
260 def _compute(disks, payload):
261 """Universal algorithm.
262
263 """
264 vgs = {}
265 for disk in disks:
266 vgs[disk[constants.IDISK_VG]] = \
267         vgs.get(disk[constants.IDISK_VG], 0) + disk[constants.IDISK_SIZE] + payload
268
269 return vgs
270
271   # Required free disk space per volume group, depending on the disk template
272 req_size_dict = {
273 constants.DT_DISKLESS: {},
274 constants.DT_PLAIN: _compute(disks, 0),
275     # 128 MiB are added for DRBD metadata for each disk
276 constants.DT_DRBD8: _compute(disks, constants.DRBD_META_SIZE),
277 constants.DT_FILE: {},
278 constants.DT_SHARED_FILE: {},
279 }
280
281 if disk_template not in req_size_dict:
282 raise errors.ProgrammerError("Disk template '%s' size requirement"
283 " is unknown" % disk_template)
284
285 return req_size_dict[disk_template]
286
287
288 def _ComputeDisks(op, default_vg):
289 """Computes the instance disks.
290
291 @param op: The instance opcode
292 @param default_vg: The default_vg to assume
293
294 @return: The computed disks
295
296 """
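  Example (illustrative): a disk specification of {constants.IDISK_SIZE: 1024}
  with default_vg "xenvg" is normalized to
  {constants.IDISK_SIZE: 1024, constants.IDISK_MODE: constants.DISK_RDWR,
  constants.IDISK_VG: "xenvg", constants.IDISK_NAME: None}; "xenvg" is only
  a placeholder volume group name.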
297 disks = []
298 for disk in op.disks:
299 mode = disk.get(constants.IDISK_MODE, constants.DISK_RDWR)
300 if mode not in constants.DISK_ACCESS_SET:
301 raise errors.OpPrereqError("Invalid disk access mode '%s'" %
302 mode, errors.ECODE_INVAL)
303 size = disk.get(constants.IDISK_SIZE, None)
304 if size is None:
305 raise errors.OpPrereqError("Missing disk size", errors.ECODE_INVAL)
306 try:
307 size = int(size)
308 except (TypeError, ValueError):
309 raise errors.OpPrereqError("Invalid disk size '%s'" % size,
310 errors.ECODE_INVAL)
311
312 ext_provider = disk.get(constants.IDISK_PROVIDER, None)
313 if ext_provider and op.disk_template != constants.DT_EXT:
314 raise errors.OpPrereqError("The '%s' option is only valid for the %s"
315 " disk template, not %s" %
316 (constants.IDISK_PROVIDER, constants.DT_EXT,
317 op.disk_template), errors.ECODE_INVAL)
318
319 data_vg = disk.get(constants.IDISK_VG, default_vg)
320 name = disk.get(constants.IDISK_NAME, None)
321 if name is not None and name.lower() == constants.VALUE_NONE:
322 name = None
323 new_disk = {
324 constants.IDISK_SIZE: size,
325 constants.IDISK_MODE: mode,
326 constants.IDISK_VG: data_vg,
327 constants.IDISK_NAME: name,
328 }
329
330 if constants.IDISK_METAVG in disk:
331 new_disk[constants.IDISK_METAVG] = disk[constants.IDISK_METAVG]
332 if constants.IDISK_ADOPT in disk:
333 new_disk[constants.IDISK_ADOPT] = disk[constants.IDISK_ADOPT]
334
335 # For extstorage, demand the `provider' option and add any
336 # additional parameters (ext-params) to the dict
337 if op.disk_template == constants.DT_EXT:
338 if ext_provider:
339 new_disk[constants.IDISK_PROVIDER] = ext_provider
340 for key in disk:
341 if key not in constants.IDISK_PARAMS:
342 new_disk[key] = disk[key]
343 else:
344 raise errors.OpPrereqError("Missing provider for template '%s'" %
345 constants.DT_EXT, errors.ECODE_INVAL)
346
347 disks.append(new_disk)
348
349 return disks
350
351
352 def _CheckRADOSFreeSpace():
353 """Compute disk size requirements inside the RADOS cluster.
354
355 """
356 # For the RADOS cluster we assume there is always enough space.
357 pass
358
359
360 def _GenerateDRBD8Branch(lu, primary, secondary, size, vgnames, names,
361 iv_name, p_minor, s_minor):
362 """Generate a drbd8 device complete with its children.
363
364 """
365 assert len(vgnames) == len(names) == 2
366 port = lu.cfg.AllocatePort()
367 shared_secret = lu.cfg.GenerateDRBDSecret(lu.proc.GetECId())
368
369 dev_data = objects.Disk(dev_type=constants.LD_LV, size=size,
370 logical_id=(vgnames[0], names[0]),
371 params={})
372 dev_data.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
373 dev_meta = objects.Disk(dev_type=constants.LD_LV,
374 size=constants.DRBD_META_SIZE,
375 logical_id=(vgnames[1], names[1]),
376 params={})
377 dev_meta.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
378 drbd_dev = objects.Disk(dev_type=constants.LD_DRBD8, size=size,
379 logical_id=(primary, secondary, port,
380 p_minor, s_minor,
381 shared_secret),
382 children=[dev_data, dev_meta],
383 iv_name=iv_name, params={})
384 drbd_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
385 return drbd_dev
386
387
388 def _GenerateDiskTemplate(
389 lu, template_name, instance_name, primary_node, secondary_nodes,
390 disk_info, file_storage_dir, file_driver, base_index,
391 feedback_fn, full_disk_params, _req_file_storage=opcodes.RequireFileStorage,
392 _req_shr_file_storage=opcodes.RequireSharedFileStorage):
393 """Generate the entire disk layout for a given template type.
394
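  Example (illustrative): for template_name constants.DT_PLAIN with a single
  1024 MiB disk and base_index 0, the result is a single L{objects.Disk}
  with dev_type constants.LD_LV, iv_name "disk/0", size 1024 and a
  logical_id of (vg, "<uuid>.disk0"), where vg defaults to the cluster
  volume group and <uuid> is a freshly generated unique ID.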
395 """
396 vgname = lu.cfg.GetVGName()
397 disk_count = len(disk_info)
398 disks = []
399
400 if template_name == constants.DT_DISKLESS:
401 pass
402 elif template_name == constants.DT_DRBD8:
403 if len(secondary_nodes) != 1:
404 raise errors.ProgrammerError("Wrong template configuration")
405 remote_node = secondary_nodes[0]
406 minors = lu.cfg.AllocateDRBDMinor(
407 [primary_node, remote_node] * len(disk_info), instance_name)
408
409 (drbd_params, _, _) = objects.Disk.ComputeLDParams(template_name,
410 full_disk_params)
411 drbd_default_metavg = drbd_params[constants.LDP_DEFAULT_METAVG]
412
413 names = []
414 for lv_prefix in _GenerateUniqueNames(lu, [".disk%d" % (base_index + i)
415 for i in range(disk_count)]):
416 names.append(lv_prefix + "_data")
417 names.append(lv_prefix + "_meta")
418 for idx, disk in enumerate(disk_info):
419 disk_index = idx + base_index
420 data_vg = disk.get(constants.IDISK_VG, vgname)
421 meta_vg = disk.get(constants.IDISK_METAVG, drbd_default_metavg)
422 disk_dev = _GenerateDRBD8Branch(lu, primary_node, remote_node,
423 disk[constants.IDISK_SIZE],
424 [data_vg, meta_vg],
425 names[idx * 2:idx * 2 + 2],
426 "disk/%d" % disk_index,
427 minors[idx * 2], minors[idx * 2 + 1])
428 disk_dev.mode = disk[constants.IDISK_MODE]
429 disk_dev.name = disk.get(constants.IDISK_NAME, None)
430 disks.append(disk_dev)
431 else:
432 if secondary_nodes:
433 raise errors.ProgrammerError("Wrong template configuration")
434
435 if template_name == constants.DT_FILE:
436 _req_file_storage()
437 elif template_name == constants.DT_SHARED_FILE:
438 _req_shr_file_storage()
439
440 name_prefix = _DISK_TEMPLATE_NAME_PREFIX.get(template_name, None)
441 if name_prefix is None:
442 names = None
443 else:
444 names = _GenerateUniqueNames(lu, ["%s.disk%s" %
445 (name_prefix, base_index + i)
446 for i in range(disk_count)])
447
448 if template_name == constants.DT_PLAIN:
449
450 def logical_id_fn(idx, _, disk):
451 vg = disk.get(constants.IDISK_VG, vgname)
452 return (vg, names[idx])
453
454 elif template_name in (constants.DT_FILE, constants.DT_SHARED_FILE):
455 logical_id_fn = \
456 lambda _, disk_index, disk: (file_driver,
457 "%s/disk%d" % (file_storage_dir,
458 disk_index))
459 elif template_name == constants.DT_BLOCK:
460 logical_id_fn = \
461 lambda idx, disk_index, disk: (constants.BLOCKDEV_DRIVER_MANUAL,
462 disk[constants.IDISK_ADOPT])
463 elif template_name == constants.DT_RBD:
464 logical_id_fn = lambda idx, _, disk: ("rbd", names[idx])
465 elif template_name == constants.DT_EXT:
466 def logical_id_fn(idx, _, disk):
467 provider = disk.get(constants.IDISK_PROVIDER, None)
468 if provider is None:
469         raise errors.ProgrammerError("Disk template is %s, but '%s' is"
470                                      " not found" % (constants.DT_EXT,
471                                                      constants.IDISK_PROVIDER))
472 return (provider, names[idx])
473 else:
474 raise errors.ProgrammerError("Unknown disk template '%s'" % template_name)
475
476 dev_type = _DISK_TEMPLATE_DEVICE_TYPE[template_name]
477
478 for idx, disk in enumerate(disk_info):
479 params = {}
480 # Only for the Ext template add disk_info to params
481 if template_name == constants.DT_EXT:
482 params[constants.IDISK_PROVIDER] = disk[constants.IDISK_PROVIDER]
483 for key in disk:
484 if key not in constants.IDISK_PARAMS:
485 params[key] = disk[key]
486 disk_index = idx + base_index
487 size = disk[constants.IDISK_SIZE]
488 feedback_fn("* disk %s, size %s" %
489 (disk_index, utils.FormatUnit(size, "h")))
490 disk_dev = objects.Disk(dev_type=dev_type, size=size,
491 logical_id=logical_id_fn(idx, disk_index, disk),
492 iv_name="disk/%d" % disk_index,
493 mode=disk[constants.IDISK_MODE],
494 params=params)
495 disk_dev.name = disk.get(constants.IDISK_NAME, None)
496 disk_dev.uuid = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
497 disks.append(disk_dev)
498
499 return disks
500
501
502 class LUInstanceRecreateDisks(LogicalUnit):
503 """Recreate an instance's missing disks.
504
505 """
506 HPATH = "instance-recreate-disks"
507 HTYPE = constants.HTYPE_INSTANCE
508 REQ_BGL = False
509
510 _MODIFYABLE = compat.UniqueFrozenset([
511 constants.IDISK_SIZE,
512 constants.IDISK_MODE,
513 ])
514
515 # New or changed disk parameters may have different semantics
516 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
517 constants.IDISK_ADOPT,
518
519     # TODO: Implement support for changing VG while recreating
520 constants.IDISK_VG,
521 constants.IDISK_METAVG,
522 constants.IDISK_PROVIDER,
523 constants.IDISK_NAME,
524 ]))
525
526 def _RunAllocator(self):
527 """Run the allocator based on input opcode.
528
529 """
530 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
531
532 # FIXME
533 # The allocator should actually run in "relocate" mode, but current
534 # allocators don't support relocating all the nodes of an instance at
535 # the same time. As a workaround we use "allocate" mode, but this is
536 # suboptimal for two reasons:
537 # - The instance name passed to the allocator is present in the list of
538 # existing instances, so there could be a conflict within the
539 # internal structures of the allocator. This doesn't happen with the
540 # current allocators, but it's a liability.
541 # - The allocator counts the resources used by the instance twice: once
542 # because the instance exists already, and once because it tries to
543 # allocate a new instance.
544 # The allocator could choose some of the nodes on which the instance is
545 # running, but that's not a problem. If the instance nodes are broken,
546     # they should already be marked as drained or offline, and hence
547 # skipped by the allocator. If instance disks have been lost for other
548 # reasons, then recreating the disks on the same nodes should be fine.
549 disk_template = self.instance.disk_template
550 spindle_use = be_full[constants.BE_SPINDLE_USE]
551 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
552 disk_template=disk_template,
553 tags=list(self.instance.GetTags()),
554 os=self.instance.os,
555 nics=[{}],
556 vcpus=be_full[constants.BE_VCPUS],
557 memory=be_full[constants.BE_MAXMEM],
558 spindle_use=spindle_use,
559 disks=[{constants.IDISK_SIZE: d.size,
560 constants.IDISK_MODE: d.mode}
561 for d in self.instance.disks],
562 hypervisor=self.instance.hypervisor,
563 node_whitelist=None)
564 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
565
566 ial.Run(self.op.iallocator)
567
568 assert req.RequiredNodes() == len(self.instance.all_nodes)
569
570 if not ial.success:
571 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
572 " %s" % (self.op.iallocator, ial.info),
573 errors.ECODE_NORES)
574
575 self.op.nodes = ial.result
576 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
577 self.op.instance_name, self.op.iallocator,
578 utils.CommaJoin(ial.result))
579
580 def CheckArguments(self):
581 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
582 # Normalize and convert deprecated list of disk indices
583 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
584
585 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
586 if duplicates:
587 raise errors.OpPrereqError("Some disks have been specified more than"
588 " once: %s" % utils.CommaJoin(duplicates),
589 errors.ECODE_INVAL)
590
591 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
592 # when neither iallocator nor nodes are specified
593 if self.op.iallocator or self.op.nodes:
594 _CheckIAllocatorOrNode(self, "iallocator", "nodes")
595
596 for (idx, params) in self.op.disks:
597 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
598 unsupported = frozenset(params.keys()) - self._MODIFYABLE
599 if unsupported:
600 raise errors.OpPrereqError("Parameters for disk %s try to change"
601 " unmodifyable parameter(s): %s" %
602 (idx, utils.CommaJoin(unsupported)),
603 errors.ECODE_INVAL)
604
605 def ExpandNames(self):
606 self._ExpandAndLockInstance()
607 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
608
609 if self.op.nodes:
610 self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
611 self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
612 else:
613 self.needed_locks[locking.LEVEL_NODE] = []
614 if self.op.iallocator:
615 # iallocator will select a new node in the same group
616 self.needed_locks[locking.LEVEL_NODEGROUP] = []
617 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
618
619 self.needed_locks[locking.LEVEL_NODE_RES] = []
620
621 def DeclareLocks(self, level):
622 if level == locking.LEVEL_NODEGROUP:
623 assert self.op.iallocator is not None
624 assert not self.op.nodes
625 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
626 self.share_locks[locking.LEVEL_NODEGROUP] = 1
627 # Lock the primary group used by the instance optimistically; this
628 # requires going via the node before it's locked, requiring
629 # verification later on
630 self.needed_locks[locking.LEVEL_NODEGROUP] = \
631 self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
632
633 elif level == locking.LEVEL_NODE:
634 # If an allocator is used, then we lock all the nodes in the current
635 # instance group, as we don't know yet which ones will be selected;
636 # if we replace the nodes without using an allocator, locks are
637 # already declared in ExpandNames; otherwise, we need to lock all the
638 # instance nodes for disk re-creation
639 if self.op.iallocator:
640 assert not self.op.nodes
641 assert not self.needed_locks[locking.LEVEL_NODE]
642 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
643
644 # Lock member nodes of the group of the primary node
645 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
646 self.needed_locks[locking.LEVEL_NODE].extend(
647 self.cfg.GetNodeGroup(group_uuid).members)
648
649 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
650 elif not self.op.nodes:
651 self._LockInstancesNodes(primary_only=False)
652 elif level == locking.LEVEL_NODE_RES:
653 # Copy node locks
654 self.needed_locks[locking.LEVEL_NODE_RES] = \
655 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
656
657 def BuildHooksEnv(self):
658 """Build hooks env.
659
660 This runs on master, primary and secondary nodes of the instance.
661
662 """
663 return _BuildInstanceHookEnvByObject(self, self.instance)
664
665 def BuildHooksNodes(self):
666 """Build hooks nodes.
667
668 """
669 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
670 return (nl, nl)
671
672 def CheckPrereq(self):
673 """Check prerequisites.
674
675 This checks that the instance is in the cluster and is not running.
676
677 """
678 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
679 assert instance is not None, \
680 "Cannot retrieve locked instance %s" % self.op.instance_name
681 if self.op.nodes:
682 if len(self.op.nodes) != len(instance.all_nodes):
683 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
684 " %d replacement nodes were specified" %
685 (instance.name, len(instance.all_nodes),
686 len(self.op.nodes)),
687 errors.ECODE_INVAL)
688 assert instance.disk_template != constants.DT_DRBD8 or \
689 len(self.op.nodes) == 2
690 assert instance.disk_template != constants.DT_PLAIN or \
691 len(self.op.nodes) == 1
692 primary_node = self.op.nodes[0]
693 else:
694 primary_node = instance.primary_node
695 if not self.op.iallocator:
696 _CheckNodeOnline(self, primary_node)
697
698 if instance.disk_template == constants.DT_DISKLESS:
699 raise errors.OpPrereqError("Instance '%s' has no disks" %
700 self.op.instance_name, errors.ECODE_INVAL)
701
702 # Verify if node group locks are still correct
703 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
704 if owned_groups:
705 # Node group locks are acquired only for the primary node (and only
706 # when the allocator is used)
707 _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
708 primary_only=True)
709
710 # if we replace nodes *and* the old primary is offline, we don't
711 # check the instance state
712 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
713 if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
714 _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
715 msg="cannot recreate disks")
716
717 if self.op.disks:
718 self.disks = dict(self.op.disks)
719 else:
720 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
721
722 maxidx = max(self.disks.keys())
723 if maxidx >= len(instance.disks):
724 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
725 errors.ECODE_INVAL)
726
727 if ((self.op.nodes or self.op.iallocator) and
728 sorted(self.disks.keys()) != range(len(instance.disks))):
729 raise errors.OpPrereqError("Can't recreate disks partially and"
730 " change the nodes at the same time",
731 errors.ECODE_INVAL)
732
733 self.instance = instance
734
735 if self.op.iallocator:
736 self._RunAllocator()
737 # Release unneeded node and node resource locks
738 _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
739 _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
740 _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
741
742 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
743
744 def Exec(self, feedback_fn):
745 """Recreate the disks.
746
747 """
748 instance = self.instance
749
750 assert (self.owned_locks(locking.LEVEL_NODE) ==
751 self.owned_locks(locking.LEVEL_NODE_RES))
752
753 to_skip = []
754 mods = [] # keeps track of needed changes
755
756 for idx, disk in enumerate(instance.disks):
757 try:
758 changes = self.disks[idx]
759 except KeyError:
760 # Disk should not be recreated
761 to_skip.append(idx)
762 continue
763
764 # update secondaries for disks, if needed
765 if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
766 # need to update the nodes and minors
767 assert len(self.op.nodes) == 2
768 assert len(disk.logical_id) == 6 # otherwise disk internals
769 # have changed
770 (_, _, old_port, _, _, old_secret) = disk.logical_id
771 new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
772 new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
773 new_minors[0], new_minors[1], old_secret)
774 assert len(disk.logical_id) == len(new_id)
775 else:
776 new_id = None
777
778 mods.append((idx, new_id, changes))
779
780 # now that we have passed all asserts above, we can apply the mods
781 # in a single run (to avoid partial changes)
782 for idx, new_id, changes in mods:
783 disk = instance.disks[idx]
784 if new_id is not None:
785 assert disk.dev_type == constants.LD_DRBD8
786 disk.logical_id = new_id
787 if changes:
788 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
789 mode=changes.get(constants.IDISK_MODE, None))
790
791 # change primary node, if needed
792 if self.op.nodes:
793 instance.primary_node = self.op.nodes[0]
794 self.LogWarning("Changing the instance's nodes, you will have to"
795 " remove any disks left on the older nodes manually")
796
797 if self.op.nodes:
798 self.cfg.Update(instance, feedback_fn)
799
800 # All touched nodes must be locked
801 mylocks = self.owned_locks(locking.LEVEL_NODE)
802 assert mylocks.issuperset(frozenset(instance.all_nodes))
803 _CreateDisks(self, instance, to_skip=to_skip)
804
805
806 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
807 """Checks if nodes have enough free disk space in the specified VG.
808
809 This function checks if all given nodes have the needed amount of
810 free disk. In case any node has less disk or we cannot get the
811 information from the node, this function raises an OpPrereqError
812 exception.
813
814 @type lu: C{LogicalUnit}
815 @param lu: a logical unit from which we get configuration data
816 @type nodenames: C{list}
817 @param nodenames: the list of node names to check
818 @type vg: C{str}
819 @param vg: the volume group to check
820 @type requested: C{int}
821 @param requested: the amount of disk in MiB to check for
822 @raise errors.OpPrereqError: if the node doesn't have enough disk,
823 or we cannot check the node
824
825 """
826 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
827 nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
828 for node in nodenames:
829 info = nodeinfo[node]
830 info.Raise("Cannot get current information from node %s" % node,
831 prereq=True, ecode=errors.ECODE_ENVIRON)
832 (_, (vg_info, ), _) = info.payload
833 vg_free = vg_info.get("vg_free", None)
834 if not isinstance(vg_free, int):
835 raise errors.OpPrereqError("Can't compute free disk space on node"
836 " %s for vg %s, result was '%s'" %
837 (node, vg, vg_free), errors.ECODE_ENVIRON)
838 if requested > vg_free:
839 raise errors.OpPrereqError("Not enough disk space on target node %s"
840 " vg %s: required %d MiB, available %d MiB" %
841 (node, vg, requested, vg_free),
842 errors.ECODE_NORES)
843
844
845 def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
846 """Checks if nodes have enough free disk space in all the VGs.
847
848 This function checks if all given nodes have the needed amount of
849 free disk. In case any node has less disk or we cannot get the
850 information from the node, this function raises an OpPrereqError
851 exception.
852
853 @type lu: C{LogicalUnit}
854 @param lu: a logical unit from which we get configuration data
855 @type nodenames: C{list}
856 @param nodenames: the list of node names to check
857 @type req_sizes: C{dict}
858 @param req_sizes: the hash of vg and corresponding amount of disk in
859 MiB to check for
860 @raise errors.OpPrereqError: if the node doesn't have enough disk,
861 or we cannot check the node
862
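  Example (illustrative): req_sizes={"xenvg": 2048, "metavg": 512} checks
  that every node in nodenames has at least 2048 MiB free in volume group
  "xenvg" and 512 MiB free in "metavg"; the volume group names here are
  only placeholders.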
863 """
864 for vg, req_size in req_sizes.items():
865 _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
866
867
868 def _DiskSizeInBytesToMebibytes(lu, size):
869 """Converts a disk size in bytes to mebibytes.
870
871 Warns and rounds up if the size isn't an even multiple of 1 MiB.
872
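  Example (illustrative): a size of 1073741825 bytes (1 GiB plus one byte)
  is converted to 1025 MiB, and a warning is logged that the trailing
  1048575 bytes will not be wiped.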
873 """
874 (mib, remainder) = divmod(size, 1024 * 1024)
875
876 if remainder != 0:
877 lu.LogWarning("Disk size is not an even multiple of 1 MiB; rounding up"
878 " to not overwrite existing data (%s bytes will not be"
879 " wiped)", (1024 * 1024) - remainder)
880 mib += 1
881
882 return mib
883
884
885 def _CalcEta(time_taken, written, total_size):
886 """Calculates the ETA based on size written and total size.
887
888 @param time_taken: The time taken so far
889 @param written: amount written so far
890 @param total_size: The total size of data to be written
891 @return: The remaining time in seconds
892
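  Example (illustrative, amounts e.g. in MiB): with 30720 MiB written out of
  122880 MiB after 600 seconds, the average rate is 600 / 30720 seconds per
  MiB, so the estimated remaining time is (122880 - 30720) * (600 / 30720)
  = 1800 seconds.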
893 """
894 avg_time = time_taken / float(written)
895 return (total_size - written) * avg_time
896
897
898 def _WipeDisks(lu, instance, disks=None):
899 """Wipes instance disks.
900
901 @type lu: L{LogicalUnit}
902 @param lu: the logical unit on whose behalf we execute
903 @type instance: L{objects.Instance}
904   @param instance: the instance whose disks we should wipe
905 @type disks: None or list of tuple of (number, L{objects.Disk}, number)
906 @param disks: Disk details; tuple contains disk index, disk object and the
907 start offset
908
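  Example (illustrative): to wipe only the space added to the first disk by
  a grow operation, pass disks=[(0, instance.disks[0], old_size_in_mib)],
  where old_size_in_mib (a hypothetical name) is the disk size in MiB
  before the grow.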
909 """
910 node = instance.primary_node
911
912 if disks is None:
913 disks = [(idx, disk, 0)
914 for (idx, disk) in enumerate(instance.disks)]
915
916 for (_, device, _) in disks:
917 lu.cfg.SetDiskID(device, node)
918
919 logging.info("Pausing synchronization of disks of instance '%s'",
920 instance.name)
921 result = lu.rpc.call_blockdev_pause_resume_sync(node,
922 (map(compat.snd, disks),
923 instance),
924 True)
925 result.Raise("Failed to pause disk synchronization on node '%s'" % node)
926
927 for idx, success in enumerate(result.payload):
928 if not success:
929       logging.warning("Pausing synchronization of disk %s of instance '%s'"
930 " failed", idx, instance.name)
931
932 try:
933 for (idx, device, offset) in disks:
934 # The wipe size is MIN_WIPE_CHUNK_PERCENT % of the instance disk but
935 # MAX_WIPE_CHUNK at max. Truncating to integer to avoid rounding errors.
936 wipe_chunk_size = \
937 int(min(constants.MAX_WIPE_CHUNK,
938 device.size / 100.0 * constants.MIN_WIPE_CHUNK_PERCENT))
939
940 size = device.size
941 last_output = 0
942 start_time = time.time()
943
944 if offset == 0:
945 info_text = ""
946 else:
947 info_text = (" (from %s to %s)" %
948 (utils.FormatUnit(offset, "h"),
949 utils.FormatUnit(size, "h")))
950
951 lu.LogInfo("* Wiping disk %s%s", idx, info_text)
952
953 logging.info("Wiping disk %d for instance %s on node %s using"
954 " chunk size %s", idx, instance.name, node, wipe_chunk_size)
955
956 while offset < size:
957 wipe_size = min(wipe_chunk_size, size - offset)
958
959 logging.debug("Wiping disk %d, offset %s, chunk %s",
960 idx, offset, wipe_size)
961
962 result = lu.rpc.call_blockdev_wipe(node, (device, instance), offset,
963 wipe_size)
964 result.Raise("Could not wipe disk %d at offset %d for size %d" %
965 (idx, offset, wipe_size))
966
967 now = time.time()
968 offset += wipe_size
969 if now - last_output >= 60:
970 eta = _CalcEta(now - start_time, offset, size)
971 lu.LogInfo(" - done: %.1f%% ETA: %s",
972 offset / float(size) * 100, utils.FormatSeconds(eta))
973 last_output = now
974 finally:
975 logging.info("Resuming synchronization of disks for instance '%s'",
976 instance.name)
977
978 result = lu.rpc.call_blockdev_pause_resume_sync(node,
979 (map(compat.snd, disks),
980 instance),
981 False)
982
983 if result.fail_msg:
984 lu.LogWarning("Failed to resume disk synchronization on node '%s': %s",
985 node, result.fail_msg)
986 else:
987 for idx, success in enumerate(result.payload):
988 if not success:
989 lu.LogWarning("Resuming synchronization of disk %s of instance '%s'"
990 " failed", idx, instance.name)
991
992
993 def _ExpandCheckDisks(instance, disks):
994 """Return the instance disks selected by the disks list
995
996 @type disks: list of L{objects.Disk} or None
997 @param disks: selected disks
998 @rtype: list of L{objects.Disk}
999 @return: selected instance disks to act on
1000
1001 """
1002 if disks is None:
1003 return instance.disks
1004 else:
1005 if not set(disks).issubset(instance.disks):
1006 raise errors.ProgrammerError("Can only act on disks belonging to the"
1007 " target instance")
1008 return disks
1009
1010
1011 def _WaitForSync(lu, instance, disks=None, oneshot=False):
1012 """Sleep and poll for an instance's disk to sync.
1013
1014 """
1015   if not instance.disks or (disks is not None and not disks):
1016 return True
1017
1018 disks = _ExpandCheckDisks(instance, disks)
1019
1020 if not oneshot:
1021 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
1022
1023 node = instance.primary_node
1024
1025 for dev in disks:
1026 lu.cfg.SetDiskID(dev, node)
1027
1028 # TODO: Convert to utils.Retry
1029
1030 retries = 0
1031 degr_retries = 10 # in seconds, as we sleep 1 second each time
1032 while True:
1033 max_time = 0
1034 done = True
1035 cumul_degraded = False
1036 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
1037 msg = rstats.fail_msg
1038 if msg:
1039 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
1040 retries += 1
1041 if retries >= 10:
1042 raise errors.RemoteError("Can't contact node %s for mirror data,"
1043 " aborting." % node)
1044 time.sleep(6)
1045 continue
1046 rstats = rstats.payload
1047 retries = 0
1048 for i, mstat in enumerate(rstats):
1049 if mstat is None:
1050 lu.LogWarning("Can't compute data for node %s/%s",
1051 node, disks[i].iv_name)
1052 continue
1053
1054 cumul_degraded = (cumul_degraded or
1055 (mstat.is_degraded and mstat.sync_percent is None))
1056 if mstat.sync_percent is not None:
1057 done = False
1058 if mstat.estimated_time is not None:
1059 rem_time = ("%s remaining (estimated)" %
1060 utils.FormatSeconds(mstat.estimated_time))
1061 max_time = mstat.estimated_time
1062 else:
1063 rem_time = "no time estimate"
1064 lu.LogInfo("- device %s: %5.2f%% done, %s",
1065 disks[i].iv_name, mstat.sync_percent, rem_time)
1066
1067 # if we're done but degraded, let's do a few small retries, to
1068 # make sure we see a stable and not transient situation; therefore
1069 # we force restart of the loop
1070 if (done or oneshot) and cumul_degraded and degr_retries > 0:
1071 logging.info("Degraded disks found, %d retries left", degr_retries)
1072 degr_retries -= 1
1073 time.sleep(1)
1074 continue
1075
1076 if done or oneshot:
1077 break
1078
1079 time.sleep(min(60, max_time))
1080
1081 if done:
1082 lu.LogInfo("Instance %s's disks are in sync", instance.name)
1083
1084 return not cumul_degraded
1085
1086
1087 def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1088 """Shutdown block devices of an instance.
1089
1090 This does the shutdown on all nodes of the instance.
1091
1092   If ignore_primary is true, errors on the primary node are
1093   ignored.
1094
1095 """
1096 all_result = True
1097 disks = _ExpandCheckDisks(instance, disks)
1098
1099 for disk in disks:
1100 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1101 lu.cfg.SetDiskID(top_disk, node)
1102 result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1103 msg = result.fail_msg
1104 if msg:
1105 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1106 disk.iv_name, node, msg)
1107 if ((node == instance.primary_node and not ignore_primary) or
1108 (node != instance.primary_node and not result.offline)):
1109 all_result = False
1110 return all_result
1111
1112
1113 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1114 """Shutdown block devices of an instance.
1115
1116 This function checks if an instance is running, before calling
1117 _ShutdownInstanceDisks.
1118
1119 """
1120 _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1121 _ShutdownInstanceDisks(lu, instance, disks=disks)
1122
1123
1124 def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1125 ignore_size=False):
1126 """Prepare the block devices for an instance.
1127
1128 This sets up the block devices on all nodes.
1129
1130 @type lu: L{LogicalUnit}
1131 @param lu: the logical unit on whose behalf we execute
1132 @type instance: L{objects.Instance}
1133   @param instance: the instance whose disks we assemble
1134 @type disks: list of L{objects.Disk} or None
1135 @param disks: which disks to assemble (or all, if None)
1136 @type ignore_secondaries: boolean
1137 @param ignore_secondaries: if true, errors on secondary nodes
1138 won't result in an error return from the function
1139 @type ignore_size: boolean
1140 @param ignore_size: if true, the current known size of the disk
1141 will not be used during the disk activation, useful for cases
1142 when the size is wrong
1143   @return: a (disks_ok, device_info) tuple; disks_ok is False if the
1144       operation failed, and device_info lists (host, instance_visible_name,
1145       node_visible_name) entries mapping node devices to instance devices
1146
1147 """
1148 device_info = []
1149 disks_ok = True
1150 iname = instance.name
1151 disks = _ExpandCheckDisks(instance, disks)
1152
1153   # With the two-pass mechanism we try to reduce the window of
1154   # opportunity for the race condition of switching DRBD to primary
1155   # before handshaking has occurred, but we do not eliminate it
1156
1157 # The proper fix would be to wait (with some limits) until the
1158 # connection has been made and drbd transitions from WFConnection
1159 # into any other network-connected state (Connected, SyncTarget,
1160 # SyncSource, etc.)
1161
1162 # 1st pass, assemble on all nodes in secondary mode
1163 for idx, inst_disk in enumerate(disks):
1164 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1165 if ignore_size:
1166 node_disk = node_disk.Copy()
1167 node_disk.UnsetSize()
1168 lu.cfg.SetDiskID(node_disk, node)
1169 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1170 False, idx)
1171 msg = result.fail_msg
1172 if msg:
1173 is_offline_secondary = (node in instance.secondary_nodes and
1174 result.offline)
1175 lu.LogWarning("Could not prepare block device %s on node %s"
1176 " (is_primary=False, pass=1): %s",
1177 inst_disk.iv_name, node, msg)
1178 if not (ignore_secondaries or is_offline_secondary):
1179 disks_ok = False
1180
1181 # FIXME: race condition on drbd migration to primary
1182
1183 # 2nd pass, do only the primary node
1184 for idx, inst_disk in enumerate(disks):
1185 dev_path = None
1186
1187 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1188 if node != instance.primary_node:
1189 continue
1190 if ignore_size:
1191 node_disk = node_disk.Copy()
1192 node_disk.UnsetSize()
1193 lu.cfg.SetDiskID(node_disk, node)
1194 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1195 True, idx)
1196 msg = result.fail_msg
1197 if msg:
1198 lu.LogWarning("Could not prepare block device %s on node %s"
1199 " (is_primary=True, pass=2): %s",
1200 inst_disk.iv_name, node, msg)
1201 disks_ok = False
1202 else:
1203 dev_path = result.payload
1204
1205 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1206
1207 # leave the disks configured for the primary node
1208 # this is a workaround that would be fixed better by
1209 # improving the logical/physical id handling
1210 for disk in disks:
1211 lu.cfg.SetDiskID(disk, instance.primary_node)
1212
1213 return disks_ok, device_info
1214
1215
1216 def _StartInstanceDisks(lu, instance, force):
1217 """Start the disks of an instance.
1218
1219 """
1220 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
1221 ignore_secondaries=force)
1222 if not disks_ok:
1223 _ShutdownInstanceDisks(lu, instance)
1224 if force is not None and not force:
1225 lu.LogWarning("",
1226 hint=("If the message above refers to a secondary node,"
1227 " you can retry the operation using '--force'"))
1228 raise errors.OpExecError("Disk consistency error")
1229
1230
1231 class LUInstanceGrowDisk(LogicalUnit):
1232 """Grow a disk of an instance.
1233
1234 """
1235 HPATH = "disk-grow"
1236 HTYPE = constants.HTYPE_INSTANCE
1237 REQ_BGL = False
1238
1239 def ExpandNames(self):
1240 self._ExpandAndLockInstance()
1241 self.needed_locks[locking.LEVEL_NODE] = []
1242 self.needed_locks[locking.LEVEL_NODE_RES] = []
1243 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1244 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1245
1246 def DeclareLocks(self, level):
1247 if level == locking.LEVEL_NODE:
1248 self._LockInstancesNodes()
1249 elif level == locking.LEVEL_NODE_RES:
1250 # Copy node locks
1251 self.needed_locks[locking.LEVEL_NODE_RES] = \
1252 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
1253
1254 def BuildHooksEnv(self):
1255 """Build hooks env.
1256
1257 This runs on the master, the primary and all the secondaries.
1258
1259 """
1260 env = {
1261 "DISK": self.op.disk,
1262 "AMOUNT": self.op.amount,
1263 "ABSOLUTE": self.op.absolute,
1264 }
1265 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1266 return env
1267
1268 def BuildHooksNodes(self):
1269 """Build hooks nodes.
1270
1271 """
1272 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1273 return (nl, nl)
1274
1275 def CheckPrereq(self):
1276 """Check prerequisites.
1277
1278 This checks that the instance is in the cluster.
1279
1280 """
1281 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1282 assert instance is not None, \
1283 "Cannot retrieve locked instance %s" % self.op.instance_name
1284 nodenames = list(instance.all_nodes)
1285 for node in nodenames:
1286 _CheckNodeOnline(self, node)
1287
1288 self.instance = instance
1289
1290 if instance.disk_template not in constants.DTS_GROWABLE:
1291 raise errors.OpPrereqError("Instance's disk layout does not support"
1292 " growing", errors.ECODE_INVAL)
1293
1294 self.disk = instance.FindDisk(self.op.disk)
1295
1296 if self.op.absolute:
1297 self.target = self.op.amount
1298 self.delta = self.target - self.disk.size
1299 if self.delta < 0:
1300 raise errors.OpPrereqError("Requested size (%s) is smaller than "
1301 "current disk size (%s)" %
1302 (utils.FormatUnit(self.target, "h"),
1303 utils.FormatUnit(self.disk.size, "h")),
1304 errors.ECODE_STATE)
1305 else:
1306 self.delta = self.op.amount
1307 self.target = self.disk.size + self.delta
1308 if self.delta < 0:
1309 raise errors.OpPrereqError("Requested increment (%s) is negative" %
1310 utils.FormatUnit(self.delta, "h"),
1311 errors.ECODE_INVAL)
1312
1313 self._CheckDiskSpace(nodenames, self.disk.ComputeGrowth(self.delta))
1314
1315 def _CheckDiskSpace(self, nodenames, req_vgspace):
1316 template = self.instance.disk_template
1317     if template not in constants.DTS_NO_FREE_SPACE_CHECK:
1318       # TODO: check the free disk space for file-based disks, once that
1319       # feature is supported
1320 nodes = map(self.cfg.GetNodeInfo, nodenames)
1321 es_nodes = filter(lambda n: _IsExclusiveStorageEnabledNode(self.cfg, n),
1322 nodes)
1323 if es_nodes:
1324         # With exclusive storage we need to do something smarter than just looking
1325 # at free space; for now, let's simply abort the operation.
1326 raise errors.OpPrereqError("Cannot grow disks when exclusive_storage"
1327 " is enabled", errors.ECODE_STATE)
1328 _CheckNodesFreeDiskPerVG(self, nodenames, req_vgspace)
1329
1330 def Exec(self, feedback_fn):
1331 """Execute disk grow.
1332
1333 """
1334 instance = self.instance
1335 disk = self.disk
1336
1337 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1338 assert (self.owned_locks(locking.LEVEL_NODE) ==
1339 self.owned_locks(locking.LEVEL_NODE_RES))
1340
1341 wipe_disks = self.cfg.GetClusterInfo().prealloc_wipe_disks
1342
1343 disks_ok, _ = _AssembleInstanceDisks(self, self.instance, disks=[disk])
1344 if not disks_ok:
1345 raise errors.OpExecError("Cannot activate block device to grow")
1346
1347 feedback_fn("Growing disk %s of instance '%s' by %s to %s" %
1348 (self.op.disk, instance.name,
1349 utils.FormatUnit(self.delta, "h"),
1350 utils.FormatUnit(self.target, "h")))
1351
1352 # First run all grow ops in dry-run mode
1353 for node in instance.all_nodes:
1354 self.cfg.SetDiskID(disk, node)
1355 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1356 True, True)
1357 result.Raise("Dry-run grow request failed to node %s" % node)
1358
1359 if wipe_disks:
1360 # Get disk size from primary node for wiping
1361 result = self.rpc.call_blockdev_getsize(instance.primary_node, [disk])
1362 result.Raise("Failed to retrieve disk size from node '%s'" %
1363 instance.primary_node)
1364
1365 (disk_size_in_bytes, ) = result.payload
1366
1367 if disk_size_in_bytes is None:
1368 raise errors.OpExecError("Failed to retrieve disk size from primary"
1369 " node '%s'" % instance.primary_node)
1370
1371 old_disk_size = _DiskSizeInBytesToMebibytes(self, disk_size_in_bytes)
1372
1373 assert old_disk_size >= disk.size, \
1374 ("Retrieved disk size too small (got %s, should be at least %s)" %
1375 (old_disk_size, disk.size))
1376 else:
1377 old_disk_size = None
1378
1379 # We know that (as far as we can test) operations across different
1380 # nodes will succeed, time to run it for real on the backing storage
1381 for node in instance.all_nodes:
1382 self.cfg.SetDiskID(disk, node)
1383 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1384 False, True)
1385 result.Raise("Grow request failed to node %s" % node)
1386
1387 # And now execute it for logical storage, on the primary node
1388 node = instance.primary_node
1389 self.cfg.SetDiskID(disk, node)
1390 result = self.rpc.call_blockdev_grow(node, (disk, instance), self.delta,
1391 False, False)
1392 result.Raise("Grow request failed to node %s" % node)
1393
1394 disk.RecordGrow(self.delta)
1395 self.cfg.Update(instance, feedback_fn)
1396
1397 # Changes have been recorded, release node lock
1398 _ReleaseLocks(self, locking.LEVEL_NODE)
1399
1400 # Downgrade lock while waiting for sync
1401 self.glm.downgrade(locking.LEVEL_INSTANCE)
1402
1403 assert wipe_disks ^ (old_disk_size is None)
1404
1405 if wipe_disks:
1406 assert instance.disks[self.op.disk] == disk
1407
1408 # Wipe newly added disk space
1409 _WipeDisks(self, instance,
1410 disks=[(self.op.disk, disk, old_disk_size)])
1411
1412 if self.op.wait_for_sync:
1413 disk_abort = not _WaitForSync(self, instance, disks=[disk])
1414 if disk_abort:
1415 self.LogWarning("Disk syncing has not returned a good status; check"
1416 " the instance")
1417 if instance.admin_state != constants.ADMINST_UP:
1418 _SafeShutdownInstanceDisks(self, instance, disks=[disk])
1419 elif instance.admin_state != constants.ADMINST_UP:
1420 self.LogWarning("Not shutting down the disk even if the instance is"
1421 " not supposed to be running because no wait for"
1422 " sync mode was requested")
1423
1424 assert self.owned_locks(locking.LEVEL_NODE_RES)
1425 assert set([instance.name]) == self.owned_locks(locking.LEVEL_INSTANCE)
1426
1427
1428 class LUInstanceReplaceDisks(LogicalUnit):
1429 """Replace the disks of an instance.
1430
1431 """
1432 HPATH = "mirrors-replace"
1433 HTYPE = constants.HTYPE_INSTANCE
1434 REQ_BGL = False
1435
1436 def CheckArguments(self):
1437 """Check arguments.
1438
1439 """
1440 remote_node = self.op.remote_node
1441 ialloc = self.op.iallocator
1442 if self.op.mode == constants.REPLACE_DISK_CHG:
1443 if remote_node is None and ialloc is None:
1444 raise errors.OpPrereqError("When changing the secondary either an"
1445 " iallocator script must be used or the"
1446 " new node given", errors.ECODE_INVAL)
1447 else:
1448 _CheckIAllocatorOrNode(self, "iallocator", "remote_node")
1449
1450 elif remote_node is not None or ialloc is not None:
1451 # Not replacing the secondary
1452 raise errors.OpPrereqError("The iallocator and new node options can"
1453 " only be used when changing the"
1454 " secondary node", errors.ECODE_INVAL)
1455
1456 def ExpandNames(self):
1457 self._ExpandAndLockInstance()
1458
1459 assert locking.LEVEL_NODE not in self.needed_locks
1460 assert locking.LEVEL_NODE_RES not in self.needed_locks
1461 assert locking.LEVEL_NODEGROUP not in self.needed_locks
1462
1463 assert self.op.iallocator is None or self.op.remote_node is None, \
1464 "Conflicting options"
1465
1466 if self.op.remote_node is not None:
1467 self.op.remote_node = _ExpandNodeName(self.cfg, self.op.remote_node)
1468
1469 # Warning: do not remove the locking of the new secondary here
1470 # unless DRBD8.AddChildren is changed to work in parallel;
1471 # currently it doesn't since parallel invocations of
1472 # FindUnusedMinor will conflict
1473 self.needed_locks[locking.LEVEL_NODE] = [self.op.remote_node]
1474 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
1475 else:
1476 self.needed_locks[locking.LEVEL_NODE] = []
1477 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1478
1479 if self.op.iallocator is not None:
1480 # iallocator will select a new node in the same group
1481 self.needed_locks[locking.LEVEL_NODEGROUP] = []
1482 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1483
1484 self.needed_locks[locking.LEVEL_NODE_RES] = []
1485
1486 self.replacer = TLReplaceDisks(self, self.op.instance_name, self.op.mode,
1487 self.op.iallocator, self.op.remote_node,
1488 self.op.disks, self.op.early_release,
1489 self.op.ignore_ipolicy)
1490
1491 self.tasklets = [self.replacer]
1492
1493 def DeclareLocks(self, level):
1494 if level == locking.LEVEL_NODEGROUP:
1495 assert self.op.remote_node is None
1496 assert self.op.iallocator is not None
1497 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
1498
1499 self.share_locks[locking.LEVEL_NODEGROUP] = 1
1500 # Lock all groups used by instance optimistically; this requires going
1501 # via the node before it's locked, requiring verification later on
1502 self.needed_locks[locking.LEVEL_NODEGROUP] = \
1503 self.cfg.GetInstanceNodeGroups(self.op.instance_name)
1504
1505 elif level == locking.LEVEL_NODE:
1506 if self.op.iallocator is not None:
1507 assert self.op.remote_node is None
1508 assert not self.needed_locks[locking.LEVEL_NODE]
1509 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
1510
1511 # Lock member nodes of all locked groups
1512 self.needed_locks[locking.LEVEL_NODE] = \
1513 [node_name
1514 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1515 for node_name in self.cfg.GetNodeGroup(group_uuid).members]
1516 else:
1517 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1518
1519 self._LockInstancesNodes()
1520
1521 elif level == locking.LEVEL_NODE_RES:
1522 # Reuse node locks
1523 self.needed_locks[locking.LEVEL_NODE_RES] = \
1524 self.needed_locks[locking.LEVEL_NODE]
1525
1526 def BuildHooksEnv(self):
1527 """Build hooks env.
1528
1529 This runs on the master, the primary and all the secondaries.
1530
1531 """
1532 instance = self.replacer.instance
1533 env = {
1534 "MODE": self.op.mode,
1535 "NEW_SECONDARY": self.op.remote_node,
1536 "OLD_SECONDARY": instance.secondary_nodes[0],
1537 }
1538 env.update(_BuildInstanceHookEnvByObject(self, instance))
1539 return env
1540
1541 def BuildHooksNodes(self):
1542 """Build hooks nodes.
1543
1544 """
1545 instance = self.replacer.instance
1546 nl = [
1547 self.cfg.GetMasterNode(),
1548 instance.primary_node,
1549 ]
1550 if self.op.remote_node is not None:
1551 nl.append(self.op.remote_node)
1552 return nl, nl
1553
1554 def CheckPrereq(self):
1555 """Check prerequisites.
1556
1557 """
1558 assert (self.glm.is_owned(locking.LEVEL_NODEGROUP) or
1559 self.op.iallocator is None)
1560
1561 # Verify if node group locks are still correct
1562 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
1563 if owned_groups:
1564 _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups)
1565
1566 return LogicalUnit.CheckPrereq(self)
1567
1568
1569 class LUInstanceActivateDisks(NoHooksLU):
1570 """Bring up an instance's disks.
1571
1572 """
1573 REQ_BGL = False
1574
1575 def ExpandNames(self):
1576 self._ExpandAndLockInstance()
1577 self.needed_locks[locking.LEVEL_NODE] = []
1578 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1579
1580 def DeclareLocks(self, level):
1581 if level == locking.LEVEL_NODE:
1582 self._LockInstancesNodes()
1583
1584 def CheckPrereq(self):
1585 """Check prerequisites.
1586
1587 This checks that the instance is in the cluster.
1588
1589 """
1590 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1591 assert self.instance is not None, \
1592 "Cannot retrieve locked instance %s" % self.op.instance_name
1593 _CheckNodeOnline(self, self.instance.primary_node)
1594
1595 def Exec(self, feedback_fn):
1596 """Activate the disks.
1597
1598 """
1599 disks_ok, disks_info = \
1600 _AssembleInstanceDisks(self, self.instance,
1601 ignore_size=self.op.ignore_size)
1602 if not disks_ok:
1603 raise errors.OpExecError("Cannot activate block devices")
1604
1605 if self.op.wait_for_sync:
1606 if not _WaitForSync(self, self.instance):
1607 raise errors.OpExecError("Some disks of the instance are degraded!")
1608
1609 return disks_info
1610
1611
1612 class LUInstanceDeactivateDisks(NoHooksLU):
1613   """Shut down an instance's disks.
1614
1615 """
1616 REQ_BGL = False
1617
1618 def ExpandNames(self):
1619 self._ExpandAndLockInstance()
1620 self.needed_locks[locking.LEVEL_NODE] = []
1621 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1622
1623 def DeclareLocks(self, level):
1624 if level == locking.LEVEL_NODE:
1625 self._LockInstancesNodes()
1626
1627 def CheckPrereq(self):
1628 """Check prerequisites.
1629
1630 This checks that the instance is in the cluster.
1631
1632 """
1633 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1634 assert self.instance is not None, \
1635 "Cannot retrieve locked instance %s" % self.op.instance_name
1636
1637 def Exec(self, feedback_fn):
1638     """Deactivate the disks.
1639
1640 """
1641 instance = self.instance
1642 if self.op.force:
1643 _ShutdownInstanceDisks(self, instance)
1644 else:
1645 _SafeShutdownInstanceDisks(self, instance)
1646
1647
1648 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
1649 ldisk=False):
1650 """Check that mirrors are not degraded.
1651
1652 @attention: The device has to be annotated already.
1653
1654 The ldisk parameter, if True, will change the test from the
1655 is_degraded attribute (which represents overall non-ok status for
1656 the device(s)) to the ldisk (representing the local storage status).
1657
1658 """
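  # In practice, ldisk=True is used when only this node's local storage state
  # matters (e.g. while its DRBD peer is being replaced), whereas ldisk=False
  # uses the overall is_degraded status of the device.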
1659 lu.cfg.SetDiskID(dev, node)
1660
1661 result = True
1662
1663 if on_primary or dev.AssembleOnSecondary():
1664 rstats = lu.rpc.call_blockdev_find(node, dev)
1665 msg = rstats.fail_msg
1666 if msg:
1667 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
1668 result = False
1669 elif not rstats.payload:
1670 lu.LogWarning("Can't find disk on node %s", node)
1671 result = False
1672 else:
1673 if ldisk:
1674 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
1675 else:
1676 result = result and not rstats.payload.is_degraded
1677
1678 if dev.children:
1679 for child in dev.children:
1680 result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
1681 on_primary)
1682
1683 return result
1684
1685
1686 def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
1687 """Wrapper around L{_CheckDiskConsistencyInner}.
1688
1689 """
1690 (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
1691 return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
1692 ldisk=ldisk)
1693
1694
1695 def _BlockdevFind(lu, node, dev, instance):
1696 """Wrapper around call_blockdev_find to annotate diskparams.
1697
1698 @param lu: A reference to the lu object
1699   @param node: The node to call out to
1700 @param dev: The device to find
1701 @param instance: The instance object the device belongs to
1702   @return: The result of the RPC call
1703
1704 """
1705 (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
1706 return lu.rpc.call_blockdev_find(node, disk)
1707
1708
1709 def _GenerateUniqueNames(lu, exts):
1710   """Generate suitable LV names.
1711
1712   This will generate one unique logical volume name per given extension.
1713
1714 """
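  # Each returned name is the generated unique ID with the extension appended,
  # e.g. roughly "<uuid>.disk0_data" for an ext of ".disk0_data".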
1715 results = []
1716 for val in exts:
1717 new_id = lu.cfg.GenerateUniqueID(lu.proc.GetECId())
1718 results.append("%s%s" % (new_id, val))
1719 return results
1720
1721
1722 class TLReplaceDisks(Tasklet):
1723 """Replaces disks for an instance.
1724
1725 Note: Locking is not within the scope of this class.
1726
1727 """
1728 def __init__(self, lu, instance_name, mode, iallocator_name, remote_node,
1729 disks, early_release, ignore_ipolicy):
1730 """Initializes this class.
1731
1732 """
1733 Tasklet.__init__(self, lu)
1734
1735 # Parameters
1736 self.instance_name = instance_name
1737 self.mode = mode
1738 self.iallocator_name = iallocator_name
1739 self.remote_node = remote_node
1740 self.disks = disks
1741 self.early_release = early_release
1742 self.ignore_ipolicy = ignore_ipolicy
1743
1744 # Runtime data
1745 self.instance = None
1746 self.new_node = None
1747 self.target_node = None
1748 self.other_node = None
1749 self.remote_node_info = None
1750 self.node_secondary_ip = None
1751
1752 @staticmethod
1753 def _RunAllocator(lu, iallocator_name, instance_name, relocate_from):
1754 """Compute a new secondary node using an IAllocator.
1755
1756 """
1757 req = iallocator.IAReqRelocate(name=instance_name,
1758 relocate_from=list(relocate_from))
1759 ial = iallocator.IAllocator(lu.cfg, lu.rpc, req)
1760
1761 ial.Run(iallocator_name)
1762
1763 if not ial.success:
1764 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
1765 " %s" % (iallocator_name, ial.info),
1766 errors.ECODE_NORES)
1767
1768 remote_node_name = ial.result[0]
1769
1770 lu.LogInfo("Selected new secondary for instance '%s': %s",
1771 instance_name, remote_node_name)
1772
1773 return remote_node_name
1774
1775 def _FindFaultyDisks(self, node_name):
1776 """Wrapper for L{_FindFaultyInstanceDisks}.
1777
1778 """
1779 return _FindFaultyInstanceDisks(self.cfg, self.rpc, self.instance,
1780 node_name, True)
1781
1782 def _CheckDisksActivated(self, instance):
1783 """Checks if the instance disks are activated.
1784
1785     @param instance: The instance whose disks to check
1786 @return: True if they are activated, False otherwise
1787
1788 """
1789 nodes = instance.all_nodes
1790
1791 for idx, dev in enumerate(instance.disks):
1792 for node in nodes:
1793 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
1794 self.cfg.SetDiskID(dev, node)
1795
1796 result = _BlockdevFind(self, node, dev, instance)
1797
1798 if result.offline:
1799 continue
1800 elif result.fail_msg or not result.payload:
1801 return False
1802
1803 return True
1804
1805 def CheckPrereq(self):
1806 """Check prerequisites.
1807
1808 This checks that the instance is in the cluster.
1809
1810 """
1811 self.instance = instance = self.cfg.GetInstanceInfo(self.instance_name)
1812 assert instance is not None, \
1813 "Cannot retrieve locked instance %s" % self.instance_name
1814
1815 if instance.disk_template != constants.DT_DRBD8:
1816 raise errors.OpPrereqError("Can only run replace disks for DRBD8-based"
1817 " instances", errors.ECODE_INVAL)
1818
1819 if len(instance.secondary_nodes) != 1:
1820 raise errors.OpPrereqError("The instance has a strange layout,"
1821 " expected one secondary but found %d" %
1822 len(instance.secondary_nodes),
1823 errors.ECODE_FAULT)
1824
1825 instance = self.instance
1826 secondary_node = instance.secondary_nodes[0]
1827
1828 if self.iallocator_name is None:
1829 remote_node = self.remote_node
1830 else:
1831 remote_node = self._RunAllocator(self.lu, self.iallocator_name,
1832 instance.name, instance.secondary_nodes)
1833
1834 if remote_node is None:
1835 self.remote_node_info = None
1836 else:
1837 assert remote_node in self.lu.owned_locks(locking.LEVEL_NODE), \
1838 "Remote node '%s' is not locked" % remote_node
1839
1840 self.remote_node_info = self.cfg.GetNodeInfo(remote_node)
1841 assert self.remote_node_info is not None, \
1842 "Cannot retrieve locked node %s" % remote_node
1843
1844 if remote_node == self.instance.primary_node:
1845 raise errors.OpPrereqError("The specified node is the primary node of"
1846 " the instance", errors.ECODE_INVAL)
1847
1848 if remote_node == secondary_node:
1849 raise errors.OpPrereqError("The specified node is already the"
1850 " secondary node of the instance",
1851 errors.ECODE_INVAL)
1852
1853 if self.disks and self.mode in (constants.REPLACE_DISK_AUTO,
1854 constants.REPLACE_DISK_CHG):
1855 raise errors.OpPrereqError("Cannot specify disks to be replaced",
1856 errors.ECODE_INVAL)
1857
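
    # Roughly, the mode determines where new storage is created:
    #   REPLACE_DISK_AUTO: on whichever node reports faulty disks (if any)
    #   REPLACE_DISK_PRI:  on the primary node (in-place LV replacement)
    #   REPLACE_DISK_SEC:  on the current secondary node (in-place)
    #   REPLACE_DISK_CHG:  on a new secondary (remote_node or iallocator pick)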
1858 if self.mode == constants.REPLACE_DISK_AUTO:
1859 if not self._CheckDisksActivated(instance):
1860 raise errors.OpPrereqError("Please run activate-disks on instance %s"
1861 " first" % self.instance_name,
1862 errors.ECODE_STATE)
1863 faulty_primary = self._FindFaultyDisks(instance.primary_node)
1864 faulty_secondary = self._FindFaultyDisks(secondary_node)
1865
1866 if faulty_primary and faulty_secondary:
1867 raise errors.OpPrereqError("Instance %s has faulty disks on more than"
1868 " one node and can not be repaired"
1869 " automatically" % self.instance_name,
1870 errors.ECODE_STATE)
1871
1872 if faulty_primary:
1873 self.disks = faulty_primary
1874 self.target_node = instance.primary_node
1875 self.other_node = secondary_node
1876 check_nodes = [self.target_node, self.other_node]
1877 elif faulty_secondary:
1878 self.disks = faulty_secondary
1879 self.target_node = secondary_node
1880 self.other_node = instance.primary_node
1881 check_nodes = [self.target_node, self.other_node]
1882 else:
1883 self.disks = []
1884 check_nodes = []
1885
1886 else:
1887 # Non-automatic modes
1888 if self.mode == constants.REPLACE_DISK_PRI:
1889 self.target_node = instance.primary_node
1890 self.other_node = secondary_node
1891 check_nodes = [self.target_node, self.other_node]
1892
1893 elif self.mode == constants.REPLACE_DISK_SEC:
1894 self.target_node = secondary_node
1895 self.other_node = instance.primary_node
1896 check_nodes = [self.target_node, self.other_node]
1897
1898 elif self.mode == constants.REPLACE_DISK_CHG:
1899 self.new_node = remote_node
1900 self.other_node = instance.primary_node
1901 self.target_node = secondary_node
1902 check_nodes = [self.new_node, self.other_node]
1903
1904 _CheckNodeNotDrained(self.lu, remote_node)
1905 _CheckNodeVmCapable(self.lu, remote_node)
1906
1907 old_node_info = self.cfg.GetNodeInfo(secondary_node)
1908 assert old_node_info is not None
1909 if old_node_info.offline and not self.early_release:
1910 # doesn't make sense to delay the release
1911 self.early_release = True
1912 self.lu.LogInfo("Old secondary %s is offline, automatically enabling"
1913 " early-release mode", secondary_node)
1914
1915 else:
1916 raise errors.ProgrammerError("Unhandled disk replace mode (%s)" %
1917 self.mode)
1918
1919 # If not specified all disks should be replaced
1920     # If not specified, all disks should be replaced
1921 self.disks = range(len(self.instance.disks))
1922
1923     # TODO: This is ugly, but right now we can't distinguish between an
1924     # internally submitted opcode and an external one. We should fix that.
1925 if self.remote_node_info:
1926       # We change the node, let's verify it still meets instance policy
1927 new_group_info = self.cfg.GetNodeGroup(self.remote_node_info.group)
1928 cluster = self.cfg.GetClusterInfo()
1929 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1930 new_group_info)
1931 _CheckTargetNodeIPolicy(self, ipolicy, instance, self.remote_node_info,
1932 self.cfg, ignore=self.ignore_ipolicy)
1933
1934 for node in check_nodes:
1935 _CheckNodeOnline(self.lu, node)
1936
1937 touched_nodes = frozenset(node_name for node_name in [self.new_node,
1938 self.other_node,
1939 self.target_node]
1940 if node_name is not None)
1941
1942 # Release unneeded node and node resource locks
1943 _ReleaseLocks(self.lu, locking.LEVEL_NODE, keep=touched_nodes)
1944 _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES, keep=touched_nodes)
1945 _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
1946
1947 # Release any owned node group
1948 _ReleaseLocks(self.lu, locking.LEVEL_NODEGROUP)
1949
1950 # Check whether disks are valid
1951 for disk_idx in self.disks:
1952 instance.FindDisk(disk_idx)
1953
1954 # Get secondary node IP addresses
1955 self.node_secondary_ip = dict((name, node.secondary_ip) for (name, node)
1956 in self.cfg.GetMultiNodeInfo(touched_nodes))
1957
1958 def Exec(self, feedback_fn):
1959 """Execute disk replacement.
1960
1961 This dispatches the disk replacement to the appropriate handler.
1962
1963 """
1964 if __debug__:
1965 # Verify owned locks before starting operation
1966 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE)
1967 assert set(owned_nodes) == set(self.node_secondary_ip), \
1968 ("Incorrect node locks, owning %s, expected %s" %
1969 (owned_nodes, self.node_secondary_ip.keys()))
1970 assert (self.lu.owned_locks(locking.LEVEL_NODE) ==
1971 self.lu.owned_locks(locking.LEVEL_NODE_RES))
1972 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
1973
1974 owned_instances = self.lu.owned_locks(locking.LEVEL_INSTANCE)
1975 assert list(owned_instances) == [self.instance_name], \
1976 "Instance '%s' not locked" % self.instance_name
1977
1978 assert not self.lu.glm.is_owned(locking.LEVEL_NODEGROUP), \
1979 "Should not own any node group lock at this point"
1980
1981 if not self.disks:
1982 feedback_fn("No disks need replacement for instance '%s'" %
1983 self.instance.name)
1984 return
1985
1986 feedback_fn("Replacing disk(s) %s for instance '%s'" %
1987 (utils.CommaJoin(self.disks), self.instance.name))
1988 feedback_fn("Current primary node: %s" % self.instance.primary_node)
1989     feedback_fn("Current secondary node: %s" %
1990 utils.CommaJoin(self.instance.secondary_nodes))
1991
1992 activate_disks = (self.instance.admin_state != constants.ADMINST_UP)
1993
1994 # Activate the instance disks if we're replacing them on a down instance
1995 if activate_disks:
1996 _StartInstanceDisks(self.lu, self.instance, True)
1997
1998 try:
1999 # Should we replace the secondary node?
2000 if self.new_node is not None:
2001 fn = self._ExecDrbd8Secondary
2002 else:
2003 fn = self._ExecDrbd8DiskOnly
2004
2005 result = fn(feedback_fn)
2006 finally:
2007 # Deactivate the instance disks if we're replacing them on a
2008 # down instance
2009 if activate_disks:
2010 _SafeShutdownInstanceDisks(self.lu, self.instance)
2011
2012 assert not self.lu.owned_locks(locking.LEVEL_NODE)
2013
2014 if __debug__:
2015 # Verify owned locks
2016 owned_nodes = self.lu.owned_locks(locking.LEVEL_NODE_RES)
2017 nodes = frozenset(self.node_secondary_ip)
2018 assert ((self.early_release and not owned_nodes) or
2019 (not self.early_release and not (set(owned_nodes) - nodes))), \
2020 ("Not owning the correct locks, early_release=%s, owned=%r,"
2021 " nodes=%r" % (self.early_release, owned_nodes, nodes))
2022
2023 return result
2024
2025 def _CheckVolumeGroup(self, nodes):
2026 self.lu.LogInfo("Checking volume groups")
2027
2028 vgname = self.cfg.GetVGName()
2029
2030 # Make sure volume group exists on all involved nodes
2031 results = self.rpc.call_vg_list(nodes)
2032 if not results:
2033 raise errors.OpExecError("Can't list volume groups on the nodes")
2034
2035 for node in nodes:
2036 res = results[node]
2037 res.Raise("Error checking node %s" % node)
2038 if vgname not in res.payload:
2039 raise errors.OpExecError("Volume group '%s' not found on node %s" %
2040 (vgname, node))
2041
2042 def _CheckDisksExistence(self, nodes):
2043 # Check disk existence
2044 for idx, dev in enumerate(self.instance.disks):
2045 if idx not in self.disks:
2046 continue
2047
2048 for node in nodes:
2049 self.lu.LogInfo("Checking disk/%d on %s", idx, node)
2050 self.cfg.SetDiskID(dev, node)
2051
2052 result = _BlockdevFind(self, node, dev, self.instance)
2053
2054 msg = result.fail_msg
2055 if msg or not result.payload:
2056 if not msg:
2057 msg = "disk not found"
2058 raise errors.OpExecError("Can't find disk/%d on node %s: %s" %
2059 (idx, node, msg))
2060
2061 def _CheckDisksConsistency(self, node_name, on_primary, ldisk):
2062 for idx, dev in enumerate(self.instance.disks):
2063 if idx not in self.disks:
2064 continue
2065
2066 self.lu.LogInfo("Checking disk/%d consistency on node %s" %
2067 (idx, node_name))
2068
2069 if not _CheckDiskConsistency(self.lu, self.instance, dev, node_name,
2070 on_primary, ldisk=ldisk):
2071 raise errors.OpExecError("Node %s has degraded storage, unsafe to"
2072 " replace disks for instance %s" %
2073 (node_name, self.instance.name))
2074
2075 def _CreateNewStorage(self, node_name):
2076 """Create new storage on the primary or secondary node.
2077
2078 This is only used for same-node replaces, not for changing the
2079 secondary node, hence we don't want to modify the existing disk.
2080
2081 """
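    # For every disk index in self.disks this creates two fresh LVs mirroring
    # the children of the DRBD device (a data LV of the disk's size and a
    # meta LV of DRBD_META_SIZE) and records them as
    # iv_names[dev.iv_name] = (dev, old_lvs, new_lvs) for the later swap.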
2082 iv_names = {}
2083
2084 disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2085 for idx, dev in enumerate(disks):
2086 if idx not in self.disks:
2087 continue
2088
2089 self.lu.LogInfo("Adding storage on %s for disk/%d", node_name, idx)
2090
2091 self.cfg.SetDiskID(dev, node_name)
2092
2093 lv_names = [".disk%d_%s" % (idx, suffix) for suffix in ["data", "meta"]]
2094 names = _GenerateUniqueNames(self.lu, lv_names)
2095
2096 (data_disk, meta_disk) = dev.children
2097 vg_data = data_disk.logical_id[0]
2098 lv_data = objects.Disk(dev_type=constants.LD_LV, size=dev.size,
2099 logical_id=(vg_data, names[0]),
2100 params=data_disk.params)
2101 vg_meta = meta_disk.logical_id[0]
2102 lv_meta = objects.Disk(dev_type=constants.LD_LV,
2103 size=constants.DRBD_META_SIZE,
2104 logical_id=(vg_meta, names[1]),
2105 params=meta_disk.params)
2106
2107 new_lvs = [lv_data, lv_meta]
2108 old_lvs = [child.Copy() for child in dev.children]
2109 iv_names[dev.iv_name] = (dev, old_lvs, new_lvs)
2110 excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, node_name)
2111
2112 # we pass force_create=True to force the LVM creation
2113 for new_lv in new_lvs:
2114 _CreateBlockDevInner(self.lu, node_name, self.instance, new_lv, True,
2115 _GetInstanceInfoText(self.instance), False,
2116 excl_stor)
2117
2118 return iv_names
2119
2120 def _CheckDevices(self, node_name, iv_names):
2121 for name, (dev, _, _) in iv_names.iteritems():
2122 self.cfg.SetDiskID(dev, node_name)
2123
2124 result = _BlockdevFind(self, node_name, dev, self.instance)
2125
2126 msg = result.fail_msg
2127 if msg or not result.payload:
2128 if not msg:
2129 msg = "disk not found"
2130 raise errors.OpExecError("Can't find DRBD device %s: %s" %
2131 (name, msg))
2132
2133 if result.payload.is_degraded:
2134 raise errors.OpExecError("DRBD device %s is degraded!" % name)
2135
2136 def _RemoveOldStorage(self, node_name, iv_names):
2137 for name, (_, old_lvs, _) in iv_names.iteritems():
2138 self.lu.LogInfo("Remove logical volumes for %s", name)
2139
2140 for lv in old_lvs:
2141 self.cfg.SetDiskID(lv, node_name)
2142
2143 msg = self.rpc.call_blockdev_remove(node_name, lv).fail_msg
2144 if msg:
2145 self.lu.LogWarning("Can't remove old LV: %s", msg,
2146 hint="remove unused LVs manually")
2147
2148 def _ExecDrbd8DiskOnly(self, feedback_fn): # pylint: disable=W0613
2149 """Replace a disk on the primary or secondary for DRBD 8.
2150
2151 The algorithm for replace is quite complicated:
2152
2153 1. for each disk to be replaced:
2154
2155 1. create new LVs on the target node with unique names
2156 1. detach old LVs from the drbd device
2157 1. rename old LVs to name_replaced.<time_t>
2158 1. rename new LVs to old LVs
2159 1. attach the new LVs (with the old names now) to the drbd device
2160
2161 1. wait for sync across all devices
2162
2163 1. for each modified disk:
2164
2165       1. remove old LVs (which have the name name_replaced.<time_t>)
2166
2167 Failures are not very well handled.
2168
2169 """
2170 steps_total = 6
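    # Illustrative example of the rename dance below: an existing LV
    # "<old-id>.disk0_data" becomes "<old-id>.disk0_data_replaced-<time_t>",
    # then the freshly created "<new-id>.disk0_data" is renamed to the
    # original name, so the DRBD device ends up with children carrying the
    # names the configuration already expects.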
2171
2172 # Step: check device activation
2173 self.lu.LogStep(1, steps_total, "Check device existence")
2174 self._CheckDisksExistence([self.other_node, self.target_node])
2175 self._CheckVolumeGroup([self.target_node, self.other_node])
2176
2177 # Step: check other node consistency
2178 self.lu.LogStep(2, steps_total, "Check peer consistency")
2179 self._CheckDisksConsistency(self.other_node,
2180 self.other_node == self.instance.primary_node,
2181 False)
2182
2183 # Step: create new storage
2184 self.lu.LogStep(3, steps_total, "Allocate new storage")
2185 iv_names = self._CreateNewStorage(self.target_node)
2186
2187 # Step: for each lv, detach+rename*2+attach
2188 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2189 for dev, old_lvs, new_lvs in iv_names.itervalues():
2190 self.lu.LogInfo("Detaching %s drbd from local storage", dev.iv_name)
2191
2192 result = self.rpc.call_blockdev_removechildren(self.target_node, dev,
2193 old_lvs)
2194 result.Raise("Can't detach drbd from local storage on node"
2195 " %s for device %s" % (self.target_node, dev.iv_name))
2196 #dev.children = []
2197 #cfg.Update(instance)
2198
2199       # OK, we created the new LVs, so now we know we have the needed
2200       # storage; as such, we proceed on the target node to rename the
2201       # old LVs to <name>_replaced-<time_t> and the new LVs to the old
2202       # names; note that we rename LVs using the assumption that
2203       # logical_id == physical_id (which in turn is the unique_id on that node)
2204
2205 # FIXME(iustin): use a better name for the replaced LVs
2206 temp_suffix = int(time.time())
2207 ren_fn = lambda d, suff: (d.physical_id[0],
2208 d.physical_id[1] + "_replaced-%s" % suff)
2209
2210 # Build the rename list based on what LVs exist on the node
2211 rename_old_to_new = []
2212 for to_ren in old_lvs:
2213 result = self.rpc.call_blockdev_find(self.target_node, to_ren)
2214 if not result.fail_msg and result.payload:
2215 # device exists
2216 rename_old_to_new.append((to_ren, ren_fn(to_ren, temp_suffix)))
2217
2218 self.lu.LogInfo("Renaming the old LVs on the target node")
2219 result = self.rpc.call_blockdev_rename(self.target_node,
2220 rename_old_to_new)
2221 result.Raise("Can't rename old LVs on node %s" % self.target_node)
2222
2223 # Now we rename the new LVs to the old LVs
2224 self.lu.LogInfo("Renaming the new LVs on the target node")
2225 rename_new_to_old = [(new, old.physical_id)
2226 for old, new in zip(old_lvs, new_lvs)]
2227 result = self.rpc.call_blockdev_rename(self.target_node,
2228 rename_new_to_old)
2229 result.Raise("Can't rename new LVs on node %s" % self.target_node)
2230
2231       # Intermediate steps of in-memory modifications
2232 for old, new in zip(old_lvs, new_lvs):
2233 new.logical_id = old.logical_id
2234 self.cfg.SetDiskID(new, self.target_node)
2235
2236 # We need to modify old_lvs so that removal later removes the
2237 # right LVs, not the newly added ones; note that old_lvs is a
2238 # copy here
2239 for disk in old_lvs:
2240 disk.logical_id = ren_fn(disk, temp_suffix)
2241 self.cfg.SetDiskID(disk, self.target_node)
2242
2243 # Now that the new lvs have the old name, we can add them to the device
2244 self.lu.LogInfo("Adding new mirror component on %s", self.target_node)
2245 result = self.rpc.call_blockdev_addchildren(self.target_node,
2246 (dev, self.instance), new_lvs)
2247 msg = result.fail_msg
2248 if msg:
2249 for new_lv in new_lvs:
2250 msg2 = self.rpc.call_blockdev_remove(self.target_node,
2251 new_lv).fail_msg
2252 if msg2:
2253 self.lu.LogWarning("Can't rollback device %s: %s", dev, msg2,
2254 hint=("cleanup manually the unused logical"
2255                                       " volumes"))
2256 raise errors.OpExecError("Can't add local storage to drbd: %s" % msg)
2257
2258 cstep = itertools.count(5)
2259
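    # Steps 5 and 6: with early_release the old LVs are removed and the node
    # resource locks dropped before the resync; otherwise the old storage is
    # only removed after the devices have been checked below.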
2260 if self.early_release:
2261 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2262 self._RemoveOldStorage(self.target_node, iv_names)
2263 # TODO: Check if releasing locks early still makes sense
2264 _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2265 else:
2266 # Release all resource locks except those used by the instance
2267 _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2268 keep=self.node_secondary_ip.keys())
2269
2270 # Release all node locks while waiting for sync
2271 _ReleaseLocks(self.lu, locking.LEVEL_NODE)
2272
2273 # TODO: Can the instance lock be downgraded here? Take the optional disk
2274 # shutdown in the caller into consideration.
2275
2276 # Wait for sync
2277 # This can fail as the old devices are degraded and _WaitForSync
2278     # returns a combined result over all disks, so we don't check its return value
2279 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2280 _WaitForSync(self.lu, self.instance)
2281
2282 # Check all devices manually
2283 self._CheckDevices(self.instance.primary_node, iv_names)
2284
2285 # Step: remove old storage
2286 if not self.early_release:
2287 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2288 self._RemoveOldStorage(self.target_node, iv_names)
2289
2290 def _ExecDrbd8Secondary(self, feedback_fn):
2291 """Replace the secondary node for DRBD 8.
2292
2293 The algorithm for replace is quite complicated:
2294 - for all disks of the instance:
2295 - create new LVs on the new node with same names
2296 - shutdown the drbd device on the old secondary
2297 - disconnect the drbd network on the primary
2298 - create the drbd device on the new secondary
2299 - network attach the drbd on the primary, using an artifice:
2300 the drbd code for Attach() will connect to the network if it
2301 finds a device which is connected to the good local disks but
2302 not network enabled
2303 - wait for sync across all devices
2304 - remove all disks from the old secondary
2305
2306 Failures are not very well handled.
2307
2308 """
2309 steps_total = 6
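    # Unlike _ExecDrbd8DiskOnly, the LVs on the primary node are kept: new LVs
    # and a new DRBD device are created on self.new_node, the primary's DRBD
    # is reconnected to the new peer, and only the old secondary's storage
    # (self.target_node) is removed at the end.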
2310
2311 pnode = self.instance.primary_node
2312
2313 # Step: check device activation
2314 self.lu.LogStep(1, steps_total, "Check device existence")
2315 self._CheckDisksExistence([self.instance.primary_node])
2316 self._CheckVolumeGroup([self.instance.primary_node])
2317
2318 # Step: check other node consistency
2319 self.lu.LogStep(2, steps_total, "Check peer consistency")
2320 self._CheckDisksConsistency(self.instance.primary_node, True, True)
2321
2322 # Step: create new storage
2323 self.lu.LogStep(3, steps_total, "Allocate new storage")
2324 disks = _AnnotateDiskParams(self.instance, self.instance.disks, self.cfg)
2325 excl_stor = _IsExclusiveStorageEnabledNodeName(self.lu.cfg, self.new_node)
2326 for idx, dev in enumerate(disks):
2327 self.lu.LogInfo("Adding new local storage on %s for disk/%d" %
2328 (self.new_node, idx))
2329 # we pass force_create=True to force LVM creation
2330 for new_lv in dev.children:
2331 _CreateBlockDevInner(self.lu, self.new_node, self.instance, new_lv,
2332 True, _GetInstanceInfoText(self.instance), False,
2333 excl_stor)
2334
2335     # Step 4: drbd minors and drbd setup changes
2336 # after this, we must manually remove the drbd minors on both the
2337 # error and the success paths
2338 self.lu.LogStep(4, steps_total, "Changing drbd configuration")
2339 minors = self.cfg.AllocateDRBDMinor([self.new_node
2340 for dev in self.instance.disks],
2341 self.instance.name)
2342 logging.debug("Allocated minors %r", minors)
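    # One new minor is allocated on self.new_node per instance disk, in disk
    # order; they are consumed by the zip() below.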
2343
2344 iv_names = {}
2345 for idx, (dev, new_minor) in enumerate(zip(self.instance.disks, minors)):
2346       self.lu.LogInfo("Activating a new drbd on %s for disk/%d" %
2347 (self.new_node, idx))
2348 # create new devices on new_node; note that we create two IDs:
2349 # one without port, so the drbd will be activated without
2350 # networking information on the new node at this stage, and one
2351       # with network, for the later network attach in this same step
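      # The DRBD8 logical_id unpacked below has the layout
      # (nodeA, nodeB, port, minorA, minorB, secret); new_alone_id reuses it
      # with port=None, new_net_id with the original port.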
2352 (o_node1, o_node2, o_port, o_minor1, o_minor2, o_secret) = dev.logical_id
2353 if self.instance.primary_node == o_node1:
2354 p_minor = o_minor1
2355 else:
2356 assert self.instance.primary_node == o_node2, "Three-node instance?"
2357 p_minor = o_minor2
2358
2359 new_alone_id = (self.instance.primary_node, self.new_node, None,
2360 p_minor, new_minor, o_secret)
2361 new_net_id = (self.instance.primary_node, self.new_node, o_port,
2362 p_minor, new_minor, o_secret)
2363
2364 iv_names[idx] = (dev, dev.children, new_net_id)
2365 logging.debug("Allocated new_minor: %s, new_logical_id: %s", new_minor,
2366 new_net_id)
2367 new_drbd = objects.Disk(dev_type=constants.LD_DRBD8,
2368 logical_id=new_alone_id,
2369 children=dev.children,
2370 size=dev.size,
2371 params={})
2372 (anno_new_drbd,) = _AnnotateDiskParams(self.instance, [new_drbd],
2373 self.cfg)
2374 try:
2375 _CreateSingleBlockDev(self.lu, self.new_node, self.instance,
2376 anno_new_drbd,
2377 _GetInstanceInfoText(self.instance), False,
2378 excl_stor)
2379 except errors.GenericError:
2380 self.cfg.ReleaseDRBDMinors(self.instance.name)
2381 raise
2382
2383     # We have new devices, shut down the drbd on the old secondary
2384 for idx, dev in enumerate(self.instance.disks):
2385 self.lu.LogInfo("Shutting down drbd for disk/%d on old node", idx)
2386 self.cfg.SetDiskID(dev, self.target_node)
2387 msg = self.rpc.call_blockdev_shutdown(self.target_node,
2388 (dev, self.instance)).fail_msg
2389 if msg:
2390 self.lu.LogWarning("Failed to shutdown drbd for disk/%d on old"
2391                             " node: %s" % (idx, msg),
2392 hint=("Please cleanup this device manually as"
2393 " soon as possible"))
2394
2395 self.lu.LogInfo("Detaching primary drbds from the network (=> standalone)")
2396 result = self.rpc.call_drbd_disconnect_net([pnode], self.node_secondary_ip,
2397 self.instance.disks)[pnode]
2398
2399 msg = result.fail_msg
2400 if msg:
2401 # detaches didn't succeed (unlikely)
2402 self.cfg.ReleaseDRBDMinors(self.instance.name)
2403 raise errors.OpExecError("Can't detach the disks from the network on"
2404 " old node: %s" % (msg,))
2405
2406 # if we managed to detach at least one, we update all the disks of
2407 # the instance to point to the new secondary
2408 self.lu.LogInfo("Updating instance configuration")
2409 for dev, _, new_logical_id in iv_names.itervalues():
2410 dev.logical_id = new_logical_id
2411 self.cfg.SetDiskID(dev, self.instance.primary_node)
2412
2413 self.cfg.Update(self.instance, feedback_fn)
2414
2415 # Release all node locks (the configuration has been updated)
2416 _ReleaseLocks(self.lu, locking.LEVEL_NODE)
2417
2418 # and now perform the drbd attach
2419 self.lu.LogInfo("Attaching primary drbds to new secondary"
2420 " (standalone => connected)")
2421 result = self.rpc.call_drbd_attach_net([self.instance.primary_node,
2422 self.new_node],
2423 self.node_secondary_ip,
2424 (self.instance.disks, self.instance),
2425 self.instance.name,
2426 False)
2427 for to_node, to_result in result.items():
2428 msg = to_result.fail_msg
2429 if msg:
2430 self.lu.LogWarning("Can't attach drbd disks on node %s: %s",
2431 to_node, msg,
2432 hint=("please do a gnt-instance info to see the"
2433 " status of disks"))
2434
2435 cstep = itertools.count(5)
2436
2437 if self.early_release:
2438 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2439 self._RemoveOldStorage(self.target_node, iv_names)
2440 # TODO: Check if releasing locks early still makes sense
2441 _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES)
2442 else:
2443 # Release all resource locks except those used by the instance
2444 _ReleaseLocks(self.lu, locking.LEVEL_NODE_RES,
2445 keep=self.node_secondary_ip.keys())
2446
2447 # TODO: Can the instance lock be downgraded here? Take the optional disk
2448 # shutdown in the caller into consideration.
2449
2450 # Wait for sync
2451 # This can fail as the old devices are degraded and _WaitForSync
2452     # returns a combined result over all disks, so we don't check its return value
2453 self.lu.LogStep(cstep.next(), steps_total, "Sync devices")
2454 _WaitForSync(self.lu, self.instance)
2455
2456 # Check all devices manually
2457 self._CheckDevices(self.instance.primary_node, iv_names)
2458
2459 # Step: remove old storage
2460 if not self.early_release:
2461 self.lu.LogStep(cstep.next(), steps_total, "Removing old storage")
2462 self._RemoveOldStorage(self.target_node, iv_names)