1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable=W0201,C0302
25
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
27 # functions
28
29 # C0302: since we have waaaay too many lines in this module
30
31 import os
32 import time
33 import logging
34 import copy
35 import OpenSSL
36 import itertools
37 import operator
38
39 from ganeti import utils
40 from ganeti import errors
41 from ganeti import hypervisor
42 from ganeti import locking
43 from ganeti import constants
44 from ganeti import objects
45 from ganeti import compat
46 from ganeti import masterd
47 from ganeti import netutils
48 from ganeti import query
49 from ganeti import qlang
50 from ganeti import opcodes
51 from ganeti import ht
52 from ganeti import rpc
53 from ganeti import pathutils
54 from ganeti import network
55 from ganeti.masterd import iallocator
56
57 from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
58 Tasklet, _QueryBase
59 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
60 INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
61 _ExpandInstanceName, _ExpandItemName, \
62 _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
63 _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
64 _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
65 _ComputeNewInstanceViolations, _GetUpdatedParams, _CheckOSParams, \
66 _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
67 _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
68 _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
69 _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
70 _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
71 _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
72 _CheckIAllocatorOrNode, _FindFaultyInstanceDisks
73
74 from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
75 LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
76 LUClusterPostInit, _ClusterQuery, LUClusterQuery, LUClusterRedistConf, \
77 LUClusterRename, LUClusterRepairDiskSizes, LUClusterSetParams, \
78 LUClusterVerify, LUClusterVerifyConfig, LUClusterVerifyGroup, \
79 LUClusterVerifyDisks
80 from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
81 _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
82 LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
83 from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
84 LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
85 _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
86 LUNodeRemove, LURepairNodeStorage
87 from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
88 from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
89 LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
90 LUNetworkDisconnect
91 from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
92
93 import ganeti.masterd.instance # pylint: disable=W0611
94
95
96 def _IsExclusiveStorageEnabledNodeName(cfg, nodename):
97 """Whether exclusive_storage is in effect for the given node.
98
99 @type cfg: L{config.ConfigWriter}
100 @param cfg: The cluster configuration
101 @type nodename: string
102 @param nodename: The node
103 @rtype: bool
104 @return: The effective value of exclusive_storage
105 @raise errors.OpPrereqError: if no node exists with the given name
106
107 """
108 ni = cfg.GetNodeInfo(nodename)
109 if ni is None:
110 raise errors.OpPrereqError("Invalid node name %s" % nodename,
111 errors.ECODE_NOENT)
112 return _IsExclusiveStorageEnabledNode(cfg, ni)
113
114
115 def _CopyLockList(names):
116 """Makes a copy of a list of lock names.
117
118 Handles L{locking.ALL_SET} correctly.
119
120 """
121 if names == locking.ALL_SET:
122 return locking.ALL_SET
123 else:
124 return names[:]
125
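# Illustrative behaviour of _CopyLockList (example values, not from the
# original module): the ALL_SET sentinel is passed through unchanged, while a
# real list is copied so that later mutation of the result cannot leak back
# into the caller's lock list:
#   assert _CopyLockList(locking.ALL_SET) is locking.ALL_SET
#   names = ["node1.example.com"]
#   assert _CopyLockList(names) == names and _CopyLockList(names) is not names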
126
127 def _ReleaseLocks(lu, level, names=None, keep=None):
128 """Releases locks owned by an LU.
129
130 @type lu: L{LogicalUnit}
131 @param level: Lock level
132 @type names: list or None
133 @param names: Names of locks to release
134 @type keep: list or None
135 @param keep: Names of locks to retain
136
137 """
138 assert not (keep is not None and names is not None), \
139 "Only one of the 'names' and the 'keep' parameters can be given"
140
141 if names is not None:
142 should_release = names.__contains__
143 elif keep:
144 should_release = lambda name: name not in keep
145 else:
146 should_release = None
147
148 owned = lu.owned_locks(level)
149 if not owned:
150 # Not owning any lock at this level, do nothing
151 pass
152
153 elif should_release:
154 retain = []
155 release = []
156
157 # Determine which locks to release
158 for name in owned:
159 if should_release(name):
160 release.append(name)
161 else:
162 retain.append(name)
163
164 assert len(lu.owned_locks(level)) == (len(retain) + len(release))
165
166 # Release just some locks
167 lu.glm.release(level, names=release)
168
169 assert frozenset(lu.owned_locks(level)) == frozenset(retain)
170 else:
171 # Release everything
172 lu.glm.release(level)
173
174 assert not lu.glm.is_owned(level), "No locks should be owned"
175
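# Illustrative use of _ReleaseLocks (hypothetical LU, instance and lock names):
# once work has been narrowed to the primary node, an LU would typically keep
# only that lock and release the rest, e.g.:
#   _ReleaseLocks(lu, locking.LEVEL_NODE, keep=[instance.primary_node])
# or, equivalently, name the locks to give up:
#   _ReleaseLocks(lu, locking.LEVEL_NODE,
#                 names=[n for n in lu.owned_locks(locking.LEVEL_NODE)
#                        if n != instance.primary_node])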
176
177 def _CheckNodeOnline(lu, node, msg=None):
178 """Ensure that a given node is online.
179
180 @param lu: the LU on behalf of which we make the check
181 @param node: the node to check
182 @param msg: if passed, should be a message to replace the default one
183 @raise errors.OpPrereqError: if the node is offline
184
185 """
186 if msg is None:
187 msg = "Can't use offline node"
188 if lu.cfg.GetNodeInfo(node).offline:
189 raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
190
191
192 def _CheckNodeNotDrained(lu, node):
193 """Ensure that a given node is not drained.
194
195 @param lu: the LU on behalf of which we make the check
196 @param node: the node to check
197 @raise errors.OpPrereqError: if the node is drained
198
199 """
200 if lu.cfg.GetNodeInfo(node).drained:
201 raise errors.OpPrereqError("Can't use drained node %s" % node,
202 errors.ECODE_STATE)
203
204
205 def _CheckNodeVmCapable(lu, node):
206 """Ensure that a given node is vm capable.
207
208 @param lu: the LU on behalf of which we make the check
209 @param node: the node to check
210 @raise errors.OpPrereqError: if the node is not vm capable
211
212 """
213 if not lu.cfg.GetNodeInfo(node).vm_capable:
214 raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
215 errors.ECODE_STATE)
216
217
218 def _CheckNodeHasOS(lu, node, os_name, force_variant):
219 """Ensure that a node supports a given OS.
220
221 @param lu: the LU on behalf of which we make the check
222 @param node: the node to check
223 @param os_name: the OS to query about
224 @param force_variant: whether to ignore variant errors
225 @raise errors.OpPrereqError: if the node is not supporting the OS
226
227 """
228 result = lu.rpc.call_os_get(node, os_name)
229 result.Raise("OS '%s' not in supported OS list for node %s" %
230 (os_name, node),
231 prereq=True, ecode=errors.ECODE_INVAL)
232 if not force_variant:
233 _CheckOSVariant(result.payload, os_name)
234
235
236 def _GetClusterDomainSecret():
237 """Reads the cluster domain secret.
238
239 """
240 return utils.ReadOneLineFile(pathutils.CLUSTER_DOMAIN_SECRET_FILE,
241 strict=True)
242
243
244 def _ComputeIPolicyInstanceSpecViolation(
245 ipolicy, instance_spec, disk_template,
246 _compute_fn=_ComputeIPolicySpecViolation):
247 """Compute if instance specs meets the specs of ipolicy.
248
249 @type ipolicy: dict
250 @param ipolicy: The ipolicy to verify against
251 @type instance_spec: dict
252 @param instance_spec: The instance spec to verify
253 @type disk_template: string
254 @param disk_template: the disk template of the instance
255 @param _compute_fn: The function to verify ipolicy (unittest only)
256 @see: L{_ComputeIPolicySpecViolation}
257
258 """
259 mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
260 cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
261 disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
262 disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
263 nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
264 spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
265
266 return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
267 disk_sizes, spindle_use, disk_template)
268
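# Illustrative call of _ComputeIPolicyInstanceSpecViolation (hypothetical
# values): instance_spec uses the constants.ISPEC_* keys; missing keys fall
# back to the defaults chosen above before the ipolicy check runs, e.g.:
#   spec = {
#     constants.ISPEC_MEM_SIZE: 512,
#     constants.ISPEC_CPU_COUNT: 1,
#     constants.ISPEC_DISK_COUNT: 1,
#     constants.ISPEC_DISK_SIZE: [1024],
#     constants.ISPEC_NIC_COUNT: 1,
#   }
#   violations = _ComputeIPolicyInstanceSpecViolation(cluster.ipolicy, spec,
#                                                     constants.DT_PLAIN)
# An empty result means the spec fits the policy.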
269
270 def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
271 target_group, cfg,
272 _compute_fn=_ComputeIPolicyInstanceViolation):
273 """Compute if instance meets the specs of the new target group.
274
275 @param ipolicy: The ipolicy to verify
276 @param instance: The instance object to verify
277 @param current_group: The current group of the instance
278 @param target_group: The new group of the instance
279 @type cfg: L{config.ConfigWriter}
280 @param cfg: Cluster configuration
281 @param _compute_fn: The function to verify ipolicy (unittest only)
282 @see: L{_ComputeIPolicySpecViolation}
283
284 """
285 if current_group == target_group:
286 return []
287 else:
288 return _compute_fn(ipolicy, instance, cfg)
289
290
291 def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, cfg, ignore=False,
292 _compute_fn=_ComputeIPolicyNodeViolation):
293 """Checks that the target node is correct in terms of instance policy.
294
295 @param ipolicy: The ipolicy to verify
296 @param instance: The instance object to verify
297 @param node: The node the instance will be relocated to
298 @type cfg: L{config.ConfigWriter}
299 @param cfg: Cluster configuration
300 @param ignore: Ignore violations of the ipolicy
301 @param _compute_fn: The function to verify ipolicy (unittest only)
302 @see: L{_ComputeIPolicySpecViolation}
303
304 """
305 primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
306 res = _compute_fn(ipolicy, instance, primary_node.group, node.group, cfg)
307
308 if res:
309 msg = ("Instance does not meet target node group's (%s) instance"
310 " policy: %s") % (node.group, utils.CommaJoin(res))
311 if ignore:
312 lu.LogWarning(msg)
313 else:
314 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
315
316
317 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
318 minmem, maxmem, vcpus, nics, disk_template, disks,
319 bep, hvp, hypervisor_name, tags):
320 """Builds instance related env variables for hooks
321
322 This builds the hook environment from individual variables.
323
324 @type name: string
325 @param name: the name of the instance
326 @type primary_node: string
327 @param primary_node: the name of the instance's primary node
328 @type secondary_nodes: list
329 @param secondary_nodes: list of secondary nodes as strings
330 @type os_type: string
331 @param os_type: the name of the instance's OS
332 @type status: string
333 @param status: the desired status of the instance
334 @type minmem: string
335 @param minmem: the minimum memory size of the instance
336 @type maxmem: string
337 @param maxmem: the maximum memory size of the instance
338 @type vcpus: string
339 @param vcpus: the count of VCPUs the instance has
340 @type nics: list
341 @param nics: list of tuples (name, uuid, ip, mac, mode, link, net, netinfo)
342 representing the NICs the instance has
343 @type disk_template: string
344 @param disk_template: the disk template of the instance
345 @type disks: list
346 @param disks: list of tuples (name, size, mode)
347 @type bep: dict
348 @param bep: the backend parameters for the instance
349 @type hvp: dict
350 @param hvp: the hypervisor parameters for the instance
351 @type hypervisor_name: string
352 @param hypervisor_name: the hypervisor for the instance
353 @type tags: list
354 @param tags: list of instance tags as strings
355 @rtype: dict
356 @return: the hook environment for this instance
357
358 """
359 env = {
360 "OP_TARGET": name,
361 "INSTANCE_NAME": name,
362 "INSTANCE_PRIMARY": primary_node,
363 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
364 "INSTANCE_OS_TYPE": os_type,
365 "INSTANCE_STATUS": status,
366 "INSTANCE_MINMEM": minmem,
367 "INSTANCE_MAXMEM": maxmem,
368 # TODO(2.9) remove deprecated "memory" value
369 "INSTANCE_MEMORY": maxmem,
370 "INSTANCE_VCPUS": vcpus,
371 "INSTANCE_DISK_TEMPLATE": disk_template,
372 "INSTANCE_HYPERVISOR": hypervisor_name,
373 }
374 if nics:
375 nic_count = len(nics)
376 for idx, (name, _, ip, mac, mode, link, net, netinfo) in enumerate(nics):
377 if ip is None:
378 ip = ""
379 env["INSTANCE_NIC%d_NAME" % idx] = name
380 env["INSTANCE_NIC%d_IP" % idx] = ip
381 env["INSTANCE_NIC%d_MAC" % idx] = mac
382 env["INSTANCE_NIC%d_MODE" % idx] = mode
383 env["INSTANCE_NIC%d_LINK" % idx] = link
384 if netinfo:
385 nobj = objects.Network.FromDict(netinfo)
386 env.update(nobj.HooksDict("INSTANCE_NIC%d_" % idx))
387 elif net:
388 # FIXME: broken network reference: the instance NIC specifies a
389 # network, but the relevant network entry was not in the config. This
390 # should be made impossible.
391 env["INSTANCE_NIC%d_NETWORK_NAME" % idx] = net
392 if mode == constants.NIC_MODE_BRIDGED:
393 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
394 else:
395 nic_count = 0
396
397 env["INSTANCE_NIC_COUNT"] = nic_count
398
399 if disks:
400 disk_count = len(disks)
401 for idx, (name, size, mode) in enumerate(disks):
402 env["INSTANCE_DISK%d_NAME" % idx] = name
403 env["INSTANCE_DISK%d_SIZE" % idx] = size
404 env["INSTANCE_DISK%d_MODE" % idx] = mode
405 else:
406 disk_count = 0
407
408 env["INSTANCE_DISK_COUNT"] = disk_count
409
410 if not tags:
411 tags = []
412
413 env["INSTANCE_TAGS"] = " ".join(tags)
414
415 for source, kind in [(bep, "BE"), (hvp, "HV")]:
416 for key, value in source.items():
417 env["INSTANCE_%s_%s" % (kind, key)] = value
418
419 return env
420
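# For illustration only (hypothetical instance): a single-NIC, single-disk
# instance produces hook environment entries along these lines:
#   INSTANCE_NAME=inst1.example.com    INSTANCE_PRIMARY=node1.example.com
#   INSTANCE_NIC_COUNT=1               INSTANCE_NIC0_MAC=aa:00:00:12:34:56
#   INSTANCE_DISK_COUNT=1              INSTANCE_DISK0_SIZE=1024
# plus one INSTANCE_BE_* / INSTANCE_HV_* entry per backend and hypervisor
# parameter, and INSTANCE_TAGS as a space-separated list.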
421
422 def _NICToTuple(lu, nic):
423 """Build a tupple of nic information.
424
425 @type lu: L{LogicalUnit}
426 @param lu: the logical unit on whose behalf we execute
427 @type nic: L{objects.NIC}
428 @param nic: nic to convert to hooks tuple
429
430 """
431 cluster = lu.cfg.GetClusterInfo()
432 filled_params = cluster.SimpleFillNIC(nic.nicparams)
433 mode = filled_params[constants.NIC_MODE]
434 link = filled_params[constants.NIC_LINK]
435 netinfo = None
436 if nic.network:
437 nobj = lu.cfg.GetNetwork(nic.network)
438 netinfo = objects.Network.ToDict(nobj)
439 return (nic.name, nic.uuid, nic.ip, nic.mac, mode, link, nic.network, netinfo)
440
441
442 def _NICListToTuple(lu, nics):
443 """Build a list of nic information tuples.
444
445 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
446 value in LUInstanceQueryData.
447
448 @type lu: L{LogicalUnit}
449 @param lu: the logical unit on whose behalf we execute
450 @type nics: list of L{objects.NIC}
451 @param nics: list of nics to convert to hooks tuples
452
453 """
454 hooks_nics = []
455 for nic in nics:
456 hooks_nics.append(_NICToTuple(lu, nic))
457 return hooks_nics
458
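# Shape of the tuples built by _NICToTuple/_NICListToTuple (values below are
# hypothetical): each NIC becomes
#   (name, uuid, ip, mac, mode, link, network, netinfo)
# e.g. ("nic0", "b1f5...", "198.51.100.10", "aa:00:00:12:34:56",
#       constants.NIC_MODE_BRIDGED, "br0", None, None)
# which is exactly the layout _BuildInstanceHookEnv unpacks per NIC.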
459
460 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
461 """Builds instance related env variables for hooks from an object.
462
463 @type lu: L{LogicalUnit}
464 @param lu: the logical unit on whose behalf we execute
465 @type instance: L{objects.Instance}
466 @param instance: the instance for which we should build the
467 environment
468 @type override: dict
469 @param override: dictionary with key/values that will override
470 our values
471 @rtype: dict
472 @return: the hook environment dictionary
473
474 """
475 cluster = lu.cfg.GetClusterInfo()
476 bep = cluster.FillBE(instance)
477 hvp = cluster.FillHV(instance)
478 args = {
479 "name": instance.name,
480 "primary_node": instance.primary_node,
481 "secondary_nodes": instance.secondary_nodes,
482 "os_type": instance.os,
483 "status": instance.admin_state,
484 "maxmem": bep[constants.BE_MAXMEM],
485 "minmem": bep[constants.BE_MINMEM],
486 "vcpus": bep[constants.BE_VCPUS],
487 "nics": _NICListToTuple(lu, instance.nics),
488 "disk_template": instance.disk_template,
489 "disks": [(disk.name, disk.size, disk.mode)
490 for disk in instance.disks],
491 "bep": bep,
492 "hvp": hvp,
493 "hypervisor_name": instance.hypervisor,
494 "tags": instance.tags,
495 }
496 if override:
497 args.update(override)
498 return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
499
500
501 def _CheckNicsBridgesExist(lu, target_nics, target_node):
502 """Check that the brigdes needed by a list of nics exist.
503
504 """
505 cluster = lu.cfg.GetClusterInfo()
506 paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
507 brlist = [params[constants.NIC_LINK] for params in paramslist
508 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
509 if brlist:
510 result = lu.rpc.call_bridges_exist(target_node, brlist)
511 result.Raise("Error checking bridges on destination node '%s'" %
512 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
513
514
515 def _CheckInstanceBridgesExist(lu, instance, node=None):
516 """Check that the brigdes needed by an instance exist.
517
518 """
519 if node is None:
520 node = instance.primary_node
521 _CheckNicsBridgesExist(lu, instance.nics, node)
522
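# Illustrative use of the bridge checks above (hypothetical target node):
# before failing over or migrating an instance, an LU would verify that the
# bridged NIC links exist on the new primary, e.g.:
#   _CheckInstanceBridgesExist(self, instance, node=target_node)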
523
524 def _CheckOSVariant(os_obj, name):
525 """Check whether an OS name conforms to the os variants specification.
526
527 @type os_obj: L{objects.OS}
528 @param os_obj: OS object to check
529 @type name: string
530 @param name: OS name passed by the user, to check for validity
531
532 """
533 variant = objects.OS.GetVariant(name)
534 if not os_obj.supported_variants:
535 if variant:
536 raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
537 " passed)" % (os_obj.name, variant),
538 errors.ECODE_INVAL)
539 return
540 if not variant:
541 raise errors.OpPrereqError("OS name must include a variant",
542 errors.ECODE_INVAL)
543
544 if variant not in os_obj.supported_variants:
545 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
546
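# Behaviour sketch for _CheckOSVariant (hypothetical OS definition): variants
# are encoded as "<os>+<variant>", so with supported_variants = ["default"]:
#   _CheckOSVariant(os_obj, "debootstrap+default")  passes
#   _CheckOSVariant(os_obj, "debootstrap")          raises OpPrereqError
#   _CheckOSVariant(os_obj, "debootstrap+foo")      raises OpPrereqError
# while an OS without supported_variants only accepts the bare name.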
547
548 def _CheckHostnameSane(lu, name):
549 """Ensures that a given hostname resolves to a 'sane' name.
550
551 The given name is required to be a prefix of the resolved hostname,
552 to prevent accidental mismatches.
553
554 @param lu: the logical unit on behalf of which we're checking
555 @param name: the name we should resolve and check
556 @return: the resolved hostname object
557
558 """
559 hostname = netutils.GetHostname(name=name)
560 if hostname.name != name:
561 lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
562 if not utils.MatchNameComponent(name, [hostname.name]):
563 raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
564 " same as given hostname '%s'") %
565 (hostname.name, name), errors.ECODE_INVAL)
566 return hostname
567
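# Illustrative outcomes of _CheckHostnameSane (hypothetical resolver): with a
# resolver that expands "inst1" to "inst1.example.com",
#   _CheckHostnameSane(lu, "inst1")              returns the resolved object
#   _CheckHostnameSane(lu, "inst1.example.com")  returns it unchanged
# whereas a name that resolves to something it is not a component prefix of
# (e.g. a CNAME pointing elsewhere) raises OpPrereqError.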
568
569 def _WaitForSync(lu, instance, disks=None, oneshot=False):
570 """Sleep and poll for an instance's disk to sync.
571
572 """
573 if not instance.disks or disks is not None and not disks:
574 return True
575
576 disks = _ExpandCheckDisks(instance, disks)
577
578 if not oneshot:
579 lu.LogInfo("Waiting for instance %s to sync disks", instance.name)
580
581 node = instance.primary_node
582
583 for dev in disks:
584 lu.cfg.SetDiskID(dev, node)
585
586 # TODO: Convert to utils.Retry
587
588 retries = 0
589 degr_retries = 10 # in seconds, as we sleep 1 second each time
590 while True:
591 max_time = 0
592 done = True
593 cumul_degraded = False
594 rstats = lu.rpc.call_blockdev_getmirrorstatus(node, (disks, instance))
595 msg = rstats.fail_msg
596 if msg:
597 lu.LogWarning("Can't get any data from node %s: %s", node, msg)
598 retries += 1
599 if retries >= 10:
600 raise errors.RemoteError("Can't contact node %s for mirror data,"
601 " aborting." % node)
602 time.sleep(6)
603 continue
604 rstats = rstats.payload
605 retries = 0
606 for i, mstat in enumerate(rstats):
607 if mstat is None:
608 lu.LogWarning("Can't compute data for node %s/%s",
609 node, disks[i].iv_name)
610 continue
611
612 cumul_degraded = (cumul_degraded or
613 (mstat.is_degraded and mstat.sync_percent is None))
614 if mstat.sync_percent is not None:
615 done = False
616 if mstat.estimated_time is not None:
617 rem_time = ("%s remaining (estimated)" %
618 utils.FormatSeconds(mstat.estimated_time))
619 max_time = mstat.estimated_time
620 else:
621 rem_time = "no time estimate"
622 lu.LogInfo("- device %s: %5.2f%% done, %s",
623 disks[i].iv_name, mstat.sync_percent, rem_time)
624
625 # if we're done but degraded, let's do a few small retries, to
626 # make sure we see a stable and not transient situation; therefore
627 # we force restart of the loop
628 if (done or oneshot) and cumul_degraded and degr_retries > 0:
629 logging.info("Degraded disks found, %d retries left", degr_retries)
630 degr_retries -= 1
631 time.sleep(1)
632 continue
633
634 if done or oneshot:
635 break
636
637 time.sleep(min(60, max_time))
638
639 if done:
640 lu.LogInfo("Instance %s's disks are in sync", instance.name)
641
642 return not cumul_degraded
643
644
645 def _BlockdevFind(lu, node, dev, instance):
646 """Wrapper around call_blockdev_find to annotate diskparams.
647
648 @param lu: A reference to the lu object
649 @param node: The node to call out to
650 @param dev: The device to find
651 @param instance: The instance object the device belongs to
652 @return: The result of the RPC call
653
654 """
655 (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
656 return lu.rpc.call_blockdev_find(node, disk)
657
658
659 def _CheckDiskConsistency(lu, instance, dev, node, on_primary, ldisk=False):
660 """Wrapper around L{_CheckDiskConsistencyInner}.
661
662 """
663 (disk,) = _AnnotateDiskParams(instance, [dev], lu.cfg)
664 return _CheckDiskConsistencyInner(lu, instance, disk, node, on_primary,
665 ldisk=ldisk)
666
667
668 def _CheckDiskConsistencyInner(lu, instance, dev, node, on_primary,
669 ldisk=False):
670 """Check that mirrors are not degraded.
671
672 @attention: The device has to be annotated already.
673
674 The ldisk parameter, if True, will change the test from the
675 is_degraded attribute (which represents overall non-ok status for
676 the device(s)) to the ldisk (representing the local storage status).
677
678 """
679 lu.cfg.SetDiskID(dev, node)
680
681 result = True
682
683 if on_primary or dev.AssembleOnSecondary():
684 rstats = lu.rpc.call_blockdev_find(node, dev)
685 msg = rstats.fail_msg
686 if msg:
687 lu.LogWarning("Can't find disk on node %s: %s", node, msg)
688 result = False
689 elif not rstats.payload:
690 lu.LogWarning("Can't find disk on node %s", node)
691 result = False
692 else:
693 if ldisk:
694 result = result and rstats.payload.ldisk_status == constants.LDS_OKAY
695 else:
696 result = result and not rstats.payload.is_degraded
697
698 if dev.children:
699 for child in dev.children:
700 result = result and _CheckDiskConsistencyInner(lu, instance, child, node,
701 on_primary)
702
703 return result
704
705
706 class LUOobCommand(NoHooksLU):
707 """Logical unit for OOB handling.
708
709 """
710 REQ_BGL = False
711 _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
712
713 def ExpandNames(self):
714 """Gather locks we need.
715
716 """
717 if self.op.node_names:
718 self.op.node_names = _GetWantedNodes(self, self.op.node_names)
719 lock_names = self.op.node_names
720 else:
721 lock_names = locking.ALL_SET
722
723 self.needed_locks = {
724 locking.LEVEL_NODE: lock_names,
725 }
726
727 self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
728
729 if not self.op.node_names:
730 # Acquire node allocation lock only if all nodes are affected
731 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
732
733 def CheckPrereq(self):
734 """Check prerequisites.
735
736 This checks:
737 - the node exists in the configuration
738 - OOB is supported
739
740 Any errors are signaled by raising errors.OpPrereqError.
741
742 """
743 self.nodes = []
744 self.master_node = self.cfg.GetMasterNode()
745
746 assert self.op.power_delay >= 0.0
747
748 if self.op.node_names:
749 if (self.op.command in self._SKIP_MASTER and
750 self.master_node in self.op.node_names):
751 master_node_obj = self.cfg.GetNodeInfo(self.master_node)
752 master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
753
754 if master_oob_handler:
755 additional_text = ("run '%s %s %s' if you want to operate on the"
756 " master regardless") % (master_oob_handler,
757 self.op.command,
758 self.master_node)
759 else:
760 additional_text = "it does not support out-of-band operations"
761
762 raise errors.OpPrereqError(("Operating on the master node %s is not"
763 " allowed for %s; %s") %
764 (self.master_node, self.op.command,
765 additional_text), errors.ECODE_INVAL)
766 else:
767 self.op.node_names = self.cfg.GetNodeList()
768 if self.op.command in self._SKIP_MASTER:
769 self.op.node_names.remove(self.master_node)
770
771 if self.op.command in self._SKIP_MASTER:
772 assert self.master_node not in self.op.node_names
773
774 for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
775 if node is None:
776 raise errors.OpPrereqError("Node %s not found" % node_name,
777 errors.ECODE_NOENT)
778 else:
779 self.nodes.append(node)
780
781 if (not self.op.ignore_status and
782 (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
783 raise errors.OpPrereqError(("Cannot power off node %s because it is"
784 " not marked offline") % node_name,
785 errors.ECODE_STATE)
786
787 def Exec(self, feedback_fn):
788 """Execute OOB and return result if we expect any.
789
790 """
791 master_node = self.master_node
792 ret = []
793
794 for idx, node in enumerate(utils.NiceSort(self.nodes,
795 key=lambda node: node.name)):
796 node_entry = [(constants.RS_NORMAL, node.name)]
797 ret.append(node_entry)
798
799 oob_program = _SupportsOob(self.cfg, node)
800
801 if not oob_program:
802 node_entry.append((constants.RS_UNAVAIL, None))
803 continue
804
805 logging.info("Executing out-of-band command '%s' using '%s' on %s",
806 self.op.command, oob_program, node.name)
807 result = self.rpc.call_run_oob(master_node, oob_program,
808 self.op.command, node.name,
809 self.op.timeout)
810
811 if result.fail_msg:
812 self.LogWarning("Out-of-band RPC failed on node '%s': %s",
813 node.name, result.fail_msg)
814 node_entry.append((constants.RS_NODATA, None))
815 else:
816 try:
817 self._CheckPayload(result)
818 except errors.OpExecError, err:
819 self.LogWarning("Payload returned by node '%s' is not valid: %s",
820 node.name, err)
821 node_entry.append((constants.RS_NODATA, None))
822 else:
823 if self.op.command == constants.OOB_HEALTH:
824 # For health we should log important events
825 for item, status in result.payload:
826 if status in [constants.OOB_STATUS_WARNING,
827 constants.OOB_STATUS_CRITICAL]:
828 self.LogWarning("Item '%s' on node '%s' has status '%s'",
829 item, node.name, status)
830
831 if self.op.command == constants.OOB_POWER_ON:
832 node.powered = True
833 elif self.op.command == constants.OOB_POWER_OFF:
834 node.powered = False
835 elif self.op.command == constants.OOB_POWER_STATUS:
836 powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
837 if powered != node.powered:
838 logging.warning(("Recorded power state (%s) of node '%s' does not"
839 " match actual power state (%s)"), node.powered,
840 node.name, powered)
841
842 # For configuration changing commands we should update the node
843 if self.op.command in (constants.OOB_POWER_ON,
844 constants.OOB_POWER_OFF):
845 self.cfg.Update(node, feedback_fn)
846
847 node_entry.append((constants.RS_NORMAL, result.payload))
848
849 if (self.op.command == constants.OOB_POWER_ON and
850 idx < len(self.nodes) - 1):
851 time.sleep(self.op.power_delay)
852
853 return ret
854
855 def _CheckPayload(self, result):
856 """Checks if the payload is valid.
857
858 @param result: RPC result
859 @raises errors.OpExecError: If payload is not valid
860
861 """
862 errs = []
863 if self.op.command == constants.OOB_HEALTH:
864 if not isinstance(result.payload, list):
865 errs.append("command 'health' is expected to return a list but got %s" %
866 type(result.payload))
867 else:
868 for item, status in result.payload:
869 if status not in constants.OOB_STATUSES:
870 errs.append("health item '%s' has invalid status '%s'" %
871 (item, status))
872
873 if self.op.command == constants.OOB_POWER_STATUS:
874 if not isinstance(result.payload, dict):
875 errs.append("power-status is expected to return a dict but got %s" %
876 type(result.payload))
877
878 if self.op.command in [
879 constants.OOB_POWER_ON,
880 constants.OOB_POWER_OFF,
881 constants.OOB_POWER_CYCLE,
882 ]:
883 if result.payload is not None:
884 errs.append("%s is expected to not return payload but got '%s'" %
885 (self.op.command, result.payload))
886
887 if errs:
888 raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
889 utils.CommaJoin(errs))
890
891
892 class _OsQuery(_QueryBase):
893 FIELDS = query.OS_FIELDS
894
895 def ExpandNames(self, lu):
896 # Lock all nodes in shared mode
897 # Temporary removal of locks, should be reverted later
898 # TODO: reintroduce locks when they are lighter-weight
899 lu.needed_locks = {}
900 #self.share_locks[locking.LEVEL_NODE] = 1
901 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
902
903 # The following variables interact with _QueryBase._GetNames
904 if self.names:
905 self.wanted = self.names
906 else:
907 self.wanted = locking.ALL_SET
908
909 self.do_locking = self.use_locking
910
911 def DeclareLocks(self, lu, level):
912 pass
913
914 @staticmethod
915 def _DiagnoseByOS(rlist):
916 """Remaps a per-node return list into an a per-os per-node dictionary
917
918 @param rlist: a map with node names as keys and OS objects as values
919
920 @rtype: dict
921 @return: a dictionary with osnames as keys and as value another
922 map, with nodes as keys and tuples of (path, status, diagnose,
923 variants, parameters, api_versions) as values, eg::
924
925 {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], []),
926 (/srv/..., False, "invalid api")],
927 "node2": [(/srv/..., True, "", [], [])]}
928 }
929
930 """
931 all_os = {}
932 # we build here the list of nodes that didn't fail the RPC (at RPC
933 # level), so that nodes with a non-responding node daemon don't
934 # make all OSes invalid
935 good_nodes = [node_name for node_name in rlist
936 if not rlist[node_name].fail_msg]
937 for node_name, nr in rlist.items():
938 if nr.fail_msg or not nr.payload:
939 continue
940 for (name, path, status, diagnose, variants,
941 params, api_versions) in nr.payload:
942 if name not in all_os:
943 # build a list of nodes for this os containing empty lists
944 # for each node in node_list
945 all_os[name] = {}
946 for nname in good_nodes:
947 all_os[name][nname] = []
948 # convert params from [name, help] to (name, help)
949 params = [tuple(v) for v in params]
950 all_os[name][node_name].append((path, status, diagnose,
951 variants, params, api_versions))
952 return all_os
953
954 def _GetQueryData(self, lu):
955 """Computes the list of nodes and their attributes.
956
957 """
958 # Locking is not used
959 assert not (compat.any(lu.glm.is_owned(level)
960 for level in locking.LEVELS
961 if level != locking.LEVEL_CLUSTER) or
962 self.do_locking or self.use_locking)
963
964 valid_nodes = [node.name
965 for node in lu.cfg.GetAllNodesInfo().values()
966 if not node.offline and node.vm_capable]
967 pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
968 cluster = lu.cfg.GetClusterInfo()
969
970 data = {}
971
972 for (os_name, os_data) in pol.items():
973 info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
974 hidden=(os_name in cluster.hidden_os),
975 blacklisted=(os_name in cluster.blacklisted_os))
976
977 variants = set()
978 parameters = set()
979 api_versions = set()
980
981 for idx, osl in enumerate(os_data.values()):
982 info.valid = bool(info.valid and osl and osl[0][1])
983 if not info.valid:
984 break
985
986 (node_variants, node_params, node_api) = osl[0][3:6]
987 if idx == 0:
988 # First entry
989 variants.update(node_variants)
990 parameters.update(node_params)
991 api_versions.update(node_api)
992 else:
993 # Filter out inconsistent values
994 variants.intersection_update(node_variants)
995 parameters.intersection_update(node_params)
996 api_versions.intersection_update(node_api)
997
998 info.variants = list(variants)
999 info.parameters = list(parameters)
1000 info.api_versions = list(api_versions)
1001
1002 data[os_name] = info
1003
1004 # Prepare data in requested order
1005 return [data[name] for name in self._GetNames(lu, pol.keys(), None)
1006 if name in data]
1007
1008
1009 class LUOsDiagnose(NoHooksLU):
1010 """Logical unit for OS diagnose/query.
1011
1012 """
1013 REQ_BGL = False
1014
1015 @staticmethod
1016 def _BuildFilter(fields, names):
1017 """Builds a filter for querying OSes.
1018
1019 """
1020 name_filter = qlang.MakeSimpleFilter("name", names)
1021
1022 # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
1023 # respective field is not requested
1024 status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
1025 for fname in ["hidden", "blacklisted"]
1026 if fname not in fields]
1027 if "valid" not in fields:
1028 status_filter.append([qlang.OP_TRUE, "valid"])
1029
1030 if status_filter:
1031 status_filter.insert(0, qlang.OP_AND)
1032 else:
1033 status_filter = None
1034
1035 if name_filter and status_filter:
1036 return [qlang.OP_AND, name_filter, status_filter]
1037 elif name_filter:
1038 return name_filter
1039 else:
1040 return status_filter
1041
1042 def CheckArguments(self):
1043 self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
1044 self.op.output_fields, False)
1045
1046 def ExpandNames(self):
1047 self.oq.ExpandNames(self)
1048
1049 def Exec(self, feedback_fn):
1050 return self.oq.OldStyleQuery(self)
1051
1052
1053 class _ExtStorageQuery(_QueryBase):
1054 FIELDS = query.EXTSTORAGE_FIELDS
1055
1056 def ExpandNames(self, lu):
1057 # Lock all nodes in shared mode
1058 # Temporary removal of locks, should be reverted later
1059 # TODO: reintroduce locks when they are lighter-weight
1060 lu.needed_locks = {}
1061 #self.share_locks[locking.LEVEL_NODE] = 1
1062 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
1063
1064 # The following variables interact with _QueryBase._GetNames
1065 if self.names:
1066 self.wanted = self.names
1067 else:
1068 self.wanted = locking.ALL_SET
1069
1070 self.do_locking = self.use_locking
1071
1072 def DeclareLocks(self, lu, level):
1073 pass
1074
1075 @staticmethod
1076 def _DiagnoseByProvider(rlist):
1077 """Remaps a per-node return list into an a per-provider per-node dictionary
1078
1079 @param rlist: a map with node names as keys and ExtStorage objects as values
1080
1081 @rtype: dict
1082 @return: a dictionary with extstorage providers as keys and as
1083 value another map, with nodes as keys and tuples of
1084 (path, status, diagnose, parameters) as values, eg::
1085
1086 {"provider1": {"node1": [(/usr/lib/..., True, "", [])]
1087 "node2": [(/srv/..., False, "missing file")]
1088 "node3": [(/srv/..., True, "", [])]
1089 }
1090
1091 """
1092 all_es = {}
1093 # we build here the list of nodes that didn't fail the RPC (at RPC
1094 # level), so that nodes with a non-responding node daemon don't
1095 # make all providers invalid
1096 good_nodes = [node_name for node_name in rlist
1097 if not rlist[node_name].fail_msg]
1098 for node_name, nr in rlist.items():
1099 if nr.fail_msg or not nr.payload:
1100 continue
1101 for (name, path, status, diagnose, params) in nr.payload:
1102 if name not in all_es:
1103 # build a list of nodes for this provider containing empty lists
1104 # for each node in node_list
1105 all_es[name] = {}
1106 for nname in good_nodes:
1107 all_es[name][nname] = []
1108 # convert params from [name, help] to (name, help)
1109 params = [tuple(v) for v in params]
1110 all_es[name][node_name].append((path, status, diagnose, params))
1111 return all_es
1112
1113 def _GetQueryData(self, lu):
1114 """Computes the list of nodes and their attributes.
1115
1116 """
1117 # Locking is not used
1118 assert not (compat.any(lu.glm.is_owned(level)
1119 for level in locking.LEVELS
1120 if level != locking.LEVEL_CLUSTER) or
1121 self.do_locking or self.use_locking)
1122
1123 valid_nodes = [node.name
1124 for node in lu.cfg.GetAllNodesInfo().values()
1125 if not node.offline and node.vm_capable]
1126 pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))
1127
1128 data = {}
1129
1130 nodegroup_list = lu.cfg.GetNodeGroupList()
1131
1132 for (es_name, es_data) in pol.items():
1133 # For every provider compute the nodegroup validity.
1134 # To do this we need to check the validity of each node in es_data
1135 # and then construct the corresponding nodegroup dict:
1136 # { nodegroup1: status
1137 # nodegroup2: status
1138 # }
1139 ndgrp_data = {}
1140 for nodegroup in nodegroup_list:
1141 ndgrp = lu.cfg.GetNodeGroup(nodegroup)
1142
1143 nodegroup_nodes = ndgrp.members
1144 nodegroup_name = ndgrp.name
1145 node_statuses = []
1146
1147 for node in nodegroup_nodes:
1148 if node in valid_nodes:
1149 if es_data[node] != []:
1150 node_status = es_data[node][0][1]
1151 node_statuses.append(node_status)
1152 else:
1153 node_statuses.append(False)
1154
1155 if False in node_statuses:
1156 ndgrp_data[nodegroup_name] = False
1157 else:
1158 ndgrp_data[nodegroup_name] = True
1159
1160 # Compute the provider's parameters
1161 parameters = set()
1162 for idx, esl in enumerate(es_data.values()):
1163 valid = bool(esl and esl[0][1])
1164 if not valid:
1165 break
1166
1167 node_params = esl[0][3]
1168 if idx == 0:
1169 # First entry
1170 parameters.update(node_params)
1171 else:
1172 # Filter out inconsistent values
1173 parameters.intersection_update(node_params)
1174
1175 params = list(parameters)
1176
1177 # Now fill all the info for this provider
1178 info = query.ExtStorageInfo(name=es_name, node_status=es_data,
1179 nodegroup_status=ndgrp_data,
1180 parameters=params)
1181
1182 data[es_name] = info
1183
1184 # Prepare data in requested order
1185 return [data[name] for name in self._GetNames(lu, pol.keys(), None)
1186 if name in data]
1187
1188
1189 class LUExtStorageDiagnose(NoHooksLU):
1190 """Logical unit for ExtStorage diagnose/query.
1191
1192 """
1193 REQ_BGL = False
1194
1195 def CheckArguments(self):
1196 self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
1197 self.op.output_fields, False)
1198
1199 def ExpandNames(self):
1200 self.eq.ExpandNames(self)
1201
1202 def Exec(self, feedback_fn):
1203 return self.eq.OldStyleQuery(self)
1204
1205
1206 class _InstanceQuery(_QueryBase):
1207 FIELDS = query.INSTANCE_FIELDS
1208
1209 def ExpandNames(self, lu):
1210 lu.needed_locks = {}
1211 lu.share_locks = _ShareAll()
1212
1213 if self.names:
1214 self.wanted = _GetWantedInstances(lu, self.names)
1215 else:
1216 self.wanted = locking.ALL_SET
1217
1218 self.do_locking = (self.use_locking and
1219 query.IQ_LIVE in self.requested_data)
1220 if self.do_locking:
1221 lu.needed_locks[locking.LEVEL_INSTANCE] = self.wanted
1222 lu.needed_locks[locking.LEVEL_NODEGROUP] = []
1223 lu.needed_locks[locking.LEVEL_NODE] = []
1224 lu.needed_locks[locking.LEVEL_NETWORK] = []
1225 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1226
1227 self.do_grouplocks = (self.do_locking and
1228 query.IQ_NODES in self.requested_data)
1229
1230 def DeclareLocks(self, lu, level):
1231 if self.do_locking:
1232 if level == locking.LEVEL_NODEGROUP and self.do_grouplocks:
1233 assert not lu.needed_locks[locking.LEVEL_NODEGROUP]
1234
1235 # Lock all groups used by instances optimistically; this requires going
1236 # via the node before it's locked, requiring verification later on
1237 lu.needed_locks[locking.LEVEL_NODEGROUP] = \
1238 set(group_uuid
1239 for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1240 for group_uuid in lu.cfg.GetInstanceNodeGroups(instance_name))
1241 elif level == locking.LEVEL_NODE:
1242 lu._LockInstancesNodes() # pylint: disable=W0212
1243
1244 elif level == locking.LEVEL_NETWORK:
1245 lu.needed_locks[locking.LEVEL_NETWORK] = \
1246 frozenset(net_uuid
1247 for instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
1248 for net_uuid in lu.cfg.GetInstanceNetworks(instance_name))
1249
1250 @staticmethod
1251 def _CheckGroupLocks(lu):
1252 owned_instances = frozenset(lu.owned_locks(locking.LEVEL_INSTANCE))
1253 owned_groups = frozenset(lu.owned_locks(locking.LEVEL_NODEGROUP))
1254
1255 # Check if node groups for locked instances are still correct
1256 for instance_name in owned_instances:
1257 _CheckInstanceNodeGroups(lu.cfg, instance_name, owned_groups)
1258
1259 def _GetQueryData(self, lu):
1260 """Computes the list of instances and their attributes.
1261
1262 """
1263 if self.do_grouplocks:
1264 self._CheckGroupLocks(lu)
1265
1266 cluster = lu.cfg.GetClusterInfo()
1267 all_info = lu.cfg.GetAllInstancesInfo()
1268
1269 instance_names = self._GetNames(lu, all_info.keys(), locking.LEVEL_INSTANCE)
1270
1271 instance_list = [all_info[name] for name in instance_names]
1272 nodes = frozenset(itertools.chain(*(inst.all_nodes
1273 for inst in instance_list)))
1274 hv_list = list(set([inst.hypervisor for inst in instance_list]))
1275 bad_nodes = []
1276 offline_nodes = []
1277 wrongnode_inst = set()
1278
1279 # Gather data as requested
1280 if self.requested_data & set([query.IQ_LIVE, query.IQ_CONSOLE]):
1281 live_data = {}
1282 node_data = lu.rpc.call_all_instances_info(nodes, hv_list)
1283 for name in nodes:
1284 result = node_data[name]
1285 if result.offline:
1286 # offline nodes will be in both lists
1287 assert result.fail_msg
1288 offline_nodes.append(name)
1289 if result.fail_msg:
1290 bad_nodes.append(name)
1291 elif result.payload:
1292 for inst in result.payload:
1293 if inst in all_info:
1294 if all_info[inst].primary_node == name:
1295 live_data.update(result.payload)
1296 else:
1297 wrongnode_inst.add(inst)
1298 else:
1299 # orphan instance; we don't list it here as we don't
1300 # handle this case yet in the output of instance listing
1301 logging.warning("Orphan instance '%s' found on node %s",
1302 inst, name)
1303 # else no instance is alive
1304 else:
1305 live_data = {}
1306
1307 if query.IQ_DISKUSAGE in self.requested_data:
1308 gmi = ganeti.masterd.instance
1309 disk_usage = dict((inst.name,
1310 gmi.ComputeDiskSize(inst.disk_template,
1311 [{constants.IDISK_SIZE: disk.size}
1312 for disk in inst.disks]))
1313 for inst in instance_list)
1314 else:
1315 disk_usage = None
1316
1317 if query.IQ_CONSOLE in self.requested_data:
1318 consinfo = {}
1319 for inst in instance_list:
1320 if inst.name in live_data:
1321 # Instance is running
1322 consinfo[inst.name] = _GetInstanceConsole(cluster, inst)
1323 else:
1324 consinfo[inst.name] = None
1325 assert set(consinfo.keys()) == set(instance_names)
1326 else:
1327 consinfo = None
1328
1329 if query.IQ_NODES in self.requested_data:
1330 node_names = set(itertools.chain(*map(operator.attrgetter("all_nodes"),
1331 instance_list)))
1332 nodes = dict(lu.cfg.GetMultiNodeInfo(node_names))
1333 groups = dict((uuid, lu.cfg.GetNodeGroup(uuid))
1334 for uuid in set(map(operator.attrgetter("group"),
1335 nodes.values())))
1336 else:
1337 nodes = None
1338 groups = None
1339
1340 if query.IQ_NETWORKS in self.requested_data:
1341 net_uuids = itertools.chain(*(lu.cfg.GetInstanceNetworks(i.name)
1342 for i in instance_list))
1343 networks = dict((uuid, lu.cfg.GetNetwork(uuid)) for uuid in net_uuids)
1344 else:
1345 networks = None
1346
1347 return query.InstanceQueryData(instance_list, lu.cfg.GetClusterInfo(),
1348 disk_usage, offline_nodes, bad_nodes,
1349 live_data, wrongnode_inst, consinfo,
1350 nodes, groups, networks)
1351
1352
1353 class LUQuery(NoHooksLU):
1354 """Query for resources/items of a certain kind.
1355
1356 """
1357 # pylint: disable=W0142
1358 REQ_BGL = False
1359
1360 def CheckArguments(self):
1361 qcls = _GetQueryImplementation(self.op.what)
1362
1363 self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
1364
1365 def ExpandNames(self):
1366 self.impl.ExpandNames(self)
1367
1368 def DeclareLocks(self, level):
1369 self.impl.DeclareLocks(self, level)
1370
1371 def Exec(self, feedback_fn):
1372 return self.impl.NewStyleQuery(self)
1373
1374
1375 class LUQueryFields(NoHooksLU):
1376 """Query for resources/items of a certain kind.
1377
1378 """
1379 # pylint: disable=W0142
1380 REQ_BGL = False
1381
1382 def CheckArguments(self):
1383 self.qcls = _GetQueryImplementation(self.op.what)
1384
1385 def ExpandNames(self):
1386 self.needed_locks = {}
1387
1388 def Exec(self, feedback_fn):
1389 return query.QueryFields(self.qcls.FIELDS, self.op.fields)
1390
1391
1392 class LUInstanceActivateDisks(NoHooksLU):
1393 """Bring up an instance's disks.
1394
1395 """
1396 REQ_BGL = False
1397
1398 def ExpandNames(self):
1399 self._ExpandAndLockInstance()
1400 self.needed_locks[locking.LEVEL_NODE] = []
1401 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1402
1403 def DeclareLocks(self, level):
1404 if level == locking.LEVEL_NODE:
1405 self._LockInstancesNodes()
1406
1407 def CheckPrereq(self):
1408 """Check prerequisites.
1409
1410 This checks that the instance is in the cluster.
1411
1412 """
1413 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1414 assert self.instance is not None, \
1415 "Cannot retrieve locked instance %s" % self.op.instance_name
1416 _CheckNodeOnline(self, self.instance.primary_node)
1417
1418 def Exec(self, feedback_fn):
1419 """Activate the disks.
1420
1421 """
1422 disks_ok, disks_info = \
1423 _AssembleInstanceDisks(self, self.instance,
1424 ignore_size=self.op.ignore_size)
1425 if not disks_ok:
1426 raise errors.OpExecError("Cannot activate block devices")
1427
1428 if self.op.wait_for_sync:
1429 if not _WaitForSync(self, self.instance):
1430 raise errors.OpExecError("Some disks of the instance are degraded!")
1431
1432 return disks_info
1433
1434
1435 def _AssembleInstanceDisks(lu, instance, disks=None, ignore_secondaries=False,
1436 ignore_size=False):
1437 """Prepare the block devices for an instance.
1438
1439 This sets up the block devices on all nodes.
1440
1441 @type lu: L{LogicalUnit}
1442 @param lu: the logical unit on whose behalf we execute
1443 @type instance: L{objects.Instance}
1444 @param instance: the instance for whose disks we assemble
1445 @type disks: list of L{objects.Disk} or None
1446 @param disks: which disks to assemble (or all, if None)
1447 @type ignore_secondaries: boolean
1448 @param ignore_secondaries: if true, errors on secondary nodes
1449 won't result in an error return from the function
1450 @type ignore_size: boolean
1451 @param ignore_size: if true, the current known size of the disk
1452 will not be used during the disk activation, useful for cases
1453 when the size is wrong
1454 @return: False if the operation failed, otherwise a list of
1455 (host, instance_visible_name, node_visible_name)
1456 with the mapping from node devices to instance devices
1457
1458 """
1459 device_info = []
1460 disks_ok = True
1461 iname = instance.name
1462 disks = _ExpandCheckDisks(instance, disks)
1463
1464 # With the two-pass mechanism we try to reduce the window of
1465 # opportunity for the race condition of switching DRBD to primary
1466 # before handshaking has occurred, but we do not eliminate it
1467
1468 # The proper fix would be to wait (with some limits) until the
1469 # connection has been made and drbd transitions from WFConnection
1470 # into any other network-connected state (Connected, SyncTarget,
1471 # SyncSource, etc.)
1472
1473 # 1st pass, assemble on all nodes in secondary mode
1474 for idx, inst_disk in enumerate(disks):
1475 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1476 if ignore_size:
1477 node_disk = node_disk.Copy()
1478 node_disk.UnsetSize()
1479 lu.cfg.SetDiskID(node_disk, node)
1480 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1481 False, idx)
1482 msg = result.fail_msg
1483 if msg:
1484 is_offline_secondary = (node in instance.secondary_nodes and
1485 result.offline)
1486 lu.LogWarning("Could not prepare block device %s on node %s"
1487 " (is_primary=False, pass=1): %s",
1488 inst_disk.iv_name, node, msg)
1489 if not (ignore_secondaries or is_offline_secondary):
1490 disks_ok = False
1491
1492 # FIXME: race condition on drbd migration to primary
1493
1494 # 2nd pass, do only the primary node
1495 for idx, inst_disk in enumerate(disks):
1496 dev_path = None
1497
1498 for node, node_disk in inst_disk.ComputeNodeTree(instance.primary_node):
1499 if node != instance.primary_node:
1500 continue
1501 if ignore_size:
1502 node_disk = node_disk.Copy()
1503 node_disk.UnsetSize()
1504 lu.cfg.SetDiskID(node_disk, node)
1505 result = lu.rpc.call_blockdev_assemble(node, (node_disk, instance), iname,
1506 True, idx)
1507 msg = result.fail_msg
1508 if msg:
1509 lu.LogWarning("Could not prepare block device %s on node %s"
1510 " (is_primary=True, pass=2): %s",
1511 inst_disk.iv_name, node, msg)
1512 disks_ok = False
1513 else:
1514 dev_path = result.payload
1515
1516 device_info.append((instance.primary_node, inst_disk.iv_name, dev_path))
1517
1518 # leave the disks configured for the primary node
1519 # this is a workaround that would be fixed better by
1520 # improving the logical/physical id handling
1521 for disk in disks:
1522 lu.cfg.SetDiskID(disk, instance.primary_node)
1523
1524 return disks_ok, device_info
1525
1526
1527 def _StartInstanceDisks(lu, instance, force):
1528 """Start the disks of an instance.
1529
1530 """
1531 disks_ok, _ = _AssembleInstanceDisks(lu, instance,
1532 ignore_secondaries=force)
1533 if not disks_ok:
1534 _ShutdownInstanceDisks(lu, instance)
1535 if force is not None and not force:
1536 lu.LogWarning("",
1537 hint=("If the message above refers to a secondary node,"
1538 " you can retry the operation using '--force'"))
1539 raise errors.OpExecError("Disk consistency error")
1540
1541
1542 class LUInstanceDeactivateDisks(NoHooksLU):
1543 """Shutdown an instance's disks.
1544
1545 """
1546 REQ_BGL = False
1547
1548 def ExpandNames(self):
1549 self._ExpandAndLockInstance()
1550 self.needed_locks[locking.LEVEL_NODE] = []
1551 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
1552
1553 def DeclareLocks(self, level):
1554 if level == locking.LEVEL_NODE:
1555 self._LockInstancesNodes()
1556
1557 def CheckPrereq(self):
1558 """Check prerequisites.
1559
1560 This checks that the instance is in the cluster.
1561
1562 """
1563 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1564 assert self.instance is not None, \
1565 "Cannot retrieve locked instance %s" % self.op.instance_name
1566
1567 def Exec(self, feedback_fn):
1568 """Deactivate the disks
1569
1570 """
1571 instance = self.instance
1572 if self.op.force:
1573 _ShutdownInstanceDisks(self, instance)
1574 else:
1575 _SafeShutdownInstanceDisks(self, instance)
1576
1577
1578 def _SafeShutdownInstanceDisks(lu, instance, disks=None):
1579 """Shutdown block devices of an instance.
1580
1581 This function checks that the instance is not running before calling
1582 _ShutdownInstanceDisks.
1583
1584 """
1585 _CheckInstanceState(lu, instance, INSTANCE_DOWN, msg="cannot shutdown disks")
1586 _ShutdownInstanceDisks(lu, instance, disks=disks)
1587
1588
1589 def _ExpandCheckDisks(instance, disks):
1590 """Return the instance disks selected by the disks list
1591
1592 @type disks: list of L{objects.Disk} or None
1593 @param disks: selected disks
1594 @rtype: list of L{objects.Disk}
1595 @return: selected instance disks to act on
1596
1597 """
1598 if disks is None:
1599 return instance.disks
1600 else:
1601 if not set(disks).issubset(instance.disks):
1602 raise errors.ProgrammerError("Can only act on disks belonging to the"
1603 " target instance")
1604 return disks
1605
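# Illustrative behaviour of _ExpandCheckDisks (hypothetical disk objects):
#   _ExpandCheckDisks(instance, None)                returns instance.disks
#   _ExpandCheckDisks(instance, instance.disks[:1])  returns that sublist
#   _ExpandCheckDisks(instance, [foreign_disk])      raises ProgrammerError,
# since only disks belonging to the target instance may be acted upon.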
1606
1607 def _ShutdownInstanceDisks(lu, instance, disks=None, ignore_primary=False):
1608 """Shutdown block devices of an instance.
1609
1610 This does the shutdown on all nodes of the instance.
1611
1612 Errors on the primary node are ignored only if ignore_primary is
1613 true.
1614
1615 """
1616 all_result = True
1617 disks = _ExpandCheckDisks(instance, disks)
1618
1619 for disk in disks:
1620 for node, top_disk in disk.ComputeNodeTree(instance.primary_node):
1621 lu.cfg.SetDiskID(top_disk, node)
1622 result = lu.rpc.call_blockdev_shutdown(node, (top_disk, instance))
1623 msg = result.fail_msg
1624 if msg:
1625 lu.LogWarning("Could not shutdown block device %s on node %s: %s",
1626 disk.iv_name, node, msg)
1627 if ((node == instance.primary_node and not ignore_primary) or
1628 (node != instance.primary_node and not result.offline)):
1629 all_result = False
1630 return all_result
1631
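# Sketch of how the two shutdown helpers above differ (names as defined in
# this module): the "safe" variant refuses to act on a running instance, the
# plain variant shuts the devices down unconditionally:
#   _SafeShutdownInstanceDisks(self, instance)   # raises if instance is up
#   _ShutdownInstanceDisks(self, instance)       # used by the --force path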
1632
1633 def _CheckNodeFreeMemory(lu, node, reason, requested, hypervisor_name):
1634 """Checks if a node has enough free memory.
1635
1636 This function checks if a given node has the needed amount of free
1637 memory. In case the node has less memory or we cannot get the
1638 information from the node, this function raises an OpPrereqError
1639 exception.
1640
1641 @type lu: C{LogicalUnit}
1642 @param lu: a logical unit from which we get configuration data
1643 @type node: C{str}
1644 @param node: the node to check
1645 @type reason: C{str}
1646 @param reason: string to use in the error message
1647 @type requested: C{int}
1648 @param requested: the amount of memory in MiB to check for
1649 @type hypervisor_name: C{str}
1650 @param hypervisor_name: the hypervisor to ask for memory stats
1651 @rtype: integer
1652 @return: node current free memory
1653 @raise errors.OpPrereqError: if the node doesn't have enough memory, or
1654 we cannot check the node
1655
1656 """
1657 nodeinfo = lu.rpc.call_node_info([node], None, [hypervisor_name], False)
1658 nodeinfo[node].Raise("Can't get data from node %s" % node,
1659 prereq=True, ecode=errors.ECODE_ENVIRON)
1660 (_, _, (hv_info, )) = nodeinfo[node].payload
1661
1662 free_mem = hv_info.get("memory_free", None)
1663 if not isinstance(free_mem, int):
1664 raise errors.OpPrereqError("Can't compute free memory on node %s, result"
1665 " was '%s'" % (node, free_mem),
1666 errors.ECODE_ENVIRON)
1667 if requested > free_mem:
1668 raise errors.OpPrereqError("Not enough memory on node %s for %s:"
1669 " needed %s MiB, available %s MiB" %
1670 (node, reason, requested, free_mem),
1671 errors.ECODE_NORES)
1672 return free_mem
1673
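# Typical call shape for _CheckNodeFreeMemory (a sketch with hypothetical
# values, not lifted from a specific LU): before starting or migrating an
# instance, an LU verifies that the target node can hold its memory, e.g.:
#   free = _CheckNodeFreeMemory(self, instance.primary_node,
#                               "starting instance %s" % instance.name,
#                               bep[constants.BE_MAXMEM], instance.hypervisor)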
1674
1675 def _CheckNodesFreeDiskPerVG(lu, nodenames, req_sizes):
1676 """Checks if nodes have enough free disk space in all the VGs.
1677
1678 This function checks if all given nodes have the needed amount of
1679 free disk. In case any node has less disk or we cannot get the
1680 information from the node, this function raises an OpPrereqError
1681 exception.
1682
1683 @type lu: C{LogicalUnit}
1684 @param lu: a logical unit from which we get configuration data
1685 @type nodenames: C{list}
1686 @param nodenames: the list of node names to check
1687 @type req_sizes: C{dict}
1688 @param req_sizes: the hash of vg and corresponding amount of disk in
1689 MiB to check for
1690 @raise errors.OpPrereqError: if the node doesn't have enough disk,
1691 or we cannot check the node
1692
1693 """
1694 for vg, req_size in req_sizes.items():
1695 _CheckNodesFreeDiskOnVG(lu, nodenames, vg, req_size)
1696
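# Shape of req_sizes for _CheckNodesFreeDiskPerVG (hypothetical figures): a
# mapping from volume group name to the total MiB required in it, e.g.:
#   _CheckNodesFreeDiskPerVG(self, nodenames, {"xenvg": 10240, "ssdvg": 2048})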
1697
1698 def _CheckNodesFreeDiskOnVG(lu, nodenames, vg, requested):
1699 """Checks if nodes have enough free disk space in the specified VG.
1700
1701 This function checks if all given nodes have the needed amount of
1702 free disk. In case any node has less disk or we cannot get the
1703 information from the node, this function raises an OpPrereqError
1704 exception.
1705
1706 @type lu: C{LogicalUnit}
1707 @param lu: a logical unit from which we get configuration data
1708 @type nodenames: C{list}
1709 @param nodenames: the list of node names to check
1710 @type vg: C{str}
1711 @param vg: the volume group to check
1712 @type requested: C{int}
1713 @param requested: the amount of disk in MiB to check for
1714 @raise errors.OpPrereqError: if the node doesn't have enough disk,
1715 or we cannot check the node
1716
1717 """
1718 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, nodenames)
1719 nodeinfo = lu.rpc.call_node_info(nodenames, [vg], None, es_flags)
1720 for node in nodenames:
1721 info = nodeinfo[node]
1722 info.Raise("Cannot get current information from node %s" % node,
1723 prereq=True, ecode=errors.ECODE_ENVIRON)
1724 (_, (vg_info, ), _) = info.payload
1725 vg_free = vg_info.get("vg_free", None)
1726 if not isinstance(vg_free, int):
1727 raise errors.OpPrereqError("Can't compute free disk space on node"
1728 " %s for vg %s, result was '%s'" %
1729 (node, vg, vg_free), errors.ECODE_ENVIRON)
1730 if requested > vg_free:
1731 raise errors.OpPrereqError("Not enough disk space on target node %s"
1732 " vg %s: required %d MiB, available %d MiB" %
1733 (node, vg, requested, vg_free),
1734 errors.ECODE_NORES)
1735
1736
1737 def _CheckNodesPhysicalCPUs(lu, nodenames, requested, hypervisor_name):
1738 """Checks if nodes have enough physical CPUs
1739
1740 This function checks if all given nodes have the needed number of
1741 physical CPUs. In case any node has less CPUs or we cannot get the
1742 information from the node, this function raises an OpPrereqError
1743 exception.
1744
1745 @type lu: C{LogicalUnit}
1746 @param lu: a logical unit from which we get configuration data
1747 @type nodenames: C{list}
1748 @param nodenames: the list of node names to check
1749 @type requested: C{int}
1750 @param requested: the minimum acceptable number of physical CPUs
@type hypervisor_name: C{str}
@param hypervisor_name: the hypervisor to ask for CPU statistics
1751 @raise errors.OpPrereqError: if the node doesn't have enough CPUs,
1752 or we cannot check the node
1753
1754 """
1755 nodeinfo = lu.rpc.call_node_info(nodenames, None, [hypervisor_name], None)
1756 for node in nodenames:
1757 info = nodeinfo[node]
1758 info.Raise("Cannot get current information from node %s" % node,
1759 prereq=True, ecode=errors.ECODE_ENVIRON)
1760 (_, _, (hv_info, )) = info.payload
1761 num_cpus = hv_info.get("cpu_total", None)
1762 if not isinstance(num_cpus, int):
1763 raise errors.OpPrereqError("Can't compute the number of physical CPUs"
1764 " on node %s, result was '%s'" %
1765 (node, num_cpus), errors.ECODE_ENVIRON)
1766 if requested > num_cpus:
1767 raise errors.OpPrereqError("Node %s has %s physical CPUs, but %s are "
1768 "required" % (node, num_cpus, requested),
1769 errors.ECODE_NORES)
1770
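# Illustrative call sketch (assuming "lu" is a LogicalUnit, "nodenames" a list
# of node names, and the hypervisor name one of the cluster's enabled
# hypervisors, e.g. constants.HT_XEN_PVM):
#
#   _CheckNodesPhysicalCPUs(lu, nodenames, 4, constants.HT_XEN_PVM)
#
# This raises errors.OpPrereqError if any node reports fewer than 4 physical
# CPUs for that hypervisor, or if the node cannot be queried.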
1771
1772 class LUInstanceStartup(LogicalUnit):
1773 """Starts an instance.
1774
1775 """
1776 HPATH = "instance-start"
1777 HTYPE = constants.HTYPE_INSTANCE
1778 REQ_BGL = False
1779
1780 def CheckArguments(self):
1781 # extra beparams
1782 if self.op.beparams:
1783 # fill the beparams dict
1784 objects.UpgradeBeParams(self.op.beparams)
1785 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
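# Sketch of what the upgrade above is about (illustrative; the exact key
# handling lives in objects.UpgradeBeParams): a legacy request such as
#
#   self.op.beparams = {"memory": 512}
#
# is rewritten into the split minmem/maxmem form before ForceDictType checks
# the value types against constants.BES_PARAMETER_TYPES.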
1786
1787 def ExpandNames(self):
1788 self._ExpandAndLockInstance()
1789 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
1790
1791 def DeclareLocks(self, level):
1792 if level == locking.LEVEL_NODE_RES:
1793 self._LockInstancesNodes(primary_only=True, level=locking.LEVEL_NODE_RES)
1794
1795 def BuildHooksEnv(self):
1796 """Build hooks env.
1797
1798 This runs on master, primary and secondary nodes of the instance.
1799
1800 """
1801 env = {
1802 "FORCE": self.op.force,
1803 }
1804
1805 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1806
1807 return env
1808
1809 def BuildHooksNodes(self):
1810 """Build hooks nodes.
1811
1812 """
1813 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1814 return (nl, nl)
1815
1816 def CheckPrereq(self):
1817 """Check prerequisites.
1818
1819 This checks that the instance is in the cluster.
1820
1821 """
1822 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1823 assert self.instance is not None, \
1824 "Cannot retrieve locked instance %s" % self.op.instance_name
1825
1826 # extra hvparams
1827 if self.op.hvparams:
1828 # check hypervisor parameter syntax (locally)
1829 cluster = self.cfg.GetClusterInfo()
1830 utils.ForceDictType(self.op.hvparams, constants.HVS_PARAMETER_TYPES)
1831 filled_hvp = cluster.FillHV(instance)
1832 filled_hvp.update(self.op.hvparams)
1833 hv_type = hypervisor.GetHypervisorClass(instance.hypervisor)
1834 hv_type.CheckParameterSyntax(filled_hvp)
1835 _CheckHVParams(self, instance.all_nodes, instance.hypervisor, filled_hvp)
1836
1837 _CheckInstanceState(self, instance, INSTANCE_ONLINE)
1838
1839 self.primary_offline = self.cfg.GetNodeInfo(instance.primary_node).offline
1840
1841 if self.primary_offline and self.op.ignore_offline_nodes:
1842 self.LogWarning("Ignoring offline primary node")
1843
1844 if self.op.hvparams or self.op.beparams:
1845 self.LogWarning("Overridden parameters are ignored")
1846 else:
1847 _CheckNodeOnline(self, instance.primary_node)
1848
1849 bep = self.cfg.GetClusterInfo().FillBE(instance)
1850 bep.update(self.op.beparams)
1851
1852 # check bridges existence
1853 _CheckInstanceBridgesExist(self, instance)
1854
1855 remote_info = self.rpc.call_instance_info(instance.primary_node,
1856 instance.name,
1857 instance.hypervisor)
1858 remote_info.Raise("Error checking node %s" % instance.primary_node,
1859 prereq=True, ecode=errors.ECODE_ENVIRON)
1860 if not remote_info.payload: # not running already
1861 _CheckNodeFreeMemory(self, instance.primary_node,
1862 "starting instance %s" % instance.name,
1863 bep[constants.BE_MINMEM], instance.hypervisor)
1864
1865 def Exec(self, feedback_fn):
1866 """Start the instance.
1867
1868 """
1869 instance = self.instance
1870 force = self.op.force
1871 reason = self.op.reason
1872
1873 if not self.op.no_remember:
1874 self.cfg.MarkInstanceUp(instance.name)
1875
1876 if self.primary_offline:
1877 assert self.op.ignore_offline_nodes
1878 self.LogInfo("Primary node offline, marked instance as started")
1879 else:
1880 node_current = instance.primary_node
1881
1882 _StartInstanceDisks(self, instance, force)
1883
1884 result = \
1885 self.rpc.call_instance_start(node_current,
1886 (instance, self.op.hvparams,
1887 self.op.beparams),
1888 self.op.startup_paused, reason)
1889 msg = result.fail_msg
1890 if msg:
1891 _ShutdownInstanceDisks(self, instance)
1892 raise errors.OpExecError("Could not start instance: %s" % msg)
1893
1894
1895 class LUInstanceReboot(LogicalUnit):
1896 """Reboot an instance.
1897
1898 """
1899 HPATH = "instance-reboot"
1900 HTYPE = constants.HTYPE_INSTANCE
1901 REQ_BGL = False
1902
1903 def ExpandNames(self):
1904 self._ExpandAndLockInstance()
1905
1906 def BuildHooksEnv(self):
1907 """Build hooks env.
1908
1909 This runs on master, primary and secondary nodes of the instance.
1910
1911 """
1912 env = {
1913 "IGNORE_SECONDARIES": self.op.ignore_secondaries,
1914 "REBOOT_TYPE": self.op.reboot_type,
1915 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
1916 }
1917
1918 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
1919
1920 return env
1921
1922 def BuildHooksNodes(self):
1923 """Build hooks nodes.
1924
1925 """
1926 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
1927 return (nl, nl)
1928
1929 def CheckPrereq(self):
1930 """Check prerequisites.
1931
1932 This checks that the instance is in the cluster.
1933
1934 """
1935 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
1936 assert self.instance is not None, \
1937 "Cannot retrieve locked instance %s" % self.op.instance_name
1938 _CheckInstanceState(self, instance, INSTANCE_ONLINE)
1939 _CheckNodeOnline(self, instance.primary_node)
1940
1941 # check bridges existence
1942 _CheckInstanceBridgesExist(self, instance)
1943
1944 def Exec(self, feedback_fn):
1945 """Reboot the instance.
1946
1947 """
1948 instance = self.instance
1949 ignore_secondaries = self.op.ignore_secondaries
1950 reboot_type = self.op.reboot_type
1951 reason = self.op.reason
1952
1953 remote_info = self.rpc.call_instance_info(instance.primary_node,
1954 instance.name,
1955 instance.hypervisor)
1956 remote_info.Raise("Error checking node %s" % instance.primary_node)
1957 instance_running = bool(remote_info.payload)
1958
1959 node_current = instance.primary_node
1960
1961 if instance_running and reboot_type in [constants.INSTANCE_REBOOT_SOFT,
1962 constants.INSTANCE_REBOOT_HARD]:
1963 for disk in instance.disks:
1964 self.cfg.SetDiskID(disk, node_current)
1965 result = self.rpc.call_instance_reboot(node_current, instance,
1966 reboot_type,
1967 self.op.shutdown_timeout, reason)
1968 result.Raise("Could not reboot instance")
1969 else:
1970 if instance_running:
1971 result = self.rpc.call_instance_shutdown(node_current, instance,
1972 self.op.shutdown_timeout,
1973 reason)
1974 result.Raise("Could not shutdown instance for full reboot")
1975 _ShutdownInstanceDisks(self, instance)
1976 else:
1977 self.LogInfo("Instance %s was already stopped, starting now",
1978 instance.name)
1979 _StartInstanceDisks(self, instance, ignore_secondaries)
1980 result = self.rpc.call_instance_start(node_current,
1981 (instance, None, None), False,
1982 reason)
1983 msg = result.fail_msg
1984 if msg:
1985 _ShutdownInstanceDisks(self, instance)
1986 raise errors.OpExecError("Could not start instance for"
1987 " full reboot: %s" % msg)
1988
1989 self.cfg.MarkInstanceUp(instance.name)
1990
1991
1992 class LUInstanceShutdown(LogicalUnit):
1993 """Shutdown an instance.
1994
1995 """
1996 HPATH = "instance-stop"
1997 HTYPE = constants.HTYPE_INSTANCE
1998 REQ_BGL = False
1999
2000 def ExpandNames(self):
2001 self._ExpandAndLockInstance()
2002
2003 def BuildHooksEnv(self):
2004 """Build hooks env.
2005
2006 This runs on master, primary and secondary nodes of the instance.
2007
2008 """
2009 env = _BuildInstanceHookEnvByObject(self, self.instance)
2010 env["TIMEOUT"] = self.op.timeout
2011 return env
2012
2013 def BuildHooksNodes(self):
2014 """Build hooks nodes.
2015
2016 """
2017 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2018 return (nl, nl)
2019
2020 def CheckPrereq(self):
2021 """Check prerequisites.
2022
2023 This checks that the instance is in the cluster.
2024
2025 """
2026 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2027 assert self.instance is not None, \
2028 "Cannot retrieve locked instance %s" % self.op.instance_name
2029
2030 if not self.op.force:
2031 _CheckInstanceState(self, self.instance, INSTANCE_ONLINE)
2032 else:
2033 self.LogWarning("Ignoring offline instance check")
2034
2035 self.primary_offline = \
2036 self.cfg.GetNodeInfo(self.instance.primary_node).offline
2037
2038 if self.primary_offline and self.op.ignore_offline_nodes:
2039 self.LogWarning("Ignoring offline primary node")
2040 else:
2041 _CheckNodeOnline(self, self.instance.primary_node)
2042
2043 def Exec(self, feedback_fn):
2044 """Shutdown the instance.
2045
2046 """
2047 instance = self.instance
2048 node_current = instance.primary_node
2049 timeout = self.op.timeout
2050 reason = self.op.reason
2051
2052 # If the instance is offline we shouldn't mark it as down, as that
2053 # resets the offline flag.
2054 if not self.op.no_remember and instance.admin_state in INSTANCE_ONLINE:
2055 self.cfg.MarkInstanceDown(instance.name)
2056
2057 if self.primary_offline:
2058 assert self.op.ignore_offline_nodes
2059 self.LogInfo("Primary node offline, marked instance as stopped")
2060 else:
2061 result = self.rpc.call_instance_shutdown(node_current, instance, timeout,
2062 reason)
2063 msg = result.fail_msg
2064 if msg:
2065 self.LogWarning("Could not shutdown instance: %s", msg)
2066
2067 _ShutdownInstanceDisks(self, instance)
2068
2069
2070 class LUInstanceReinstall(LogicalUnit):
2071 """Reinstall an instance.
2072
2073 """
2074 HPATH = "instance-reinstall"
2075 HTYPE = constants.HTYPE_INSTANCE
2076 REQ_BGL = False
2077
2078 def ExpandNames(self):
2079 self._ExpandAndLockInstance()
2080
2081 def BuildHooksEnv(self):
2082 """Build hooks env.
2083
2084 This runs on master, primary and secondary nodes of the instance.
2085
2086 """
2087 return _BuildInstanceHookEnvByObject(self, self.instance)
2088
2089 def BuildHooksNodes(self):
2090 """Build hooks nodes.
2091
2092 """
2093 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2094 return (nl, nl)
2095
2096 def CheckPrereq(self):
2097 """Check prerequisites.
2098
2099 This checks that the instance is in the cluster and is not running.
2100
2101 """
2102 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2103 assert instance is not None, \
2104 "Cannot retrieve locked instance %s" % self.op.instance_name
2105 _CheckNodeOnline(self, instance.primary_node, "Instance primary node"
2106 " offline, cannot reinstall")
2107
2108 if instance.disk_template == constants.DT_DISKLESS:
2109 raise errors.OpPrereqError("Instance '%s' has no disks" %
2110 self.op.instance_name,
2111 errors.ECODE_INVAL)
2112 _CheckInstanceState(self, instance, INSTANCE_DOWN, msg="cannot reinstall")
2113
2114 if self.op.os_type is not None:
2115 # OS verification
2116 pnode = _ExpandNodeName(self.cfg, instance.primary_node)
2117 _CheckNodeHasOS(self, pnode, self.op.os_type, self.op.force_variant)
2118 instance_os = self.op.os_type
2119 else:
2120 instance_os = instance.os
2121
2122 nodelist = list(instance.all_nodes)
2123
2124 if self.op.osparams:
2125 i_osdict = _GetUpdatedParams(instance.osparams, self.op.osparams)
2126 _CheckOSParams(self, True, nodelist, instance_os, i_osdict)
2127 self.os_inst = i_osdict # the new dict (without defaults)
2128 else:
2129 self.os_inst = None
2130
2131 self.instance = instance
2132
2133 def Exec(self, feedback_fn):
2134 """Reinstall the instance.
2135
2136 """
2137 inst = self.instance
2138
2139 if self.op.os_type is not None:
2140 feedback_fn("Changing OS to '%s'..." % self.op.os_type)
2141 inst.os = self.op.os_type
2142 # Write to configuration
2143 self.cfg.Update(inst, feedback_fn)
2144
2145 _StartInstanceDisks(self, inst, None)
2146 try:
2147 feedback_fn("Running the instance OS create scripts...")
2148 # FIXME: pass debug option from opcode to backend
2149 result = self.rpc.call_instance_os_add(inst.primary_node,
2150 (inst, self.os_inst), True,
2151 self.op.debug_level)
2152 result.Raise("Could not install OS for instance %s on node %s" %
2153 (inst.name, inst.primary_node))
2154 finally:
2155 _ShutdownInstanceDisks(self, inst)
2156
2157
2158 class LUInstanceRecreateDisks(LogicalUnit):
2159 """Recreate an instance's missing disks.
2160
2161 """
2162 HPATH = "instance-recreate-disks"
2163 HTYPE = constants.HTYPE_INSTANCE
2164 REQ_BGL = False
2165
2166 _MODIFYABLE = compat.UniqueFrozenset([
2167 constants.IDISK_SIZE,
2168 constants.IDISK_MODE,
2169 ])
2170
2171 # New or changed disk parameters may have different semantics
2172 assert constants.IDISK_PARAMS == (_MODIFYABLE | frozenset([
2173 constants.IDISK_ADOPT,
2174
2175 # TODO: Implement support changing VG while recreating
2176 constants.IDISK_VG,
2177 constants.IDISK_METAVG,
2178 constants.IDISK_PROVIDER,
2179 constants.IDISK_NAME,
2180 ]))
2181
2182 def _RunAllocator(self):
2183 """Run the allocator based on input opcode.
2184
2185 """
2186 be_full = self.cfg.GetClusterInfo().FillBE(self.instance)
2187
2188 # FIXME
2189 # The allocator should actually run in "relocate" mode, but current
2190 # allocators don't support relocating all the nodes of an instance at
2191 # the same time. As a workaround we use "allocate" mode, but this is
2192 # suboptimal for two reasons:
2193 # - The instance name passed to the allocator is present in the list of
2194 # existing instances, so there could be a conflict within the
2195 # internal structures of the allocator. This doesn't happen with the
2196 # current allocators, but it's a liability.
2197 # - The allocator counts the resources used by the instance twice: once
2198 # because the instance exists already, and once because it tries to
2199 # allocate a new instance.
2200 # The allocator could choose some of the nodes on which the instance is
2201 # running, but that's not a problem. If the instance nodes are broken,
2202 # they should already be marked as drained or offline, and hence
2203 # skipped by the allocator. If instance disks have been lost for other
2204 # reasons, then recreating the disks on the same nodes should be fine.
2205 disk_template = self.instance.disk_template
2206 spindle_use = be_full[constants.BE_SPINDLE_USE]
2207 req = iallocator.IAReqInstanceAlloc(name=self.op.instance_name,
2208 disk_template=disk_template,
2209 tags=list(self.instance.GetTags()),
2210 os=self.instance.os,
2211 nics=[{}],
2212 vcpus=be_full[constants.BE_VCPUS],
2213 memory=be_full[constants.BE_MAXMEM],
2214 spindle_use=spindle_use,
2215 disks=[{constants.IDISK_SIZE: d.size,
2216 constants.IDISK_MODE: d.mode}
2217 for d in self.instance.disks],
2218 hypervisor=self.instance.hypervisor,
2219 node_whitelist=None)
2220 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
2221
2222 ial.Run(self.op.iallocator)
2223
2224 assert req.RequiredNodes() == len(self.instance.all_nodes)
2225
2226 if not ial.success:
2227 raise errors.OpPrereqError("Can't compute nodes using iallocator '%s':"
2228 " %s" % (self.op.iallocator, ial.info),
2229 errors.ECODE_NORES)
2230
2231 self.op.nodes = ial.result
2232 self.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
2233 self.op.instance_name, self.op.iallocator,
2234 utils.CommaJoin(ial.result))
2235
2236 def CheckArguments(self):
2237 if self.op.disks and ht.TNonNegativeInt(self.op.disks[0]):
2238 # Normalize and convert deprecated list of disk indices
2239 self.op.disks = [(idx, {}) for idx in sorted(frozenset(self.op.disks))]
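# e.g. the deprecated form [2, 0, 2] becomes [(0, {}), (2, {})]: one entry
# per disk index plus an (initially empty) dict of parameter overrides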
2240
2241 duplicates = utils.FindDuplicates(map(compat.fst, self.op.disks))
2242 if duplicates:
2243 raise errors.OpPrereqError("Some disks have been specified more than"
2244 " once: %s" % utils.CommaJoin(duplicates),
2245 errors.ECODE_INVAL)
2246
2247 # We don't want _CheckIAllocatorOrNode selecting the default iallocator
2248 # when neither iallocator nor nodes are specified
2249 if self.op.iallocator or self.op.nodes:
2250 _CheckIAllocatorOrNode(self, "iallocator", "nodes")
2251
2252 for (idx, params) in self.op.disks:
2253 utils.ForceDictType(params, constants.IDISK_PARAMS_TYPES)
2254 unsupported = frozenset(params.keys()) - self._MODIFYABLE
2255 if unsupported:
2256 raise errors.OpPrereqError("Parameters for disk %s try to change"
2257 " unmodifyable parameter(s): %s" %
2258 (idx, utils.CommaJoin(unsupported)),
2259 errors.ECODE_INVAL)
2260
2261 def ExpandNames(self):
2262 self._ExpandAndLockInstance()
2263 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
2264
2265 if self.op.nodes:
2266 self.op.nodes = [_ExpandNodeName(self.cfg, n) for n in self.op.nodes]
2267 self.needed_locks[locking.LEVEL_NODE] = list(self.op.nodes)
2268 else:
2269 self.needed_locks[locking.LEVEL_NODE] = []
2270 if self.op.iallocator:
2271 # iallocator will select a new node in the same group
2272 self.needed_locks[locking.LEVEL_NODEGROUP] = []
2273 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
2274
2275 self.needed_locks[locking.LEVEL_NODE_RES] = []
2276
2277 def DeclareLocks(self, level):
2278 if level == locking.LEVEL_NODEGROUP:
2279 assert self.op.iallocator is not None
2280 assert not self.op.nodes
2281 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
2282 self.share_locks[locking.LEVEL_NODEGROUP] = 1
2283 # Lock the primary group used by the instance optimistically; this
2284 # requires going via the node before it's locked, requiring
2285 # verification later on
2286 self.needed_locks[locking.LEVEL_NODEGROUP] = \
2287 self.cfg.GetInstanceNodeGroups(self.op.instance_name, primary_only=True)
2288
2289 elif level == locking.LEVEL_NODE:
2290 # If an allocator is used, then we lock all the nodes in the current
2291 # instance group, as we don't know yet which ones will be selected;
2292 # if we replace the nodes without using an allocator, locks are
2293 # already declared in ExpandNames; otherwise, we need to lock all the
2294 # instance nodes for disk re-creation
2295 if self.op.iallocator:
2296 assert not self.op.nodes
2297 assert not self.needed_locks[locking.LEVEL_NODE]
2298 assert len(self.owned_locks(locking.LEVEL_NODEGROUP)) == 1
2299
2300 # Lock member nodes of the group of the primary node
2301 for group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP):
2302 self.needed_locks[locking.LEVEL_NODE].extend(
2303 self.cfg.GetNodeGroup(group_uuid).members)
2304
2305 assert locking.NAL in self.owned_locks(locking.LEVEL_NODE_ALLOC)
2306 elif not self.op.nodes:
2307 self._LockInstancesNodes(primary_only=False)
2308 elif level == locking.LEVEL_NODE_RES:
2309 # Copy node locks
2310 self.needed_locks[locking.LEVEL_NODE_RES] = \
2311 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
2312
2313 def BuildHooksEnv(self):
2314 """Build hooks env.
2315
2316 This runs on master, primary and secondary nodes of the instance.
2317
2318 """
2319 return _BuildInstanceHookEnvByObject(self, self.instance)
2320
2321 def BuildHooksNodes(self):
2322 """Build hooks nodes.
2323
2324 """
2325 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2326 return (nl, nl)
2327
2328 def CheckPrereq(self):
2329 """Check prerequisites.
2330
2331 This checks that the instance is in the cluster and is not running.
2332
2333 """
2334 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2335 assert instance is not None, \
2336 "Cannot retrieve locked instance %s" % self.op.instance_name
2337 if self.op.nodes:
2338 if len(self.op.nodes) != len(instance.all_nodes):
2339 raise errors.OpPrereqError("Instance %s currently has %d nodes, but"
2340 " %d replacement nodes were specified" %
2341 (instance.name, len(instance.all_nodes),
2342 len(self.op.nodes)),
2343 errors.ECODE_INVAL)
2344 assert instance.disk_template != constants.DT_DRBD8 or \
2345 len(self.op.nodes) == 2
2346 assert instance.disk_template != constants.DT_PLAIN or \
2347 len(self.op.nodes) == 1
2348 primary_node = self.op.nodes[0]
2349 else:
2350 primary_node = instance.primary_node
2351 if not self.op.iallocator:
2352 _CheckNodeOnline(self, primary_node)
2353
2354 if instance.disk_template == constants.DT_DISKLESS:
2355 raise errors.OpPrereqError("Instance '%s' has no disks" %
2356 self.op.instance_name, errors.ECODE_INVAL)
2357
2358 # Verify if node group locks are still correct
2359 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
2360 if owned_groups:
2361 # Node group locks are acquired only for the primary node (and only
2362 # when the allocator is used)
2363 _CheckInstanceNodeGroups(self.cfg, self.op.instance_name, owned_groups,
2364 primary_only=True)
2365
2366 # if we replace nodes *and* the old primary is offline, we don't
2367 # check the instance state
2368 old_pnode = self.cfg.GetNodeInfo(instance.primary_node)
2369 if not ((self.op.iallocator or self.op.nodes) and old_pnode.offline):
2370 _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
2371 msg="cannot recreate disks")
2372
2373 if self.op.disks:
2374 self.disks = dict(self.op.disks)
2375 else:
2376 self.disks = dict((idx, {}) for idx in range(len(instance.disks)))
2377
2378 maxidx = max(self.disks.keys())
2379 if maxidx >= len(instance.disks):
2380 raise errors.OpPrereqError("Invalid disk index '%s'" % maxidx,
2381 errors.ECODE_INVAL)
2382
2383 if ((self.op.nodes or self.op.iallocator) and
2384 sorted(self.disks.keys()) != range(len(instance.disks))):
2385 raise errors.OpPrereqError("Can't recreate disks partially and"
2386 " change the nodes at the same time",
2387 errors.ECODE_INVAL)
2388
2389 self.instance = instance
2390
2391 if self.op.iallocator:
2392 self._RunAllocator()
2393 # Release unneeded node and node resource locks
2394 _ReleaseLocks(self, locking.LEVEL_NODE, keep=self.op.nodes)
2395 _ReleaseLocks(self, locking.LEVEL_NODE_RES, keep=self.op.nodes)
2396 _ReleaseLocks(self, locking.LEVEL_NODE_ALLOC)
2397
2398 assert not self.glm.is_owned(locking.LEVEL_NODE_ALLOC)
2399
2400 def Exec(self, feedback_fn):
2401 """Recreate the disks.
2402
2403 """
2404 instance = self.instance
2405
2406 assert (self.owned_locks(locking.LEVEL_NODE) ==
2407 self.owned_locks(locking.LEVEL_NODE_RES))
2408
2409 to_skip = []
2410 mods = [] # keeps track of needed changes
2411
2412 for idx, disk in enumerate(instance.disks):
2413 try:
2414 changes = self.disks[idx]
2415 except KeyError:
2416 # Disk should not be recreated
2417 to_skip.append(idx)
2418 continue
2419
2420 # update secondaries for disks, if needed
2421 if self.op.nodes and disk.dev_type == constants.LD_DRBD8:
2422 # need to update the nodes and minors
2423 assert len(self.op.nodes) == 2
2424 assert len(disk.logical_id) == 6 # otherwise disk internals
2425 # have changed
2426 (_, _, old_port, _, _, old_secret) = disk.logical_id
2427 new_minors = self.cfg.AllocateDRBDMinor(self.op.nodes, instance.name)
2428 new_id = (self.op.nodes[0], self.op.nodes[1], old_port,
2429 new_minors[0], new_minors[1], old_secret)
2430 assert len(disk.logical_id) == len(new_id)
2431 else:
2432 new_id = None
2433
2434 mods.append((idx, new_id, changes))
2435
2436 # now that we have passed all asserts above, we can apply the mods
2437 # in a single run (to avoid partial changes)
2438 for idx, new_id, changes in mods:
2439 disk = instance.disks[idx]
2440 if new_id is not None:
2441 assert disk.dev_type == constants.LD_DRBD8
2442 disk.logical_id = new_id
2443 if changes:
2444 disk.Update(size=changes.get(constants.IDISK_SIZE, None),
2445 mode=changes.get(constants.IDISK_MODE, None))
2446
2447 # change primary node, if needed
2448 if self.op.nodes:
2449 instance.primary_node = self.op.nodes[0]
2450 self.LogWarning("Changing the instance's nodes, you will have to"
2451 " remove any disks left on the older nodes manually")
2452
2453 if self.op.nodes:
2454 self.cfg.Update(instance, feedback_fn)
2455
2456 # All touched nodes must be locked
2457 mylocks = self.owned_locks(locking.LEVEL_NODE)
2458 assert mylocks.issuperset(frozenset(instance.all_nodes))
2459 _CreateDisks(self, instance, to_skip=to_skip)
2460
2461
2462 class LUInstanceRename(LogicalUnit):
2463 """Rename an instance.
2464
2465 """
2466 HPATH = "instance-rename"
2467 HTYPE = constants.HTYPE_INSTANCE
2468
2469 def CheckArguments(self):
2470 """Check arguments.
2471
2472 """
2473 if self.op.ip_check and not self.op.name_check:
2474 # TODO: make the ip check more flexible and not depend on the name check
2475 raise errors.OpPrereqError("IP address check requires a name check",
2476 errors.ECODE_INVAL)
2477
2478 def BuildHooksEnv(self):
2479 """Build hooks env.
2480
2481 This runs on master, primary and secondary nodes of the instance.
2482
2483 """
2484 env = _BuildInstanceHookEnvByObject(self, self.instance)
2485 env["INSTANCE_NEW_NAME"] = self.op.new_name
2486 return env
2487
2488 def BuildHooksNodes(self):
2489 """Build hooks nodes.
2490
2491 """
2492 nl = [self.cfg.GetMasterNode()] + list(self.instance.all_nodes)
2493 return (nl, nl)
2494
2495 def CheckPrereq(self):
2496 """Check prerequisites.
2497
2498 This checks that the instance is in the cluster and is not running.
2499
2500 """
2501 self.op.instance_name = _ExpandInstanceName(self.cfg,
2502 self.op.instance_name)
2503 instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2504 assert instance is not None
2505 _CheckNodeOnline(self, instance.primary_node)
2506 _CheckInstanceState(self, instance, INSTANCE_NOT_RUNNING,
2507 msg="cannot rename")
2508 self.instance = instance
2509
2510 new_name = self.op.new_name
2511 if self.op.name_check:
2512 hostname = _CheckHostnameSane(self, new_name)
2513 new_name = self.op.new_name = hostname.name
2514 if (self.op.ip_check and
2515 netutils.TcpPing(hostname.ip, constants.DEFAULT_NODED_PORT)):
2516 raise errors.OpPrereqError("IP %s of instance %s already in use" %
2517 (hostname.ip, new_name),
2518 errors.ECODE_NOTUNIQUE)
2519
2520 instance_list = self.cfg.GetInstanceList()
2521 if new_name in instance_list and new_name != instance.name:
2522 raise errors.OpPrereqError("Instance '%s' is already in the cluster" %
2523 new_name, errors.ECODE_EXISTS)
2524
2525 def Exec(self, feedback_fn):
2526 """Rename the instance.
2527
2528 """
2529 inst = self.instance
2530 old_name = inst.name
2531
2532 rename_file_storage = False
2533 if (inst.disk_template in constants.DTS_FILEBASED and
2534 self.op.new_name != inst.name):
2535 old_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2536 rename_file_storage = True
2537
2538 self.cfg.RenameInstance(inst.name, self.op.new_name)
2539 # Change the instance lock. This is definitely safe while we hold the BGL.
2540 # Otherwise the new lock would have to be added in acquired mode.
2541 assert self.REQ_BGL
2542 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER)
2543 self.glm.remove(locking.LEVEL_INSTANCE, old_name)
2544 self.glm.add(locking.LEVEL_INSTANCE, self.op.new_name)
2545
2546 # re-read the instance from the configuration after rename
2547 inst = self.cfg.GetInstanceInfo(self.op.new_name)
2548
2549 if rename_file_storage:
2550 new_file_storage_dir = os.path.dirname(inst.disks[0].logical_id[1])
2551 result = self.rpc.call_file_storage_dir_rename(inst.primary_node,
2552 old_file_storage_dir,
2553 new_file_storage_dir)
2554 result.Raise("Could not rename on node %s directory '%s' to '%s'"
2555 " (but the instance has been renamed in Ganeti)" %
2556 (inst.primary_node, old_file_storage_dir,
2557 new_file_storage_dir))
2558
2559 _StartInstanceDisks(self, inst, None)
2560 # update info on disks
2561 info = _GetInstanceInfoText(inst)
2562 for (idx, disk) in enumerate(inst.disks):
2563 for node in inst.all_nodes:
2564 self.cfg.SetDiskID(disk, node)
2565 result = self.rpc.call_blockdev_setinfo(node, disk, info)
2566 if result.fail_msg:
2567 self.LogWarning("Error setting info on node %s for disk %s: %s",
2568 node, idx, result.fail_msg)
2569 try:
2570 result = self.rpc.call_instance_run_rename(inst.primary_node, inst,
2571 old_name, self.op.debug_level)
2572 msg = result.fail_msg
2573 if msg:
2574 msg = ("Could not run OS rename script for instance %s on node %s"
2575 " (but the instance has been renamed in Ganeti): %s" %
2576 (inst.name, inst.primary_node, msg))
2577 self.LogWarning(msg)
2578 finally:
2579 _ShutdownInstanceDisks(self, inst)
2580
2581 return inst.name
2582
2583
2584 class LUInstanceRemove(LogicalUnit):
2585 """Remove an instance.
2586
2587 """
2588 HPATH = "instance-remove"
2589 HTYPE = constants.HTYPE_INSTANCE
2590 REQ_BGL = False
2591
2592 def ExpandNames(self):
2593 self._ExpandAndLockInstance()
2594 self.needed_locks[locking.LEVEL_NODE] = []
2595 self.needed_locks[locking.LEVEL_NODE_RES] = []
2596 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2597
2598 def DeclareLocks(self, level):
2599 if level == locking.LEVEL_NODE:
2600 self._LockInstancesNodes()
2601 elif level == locking.LEVEL_NODE_RES:
2602 # Copy node locks
2603 self.needed_locks[locking.LEVEL_NODE_RES] = \
2604 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
2605
2606 def BuildHooksEnv(self):
2607 """Build hooks env.
2608
2609 This runs on master, primary and secondary nodes of the instance.
2610
2611 """
2612 env = _BuildInstanceHookEnvByObject(self, self.instance)
2613 env["SHUTDOWN_TIMEOUT"] = self.op.shutdown_timeout
2614 return env
2615
2616 def BuildHooksNodes(self):
2617 """Build hooks nodes.
2618
2619 """
2620 nl = [self.cfg.GetMasterNode()]
2621 nl_post = list(self.instance.all_nodes) + nl
2622 return (nl, nl_post)
2623
2624 def CheckPrereq(self):
2625 """Check prerequisites.
2626
2627 This checks that the instance is in the cluster.
2628
2629 """
2630 self.instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2631 assert self.instance is not None, \
2632 "Cannot retrieve locked instance %s" % self.op.instance_name
2633
2634 def Exec(self, feedback_fn):
2635 """Remove the instance.
2636
2637 """
2638 instance = self.instance
2639 logging.info("Shutting down instance %s on node %s",
2640 instance.name, instance.primary_node)
2641
2642 result = self.rpc.call_instance_shutdown(instance.primary_node, instance,
2643 self.op.shutdown_timeout,
2644 self.op.reason)
2645 msg = result.fail_msg
2646 if msg:
2647 if self.op.ignore_failures:
2648 feedback_fn("Warning: can't shutdown instance: %s" % msg)
2649 else:
2650 raise errors.OpExecError("Could not shutdown instance %s on"
2651 " node %s: %s" %
2652 (instance.name, instance.primary_node, msg))
2653
2654 assert (self.owned_locks(locking.LEVEL_NODE) ==
2655 self.owned_locks(locking.LEVEL_NODE_RES))
2656 assert not (set(instance.all_nodes) -
2657 self.owned_locks(locking.LEVEL_NODE)), \
2658 "Not owning correct locks"
2659
2660 _RemoveInstance(self, feedback_fn, instance, self.op.ignore_failures)
2661
2662
2663 def _RemoveInstance(lu, feedback_fn, instance, ignore_failures):
2664 """Utility function to remove an instance.
2665
2666 """
2667 logging.info("Removing block devices for instance %s", instance.name)
2668
2669 if not _RemoveDisks(lu, instance, ignore_failures=ignore_failures):
2670 if not ignore_failures:
2671 raise errors.OpExecError("Can't remove instance's disks")
2672 feedback_fn("Warning: can't remove instance's disks")
2673
2674 logging.info("Removing instance %s out of cluster config", instance.name)
2675
2676 lu.cfg.RemoveInstance(instance.name)
2677
2678 assert not lu.remove_locks.get(locking.LEVEL_INSTANCE), \
2679 "Instance lock removal conflict"
2680
2681 # Remove lock for the instance
2682 lu.remove_locks[locking.LEVEL_INSTANCE] = instance.name
2683
2684
2685 class LUInstanceQuery(NoHooksLU):
2686 """Logical unit for querying instances.
2687
2688 """
2689 # pylint: disable=W0142
2690 REQ_BGL = False
2691
2692 def CheckArguments(self):
2693 self.iq = _InstanceQuery(qlang.MakeSimpleFilter("name", self.op.names),
2694 self.op.output_fields, self.op.use_locking)
2695
2696 def ExpandNames(self):
2697 self.iq.ExpandNames(self)
2698
2699 def DeclareLocks(self, level):
2700 self.iq.DeclareLocks(self, level)
2701
2702 def Exec(self, feedback_fn):
2703 return self.iq.OldStyleQuery(self)
2704
2705
2706 def _ExpandNamesForMigration(lu):
2707 """Expands names for use with L{TLMigrateInstance}.
2708
2709 @type lu: L{LogicalUnit}
2710
2711 """
2712 if lu.op.target_node is not None:
2713 lu.op.target_node = _ExpandNodeName(lu.cfg, lu.op.target_node)
2714
2715 lu.needed_locks[locking.LEVEL_NODE] = []
2716 lu.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
2717
2718 lu.needed_locks[locking.LEVEL_NODE_RES] = []
2719 lu.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
2720
2721 # The node allocation lock is actually only needed for externally replicated
2722 # instances (e.g. sharedfile or RBD) and if an iallocator is used.
2723 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = []
2724
2725
2726 def _DeclareLocksForMigration(lu, level):
2727 """Declares locks for L{TLMigrateInstance}.
2728
2729 @type lu: L{LogicalUnit}
2730 @param level: Lock level
2731
2732 """
2733 if level == locking.LEVEL_NODE_ALLOC:
2734 assert lu.op.instance_name in lu.owned_locks(locking.LEVEL_INSTANCE)
2735
2736 instance = lu.cfg.GetInstanceInfo(lu.op.instance_name)
2737
2738 # Node locks are already declared here rather than at LEVEL_NODE as we need
2739 # the instance object anyway to declare the node allocation lock.
2740 if instance.disk_template in constants.DTS_EXT_MIRROR:
2741 if lu.op.target_node is None:
2742 lu.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
2743 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
2744 else:
2745 lu.needed_locks[locking.LEVEL_NODE] = [instance.primary_node,
2746 lu.op.target_node]
2747 del lu.recalculate_locks[locking.LEVEL_NODE]
2748 else:
2749 lu._LockInstancesNodes() # pylint: disable=W0212
2750
2751 elif level == locking.LEVEL_NODE:
2752 # Node locks are declared together with the node allocation lock
2753 assert (lu.needed_locks[locking.LEVEL_NODE] or
2754 lu.needed_locks[locking.LEVEL_NODE] is locking.ALL_SET)
2755
2756 elif level == locking.LEVEL_NODE_RES:
2757 # Copy node locks
2758 lu.needed_locks[locking.LEVEL_NODE_RES] = \
2759 _CopyLockList(lu.needed_locks[locking.LEVEL_NODE])
2760
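# A minimal sketch of how a LogicalUnit wires up the two helpers above (this
# mirrors how LUInstanceFailover and LUInstanceMigrate below use them; they
# additionally create a TLMigrateInstance tasklet in ExpandNames):
#
#   def ExpandNames(self):
#     self._ExpandAndLockInstance()
#     _ExpandNamesForMigration(self)
#
#   def DeclareLocks(self, level):
#     _DeclareLocksForMigration(self, level)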
2761
2762 class LUInstanceFailover(LogicalUnit):
2763 """Failover an instance.
2764
2765 """
2766 HPATH = "instance-failover"
2767 HTYPE = constants.HTYPE_INSTANCE
2768 REQ_BGL = False
2769
2770 def CheckArguments(self):
2771 """Check the arguments.
2772
2773 """
2774 self.iallocator = getattr(self.op, "iallocator", None)
2775 self.target_node = getattr(self.op, "target_node", None)
2776
2777 def ExpandNames(self):
2778 self._ExpandAndLockInstance()
2779 _ExpandNamesForMigration(self)
2780
2781 self._migrater = \
2782 TLMigrateInstance(self, self.op.instance_name, False, True, False,
2783 self.op.ignore_consistency, True,
2784 self.op.shutdown_timeout, self.op.ignore_ipolicy)
2785
2786 self.tasklets = [self._migrater]
2787
2788 def DeclareLocks(self, level):
2789 _DeclareLocksForMigration(self, level)
2790
2791 def BuildHooksEnv(self):
2792 """Build hooks env.
2793
2794 This runs on master, primary and secondary nodes of the instance.
2795
2796 """
2797 instance = self._migrater.instance
2798 source_node = instance.primary_node
2799 target_node = self.op.target_node
2800 env = {
2801 "IGNORE_CONSISTENCY": self.op.ignore_consistency,
2802 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2803 "OLD_PRIMARY": source_node,
2804 "NEW_PRIMARY": target_node,
2805 }
2806
2807 if instance.disk_template in constants.DTS_INT_MIRROR:
2808 env["OLD_SECONDARY"] = instance.secondary_nodes[0]
2809 env["NEW_SECONDARY"] = source_node
2810 else:
2811 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = ""
2812
2813 env.update(_BuildInstanceHookEnvByObject(self, instance))
2814
2815 return env
2816
2817 def BuildHooksNodes(self):
2818 """Build hooks nodes.
2819
2820 """
2821 instance = self._migrater.instance
2822 nl = [self.cfg.GetMasterNode()] + list(instance.secondary_nodes)
2823 return (nl, nl + [instance.primary_node])
2824
2825
2826 class LUInstanceMigrate(LogicalUnit):
2827 """Migrate an instance.
2828
2829 This is migration without shutting down, compared to the failover,
2830 which is done with shutdown.
2831
2832 """
2833 HPATH = "instance-migrate"
2834 HTYPE = constants.HTYPE_INSTANCE
2835 REQ_BGL = False
2836
2837 def ExpandNames(self):
2838 self._ExpandAndLockInstance()
2839 _ExpandNamesForMigration(self)
2840
2841 self._migrater = \
2842 TLMigrateInstance(self, self.op.instance_name, self.op.cleanup,
2843 False, self.op.allow_failover, False,
2844 self.op.allow_runtime_changes,
2845 constants.DEFAULT_SHUTDOWN_TIMEOUT,
2846 self.op.ignore_ipolicy)
2847
2848 self.tasklets = [self._migrater]
2849
2850 def DeclareLocks(self, level):
2851 _DeclareLocksForMigration(self, level)
2852
2853 def BuildHooksEnv(self):
2854 """Build hooks env.
2855
2856 This runs on master, primary and secondary nodes of the instance.
2857
2858 """
2859 instance = self._migrater.instance
2860 source_node = instance.primary_node
2861 target_node = self.op.target_node
2862 env = _BuildInstanceHookEnvByObject(self, instance)
2863 env.update({
2864 "MIGRATE_LIVE": self._migrater.live,
2865 "MIGRATE_CLEANUP": self.op.cleanup,
2866 "OLD_PRIMARY": source_node,
2867 "NEW_PRIMARY": target_node,
2868 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
2869 })
2870
2871 if instance.disk_template in constants.DTS_INT_MIRROR:
2872 env["OLD_SECONDARY"] = target_node
2873 env["NEW_SECONDARY"] = source_node
2874 else:
2875 env["OLD_SECONDARY"] = env["NEW_SECONDARY"] = None
2876
2877 return env
2878
2879 def BuildHooksNodes(self):
2880 """Build hooks nodes.
2881
2882 """
2883 instance = self._migrater.instance
2884 snodes = list(instance.secondary_nodes)
2885 nl = [self.cfg.GetMasterNode(), instance.primary_node] + snodes
2886 return (nl, nl)
2887
2888
2889 class LUInstanceMove(LogicalUnit):
2890 """Move an instance by data-copying.
2891
2892 """
2893 HPATH = "instance-move"
2894 HTYPE = constants.HTYPE_INSTANCE
2895 REQ_BGL = False
2896
2897 def ExpandNames(self):
2898 self._ExpandAndLockInstance()
2899 target_node = _ExpandNodeName(self.cfg, self.op.target_node)
2900 self.op.target_node = target_node
2901 self.needed_locks[locking.LEVEL_NODE] = [target_node]
2902 self.needed_locks[locking.LEVEL_NODE_RES] = []
2903 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
2904
2905 def DeclareLocks(self, level):
2906 if level == locking.LEVEL_NODE:
2907 self._LockInstancesNodes(primary_only=True)
2908 elif level == locking.LEVEL_NODE_RES:
2909 # Copy node locks
2910 self.needed_locks[locking.LEVEL_NODE_RES] = \
2911 _CopyLockList(self.needed_locks[locking.LEVEL_NODE])
2912
2913 def BuildHooksEnv(self):
2914 """Build hooks env.
2915
2916 This runs on master, primary and secondary nodes of the instance.
2917
2918 """
2919 env = {
2920 "TARGET_NODE": self.op.target_node,
2921 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
2922 }
2923 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
2924 return env
2925
2926 def BuildHooksNodes(self):
2927 """Build hooks nodes.
2928
2929 """
2930 nl = [
2931 self.cfg.GetMasterNode(),
2932 self.instance.primary_node,
2933 self.op.target_node,
2934 ]
2935 return (nl, nl)
2936
2937 def CheckPrereq(self):
2938 """Check prerequisites.
2939
2940 This checks that the instance is in the cluster.
2941
2942 """
2943 self.instance = instance = self.cfg.GetInstanceInfo(self.op.instance_name)
2944 assert self.instance is not None, \
2945 "Cannot retrieve locked instance %s" % self.op.instance_name
2946
2947 if instance.disk_template not in constants.DTS_COPYABLE:
2948 raise errors.OpPrereqError("Disk template %s not suitable for copying" %
2949 instance.disk_template, errors.ECODE_STATE)
2950
2951 node = self.cfg.GetNodeInfo(self.op.target_node)
2952 assert node is not None, \
2953 "Cannot retrieve locked node %s" % self.op.target_node
2954
2955 self.target_node = target_node = node.name
2956
2957 if target_node == instance.primary_node:
2958 raise errors.OpPrereqError("Instance %s is already on the node %s" %
2959 (instance.name, target_node),
2960 errors.ECODE_STATE)
2961
2962 bep = self.cfg.GetClusterInfo().FillBE(instance)
2963
2964 for idx, dsk in enumerate(instance.disks):
2965 if dsk.dev_type not in (constants.LD_LV, constants.LD_FILE):
2966 raise errors.OpPrereqError("Instance disk %d has a complex layout,"
2967 " cannot copy" % idx, errors.ECODE_STATE)
2968
2969 _CheckNodeOnline(self, target_node)
2970 _CheckNodeNotDrained(self, target_node)
2971 _CheckNodeVmCapable(self, target_node)
2972 cluster = self.cfg.GetClusterInfo()
2973 group_info = self.cfg.GetNodeGroup(node.group)
2974 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster, group_info)
2975 _CheckTargetNodeIPolicy(self, ipolicy, instance, node, self.cfg,
2976 ignore=self.op.ignore_ipolicy)
2977
2978 if instance.admin_state == constants.ADMINST_UP:
2979 # check memory requirements on the target node
2980 _CheckNodeFreeMemory(self, target_node, "failing over instance %s" %
2981 instance.name, bep[constants.BE_MAXMEM],
2982 instance.hypervisor)
2983 else:
2984 self.LogInfo("Not checking memory on the secondary node as"
2985 " instance will not be started")
2986
2987 # check bridge existence
2988 _CheckInstanceBridgesExist(self, instance, node=target_node)
2989
2990 def Exec(self, feedback_fn):
2991 """Move an instance.
2992
2993 The move is done by shutting it down on its present node, copying
2994 the data over (slow) and starting it on the new node.
2995
2996 """
2997 instance = self.instance
2998
2999 source_node = instance.primary_node
3000 target_node = self.target_node
3001
3002 self.LogInfo("Shutting down instance %s on source node %s",
3003 instance.name, source_node)
3004
3005 assert (self.owned_locks(locking.LEVEL_NODE) ==
3006 self.owned_locks(locking.LEVEL_NODE_RES))
3007
3008 result = self.rpc.call_instance_shutdown(source_node, instance,
3009 self.op.shutdown_timeout,
3010 self.op.reason)
3011 msg = result.fail_msg
3012 if msg:
3013 if self.op.ignore_consistency:
3014 self.LogWarning("Could not shutdown instance %s on node %s."
3015 " Proceeding anyway. Please make sure node"
3016 " %s is down. Error details: %s",
3017 instance.name, source_node, source_node, msg)
3018 else:
3019 raise errors.OpExecError("Could not shutdown instance %s on"
3020 " node %s: %s" %
3021 (instance.name, source_node, msg))
3022
3023 # create the target disks
3024 try:
3025 _CreateDisks(self, instance, target_node=target_node)
3026 except errors.OpExecError:
3027 self.LogWarning("Device creation failed")
3028 self.cfg.ReleaseDRBDMinors(instance.name)
3029 raise
3030
3031 cluster_name = self.cfg.GetClusterInfo().cluster_name
3032
3033 errs = []
3034 # activate, get path, copy the data over
3035 for idx, disk in enumerate(instance.disks):
3036 self.LogInfo("Copying data for disk %d", idx)
3037 result = self.rpc.call_blockdev_assemble(target_node, (disk, instance),
3038 instance.name, True, idx)
3039 if result.fail_msg:
3040 self.LogWarning("Can't assemble newly created disk %d: %s",
3041 idx, result.fail_msg)
3042 errs.append(result.fail_msg)
3043 break
3044 dev_path = result.payload
3045 result = self.rpc.call_blockdev_export(source_node, (disk, instance),
3046 target_node, dev_path,
3047 cluster_name)
3048 if result.fail_msg:
3049 self.LogWarning("Can't copy data over for disk %d: %s",
3050 idx, result.fail_msg)
3051 errs.append(result.fail_msg)
3052 break
3053
3054 if errs:
3055 self.LogWarning("Some disks failed to copy, aborting")
3056 try:
3057 _RemoveDisks(self, instance, target_node=target_node)
3058 finally:
3059 self.cfg.ReleaseDRBDMinors(instance.name)
3060 raise errors.OpExecError("Errors during disk copy: %s" %
3061 (",".join(errs),))
3062
3063 instance.primary_node = target_node
3064 self.cfg.Update(instance, feedback_fn)
3065
3066 self.LogInfo("Removing the disks on the original node")
3067 _RemoveDisks(self, instance, target_node=source_node)
3068
3069 # Only start the instance if it's marked as up
3070 if instance.admin_state == constants.ADMINST_UP:
3071 self.LogInfo("Starting instance %s on node %s",
3072 instance.name, target_node)
3073
3074 disks_ok, _ = _AssembleInstanceDisks(self, instance,
3075 ignore_secondaries=True)
3076 if not disks_ok:
3077 _ShutdownInstanceDisks(self, instance)
3078 raise errors.OpExecError("Can't activate the instance's disks")
3079
3080 result = self.rpc.call_instance_start(target_node,
3081 (instance, None, None), False,
3082 self.op.reason)
3083 msg = result.fail_msg
3084 if msg:
3085 _ShutdownInstanceDisks(self, instance)
3086 raise errors.OpExecError("Could not start instance %s on node %s: %s" %
3087 (instance.name, target_node, msg))
3088
3089
3090 class TLMigrateInstance(Tasklet):
3091 """Tasklet class for instance migration.
3092
3093 @type live: boolean
3094 @ivar live: whether the migration will be done live or non-live;
3095 this variable is initialized only after CheckPrereq has run
3096 @type cleanup: boolean
3097 @ivar cleanup: Whether we are cleaning up after a failed migration
3098 @type iallocator: string
3099 @ivar iallocator: The iallocator used to determine target_node
3100 @type target_node: string
3101 @ivar target_node: If given, the target_node to reallocate the instance to
3102 @type failover: boolean
3103 @ivar failover: Whether operation results in failover or migration
3104 @type fallback: boolean
3105 @ivar fallback: Whether fallback to failover is allowed if migration is not
3106 possible
3107 @type ignore_consistency: boolean
3108 @ivar ignore_consistency: Whether we should ignore consistency between source
3109 and target node
3110 @type shutdown_timeout: int
3111 @ivar shutdown_timeout: timeout for the shutdown when doing a failover
3112 @type ignore_ipolicy: bool
3113 @ivar ignore_ipolicy: If true, we can ignore instance policy when migrating
3114
3115 """
3116
3117 # Constants
3118 _MIGRATION_POLL_INTERVAL = 1 # seconds
3119 _MIGRATION_FEEDBACK_INTERVAL = 10 # seconds
3120
3121 def __init__(self, lu, instance_name, cleanup, failover, fallback,
3122 ignore_consistency, allow_runtime_changes, shutdown_timeout,
3123 ignore_ipolicy):
3124 """Initializes this class.
3125
3126 """
3127 Tasklet.__init__(self, lu)
3128
3129 # Parameters
3130 self.instance_name = instance_name
3131 self.cleanup = cleanup
3132 self.live = False # will be overridden later
3133 self.failover = failover
3134 self.fallback = fallback
3135 self.ignore_consistency = ignore_consistency
3136 self.shutdown_timeout = shutdown_timeout
3137 self.ignore_ipolicy = ignore_ipolicy
3138 self.allow_runtime_changes = allow_runtime_changes
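# Illustrative construction sketch (mirroring LUInstanceFailover.ExpandNames
# above; the positional flags are cleanup, failover, fallback,
# ignore_consistency, allow_runtime_changes, shutdown_timeout and
# ignore_ipolicy, in that order):
#
#   migrater = TLMigrateInstance(lu, op.instance_name, False, True, False,
#                                op.ignore_consistency, True,
#                                op.shutdown_timeout, op.ignore_ipolicy)
#   lu.tasklets = [migrater]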
3139
3140 def CheckPrereq(self):
3141 """Check prerequisites.
3142
3143 This checks that the instance is in the cluster.
3144
3145 """
3146 instance_name = _ExpandInstanceName(self.lu.cfg, self.instance_name)
3147 instance = self.cfg.GetInstanceInfo(instance_name)
3148 assert instance is not None
3149 self.instance = instance
3150 cluster = self.cfg.GetClusterInfo()
3151
3152 if (not self.cleanup and
3153 not instance.admin_state == constants.ADMINST_UP and
3154 not self.failover and self.fallback):
3155 self.lu.LogInfo("Instance is marked down or offline, fallback allowed,"
3156 " switching to failover")
3157 self.failover = True
3158
3159 if instance.disk_template not in constants.DTS_MIRRORED:
3160 if self.failover:
3161 text = "failovers"
3162 else:
3163 text = "migrations"
3164 raise errors.OpPrereqError("Instance's disk layout '%s' does not allow"
3165 " %s" % (instance.disk_template, text),
3166 errors.ECODE_STATE)
3167
3168 if instance.disk_template in constants.DTS_EXT_MIRROR:
3169 _CheckIAllocatorOrNode(self.lu, "iallocator", "target_node")
3170
3171 if self.lu.op.iallocator:
3172 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
3173 self._RunAllocator()
3174 else:
3175 # We set self.target_node as it is required by
3176 # BuildHooksEnv
3177 self.target_node = self.lu.op.target_node
3178
3179 # Check that the target node is correct in terms of instance policy
3180 nodeinfo = self.cfg.GetNodeInfo(self.target_node)
3181 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
3182 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
3183 group_info)
3184 _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
3185 ignore=self.ignore_ipolicy)
3186
3187 # self.target_node is already populated, either directly or by the
3188 # iallocator run
3189 target_node = self.target_node
3190 if self.target_node == instance.primary_node:
3191 raise errors.OpPrereqError("Cannot migrate instance %s"
3192 " to its primary (%s)" %
3193 (instance.name, instance.primary_node),
3194 errors.ECODE_STATE)
3195
3196 if len(self.lu.tasklets) == 1:
3197 # It is safe to release locks only when we're the only tasklet
3198 # in the LU
3199 _ReleaseLocks(self.lu, locking.LEVEL_NODE,
3200 keep=[instance.primary_node, self.target_node])
3201 _ReleaseLocks(self.lu, locking.LEVEL_NODE_ALLOC)
3202
3203 else:
3204 assert not self.lu.glm.is_owned(locking.LEVEL_NODE_ALLOC)
3205
3206 secondary_nodes = instance.secondary_nodes
3207 if not secondary_nodes:
3208 raise errors.ConfigurationError("No secondary node but using"
3209 " %s disk template" %
3210 instance.disk_template)
3211 target_node = secondary_nodes[0]
3212 if self.lu.op.iallocator or (self.lu.op.target_node and
3213 self.lu.op.target_node != target_node):
3214 if self.failover:
3215 text = "failed over"
3216 else:
3217 text = "migrated"
3218 raise errors.OpPrereqError("Instances with disk template %s cannot"
3219 " be %s to arbitrary nodes"
3220 " (neither an iallocator nor a target"
3221 " node can be passed)" %
3222 (instance.disk_template, text),
3223 errors.ECODE_INVAL)
3224 nodeinfo = self.cfg.GetNodeInfo(target_node)
3225 group_info = self.cfg.GetNodeGroup(nodeinfo.group)
3226 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
3227 group_info)
3228 _CheckTargetNodeIPolicy(self.lu, ipolicy, instance, nodeinfo, self.cfg,
3229 ignore=self.ignore_ipolicy)
3230
3231 i_be = cluster.FillBE(instance)
3232
3233 # check memory requirements on the secondary node
3234 if (not self.cleanup and
3235 (not self.failover or instance.admin_state == constants.ADMINST_UP)):
3236 self.tgt_free_mem = _CheckNodeFreeMemory(self.lu, target_node,
3237 "migrating instance %s" %
3238 instance.name,
3239 i_be[constants.BE_MINMEM],
3240 instance.hypervisor)
3241 else:
3242 self.lu.LogInfo("Not checking memory on the secondary node as"
3243 " instance will not be started")
3244
3245 # check if failover must be forced instead of migration
3246 if (not self.cleanup and not self.failover and
3247 i_be[constants.BE_ALWAYS_FAILOVER]):
3248 self.lu.LogInfo("Instance configured to always failover; fallback"
3249 " to failover")
3250 self.failover = True
3251
3252 # check bridge existence
3253 _CheckInstanceBridgesExist(self.lu, instance, node=target_node)
3254
3255 if not self.cleanup:
3256 _CheckNodeNotDrained(self.lu, target_node)
3257 if not self.failover:
3258 result = self.rpc.call_instance_migratable(instance.primary_node,
3259 instance)
3260 if result.fail_msg and self.fallback:
3261 self.lu.LogInfo("Can't migrate, instance offline, fallback to"
3262 " failover")
3263 self.failover = True
3264 else:
3265 result.Raise("Can't migrate, please use failover",
3266 prereq=True, ecode=errors.ECODE_STATE)
3267
3268 assert not (self.failover and self.cleanup)
3269
3270 if not self.failover:
3271 if self.lu.op.live is not None and self.lu.op.mode is not None:
3272 raise errors.OpPrereqError("Only one of the 'live' and 'mode'"
3273 " parameters are accepted",
3274 errors.ECODE_INVAL)
3275 if self.lu.op.live is not None:
3276 if self.lu.op.live:
3277 self.lu.op.mode = constants.HT_MIGRATION_LIVE
3278 else:
3279 self.lu.op.mode = constants.HT_MIGRATION_NONLIVE
3280 # reset the 'live' parameter to None so that repeated
3281 # invocations of CheckPrereq do not raise an exception
3282 self.lu.op.live = None
3283 elif self.lu.op.mode is None:
3284 # read the default value from the hypervisor
3285 i_hv = cluster.FillHV(self.instance, skip_globals=False)
3286 self.lu.op.mode = i_hv[constants.HV_MIGRATION_MODE]
3287
3288 self.live = self.lu.op.mode == constants.HT_MIGRATION_LIVE
3289 else:
3290 # Failover is never live
3291 self.live = False
3292
3293 if not (self.failover or self.cleanup):
3294 remote_info = self.rpc.call_instance_info(instance.primary_node,
3295 instance.name,
3296 instance.hypervisor)
3297 remote_info.Raise("Error checking instance on node %s" %
3298 instance.primary_node)
3299 instance_running = bool(remote_info.payload)
3300 if instance_running:
3301 self.current_mem = int(remote_info.payload["memory"])
3302
3303 def _RunAllocator(self):
3304 """Run the allocator based on input opcode.
3305
3306 """
3307 assert locking.NAL in self.lu.owned_locks(locking.LEVEL_NODE_ALLOC)
3308
3309 # FIXME: add a self.ignore_ipolicy option
3310 req = iallocator.IAReqRelocate(name=self.instance_name,
3311 relocate_from=[self.instance.primary_node])
3312 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
3313
3314 ial.Run(self.lu.op.iallocator)
3315
3316 if not ial.success:
3317 raise errors.OpPrereqError("Can't compute nodes using"
3318 " iallocator '%s': %s" %
3319 (self.lu.op.iallocator, ial.info),
3320 errors.ECODE_NORES)
3321 self.target_node = ial.result[0]
3322 self.lu.LogInfo("Selected nodes for instance %s via iallocator %s: %s",
3323 self.instance_name, self.lu.op.iallocator,
3324 utils.CommaJoin(ial.result))
3325
3326 def _WaitUntilSync(self):
3327 """Poll with custom rpc for disk sync.
3328
3329 This uses our own step-based rpc call.
3330
3331 """
3332 self.feedback_fn("* wait until resync is done")
3333 all_done = False
3334 while not all_done:
3335 all_done = True
3336 result = self.rpc.call_drbd_wait_sync(self.all_nodes,
3337 self.nodes_ip,
3338 (self.instance.disks,
3339 self.instance))
3340 min_percent = 100
3341 for node, nres in result.items():
3342 nres.Raise("Cannot resync disks on node %s" % node)
3343 node_done, node_percent = nres.payload
3344 all_done = all_done and node_done
3345 if node_percent is not None:
3346 min_percent = min(min_percent, node_percent)
3347 if not all_done:
3348 if min_percent < 100:
3349 self.feedback_fn(" - progress: %.1f%%" % min_percent)
3350 time.sleep(2)
3351
3352 def _EnsureSecondary(self, node):
3353 """Demote a node to secondary.
3354
3355 """
3356 self.feedback_fn("* switching node %s to secondary mode" % node)
3357
3358 for dev in self.instance.disks:
3359 self.cfg.SetDiskID(dev, node)
3360
3361 result = self.rpc.call_blockdev_close(node, self.instance.name,
3362 self.instance.disks)
3363 result.Raise("Cannot change disk to secondary on node %s" % node)
3364
3365 def _GoStandalone(self):
3366 """Disconnect from the network.
3367
3368 """
3369 self.feedback_fn("* changing into standalone mode")
3370 result = self.rpc.call_drbd_disconnect_net(self.all_nodes, self.nodes_ip,
3371 self.instance.disks)
3372 for node, nres in result.items():
3373 nres.Raise("Cannot disconnect disks on node %s" % node)
3374
3375 def _GoReconnect(self, multimaster):
3376 """Reconnect to the network.
3377
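@type multimaster: boolean
@param multimaster: if True, reattach the DRBD network in dual-primary
(multimaster) mode, otherwise in single-primary mode
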
3378 """
3379 if multimaster:
3380 msg = "dual-master"
3381 else:
3382 msg = "single-master"
3383 self.feedback_fn("* changing disks into %s mode" % msg)
3384 result = self.rpc.call_drbd_attach_net(self.all_nodes, self.nodes_ip,
3385 (self.instance.disks, self.instance),
3386 self.instance.name, multimaster)
3387 for node, nres in result.items():
3388 nres.Raise("Cannot change disks config on node %s" % node)
3389
3390 def _ExecCleanup(self):
3391 """Try to cleanup after a failed migration.
3392
3393 The cleanup is done by:
3394 - check that the instance is running only on one node
3395 (and update the config if needed)
3396 - change disks on its secondary node to secondary
3397 - wait until disks are fully synchronized
3398 - disconnect from the network
3399 - change disks into single-master mode
3400 - wait again until disks are fully synchronized
3401
3402 """
3403 instance = self.instance
3404 target_node = self.target_node
3405 source_node = self.source_node
3406
3407 # check running on only one node
3408 self.feedback_fn("* checking where the instance actually runs"
3409 " (if this hangs, the hypervisor might be in"
3410 " a bad state)")
3411 ins_l = self.rpc.call_instance_list(self.all_nodes, [instance.hypervisor])
3412 for node, result in ins_l.items():
3413 result.Raise("Can't contact node %s" % node)
3414
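# determine on which node(s) the hypervisor reports the instance as running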
3415 runningon_source = instance.name in ins_l[source_node].payload
3416 runningon_target = instance.name in ins_l[target_node].payload
3417
3418 if runningon_source and runningon_target:
3419 raise errors.OpExecError("Instance seems to be running on two nodes,"
3420 " or the hypervisor is confused; you will have"
3421 " to ensure manually that it runs only on one"
3422 " and restart this operation")
3423
3424 if not (runningon_source or runningon_target):
3425 raise errors.OpExecError("Instance does not seem to be running at all;"
3426 " in this case it's safer to repair by"
3427 " running 'gnt-instance stop' to ensure disk"
3428 " shutdown, and then restarting it")
3429
3430 if runningon_target:
3431 # the migration has actually succeeded, we need to update the config
3432 self.feedback_fn("* instance running on secondary node (%s),"
3433 " updating config" % target_node)
3434 instance.primary_node = target_node
3435 self.cfg.Update(instance, self.feedback_fn)
3436 demoted_node = source_node
3437 else:
3438 self.feedback_fn("* instance confirmed to be running on its"
3439 " primary node (%s)" % source_node)
3440 demoted_node = target_node
3441
3442 if instance.disk_template in constants.DTS_INT_MIRROR:
3443 self._EnsureSecondary(demoted_node)
3444 try:
3445 self._WaitUntilSync()
3446 except errors.OpExecError:
3447 # we ignore errors here, since if the device is standalone, it
3448 # won't be able to sync
3449 pass
3450 self._GoStandalone()
3451 self._GoReconnect(False)
3452 self._WaitUntilSync()
3453
3454 self.feedback_fn("* done")
3455
3456 def _RevertDiskStatus(self):
3457 """Try to revert the disk status after a failed migration.
3458
3459 """
3460 target_node = self.target_node
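# externally mirrored disk templates have no DRBD state to revert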
3461 if self.instance.disk_template in constants.DTS_EXT_MIRROR:
3462 return
3463
3464 try:
3465 self._EnsureSecondary(target_node)
3466 self._GoStandalone()
3467 self._GoReconnect(False)
3468 self._WaitUntilSync()
3469 except errors.OpExecError, err:
3470 self.lu.LogWarning("Migration failed and I can't reconnect the drives,"
3471 " please try to recover the instance manually;"
3472 " error '%s'" % str(err))
3473
3474 def _AbortMigration(self):
3475 """Call the hypervisor code to abort a started migration.
3476
3477 """
3478 instance = self.instance
3479 target_node = self.target_node
3480 source_node = self.source_node
3481 migration_info = self.migration_info
3482
3483 abort_result = self.rpc.call_instance_finalize_migration_dst(target_node,
3484 instance,
3485 migration_info,
3486 False)
3487 abort_msg = abort_result.fail_msg
3488 if abort_msg:
3489 logging.error("Aborting migration failed on target node %s: %s",
3490 target_node, abort_msg)
3491 # Don't raise an exception here, as we still have to try to revert the
3492 # disk status, even if this step failed.
3493
3494 abort_result = self.rpc.call_instance_finalize_migration_src(
3495 source_node, instance, False, self.live)
3496 abort_msg = abort_result.fail_msg
3497 if abort_msg:
3498 logging.error("Aborting migration failed on source node %s: %s",
3499 source_node, abort_msg)
3500
3501 def _ExecMigration(self):
3502 """Migrate an instance.
3503
3504 The migration is done by:
3505 - change the disks into dual-master mode
3506 - wait until disks are fully synchronized again
3507 - migrate the instance
3508 - change disks on the new secondary node (the old primary) to secondary
3509 - wait until disks are fully synchronized
3510 - change disks into single-master mode
3511
3512 """
3513 instance = self.instance
3514 target_node = self.target_node
3515 source_node = self.source_node
3516
3517 # Check for hypervisor version mismatch and warn the user.
3518 nodeinfo = self.rpc.call_node_info([source_node, target_node],
3519 None, [self.instance.hypervisor], False)
3520 for ninfo in nodeinfo.values():
3521 ninfo.Raise("Unable to retrieve node information from node '%s'" %
3522 ninfo.node)
3523 (_, _, (src_info, )) = nodeinfo[source_node].payload
3524 (_, _, (dst_info, )) = nodeinfo[target_node].payload
3525
3526 if ((constants.HV_NODEINFO_KEY_VERSION in src_info) and
3527 (constants.HV_NODEINFO_KEY_VERSION in dst_info)):
3528 src_version = src_info[constants.HV_NODEINFO_KEY_VERSION]
3529 dst_version = dst_info[constants.HV_NODEINFO_KEY_VERSION]
3530 if src_version != dst_version:
3531 self.feedback_fn("* warning: hypervisor version mismatch between"
3532 " source (%s) and target (%s) node" %
3533 (src_version, dst_version))
3534
3535 self.feedback_fn("* checking disk consistency between source and target")
3536 for (idx, dev) in enumerate(instance.disks):
3537 if not _CheckDiskConsistency(self.lu, instance, dev, target_node, False):
3538 raise errors.OpExecError("Disk %s is degraded or not fully"
3539 " synchronized on target node,"
3540 " aborting migration" % idx)
3541
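# if the instance currently uses more memory than is free on the target
# node, try to balloon it down first (only allowed if runtime changes
# are permitted)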
3542 if self.current_mem > self.tgt_free_mem:
3543 if not self.allow_runtime_changes:
3544 raise errors.OpExecError("Memory ballooning not allowed and not enough"
3545 " free memory to fit instance %s on target"
3546 " node %s (have %dMB, need %dMB)" %
3547 (instance.name, target_node,
3548 self.tgt_free_mem, self.current_mem))
3549 self.feedback_fn("* setting instance memory to %s" % self.tgt_free_mem)
3550 rpcres = self.rpc.call_instance_balloon_memory(instance.primary_node,
3551 instance,
3552 self.tgt_free_mem)
3553 rpcres.Raise("Cannot modify instance runtime memory")
3554
3555 # First get the migration information from the remote node
3556 result = self.rpc.call_migration_info(source_node, instance)
3557 msg = result.fail_msg
3558 if msg:
3559 log_err = ("Failed fetching source migration information from %s: %s" %
3560 (source_node, msg))
3561 logging.error(log_err)
3562 raise errors.OpExecError(log_err)
3563
3564 self.migration_info = migration_info = result.payload
3565
3566 if self.instance.disk_template not in constants.DTS_EXT_MIRROR:
3567 # Then switch the disks to master/master mode
3568 self._EnsureSecondary(target_node)
3569 self._GoStandalone()
3570 self._GoReconnect(True)
3571 self._WaitUntilSync()
3572
3573 self.feedback_fn("* preparing %s to accept the instance" % target_node)
3574 result = self.rpc.call_accept_instance(target_node,
3575 instance,
3576 migration_info,
3577 self.nodes_ip[target_node])
3578
3579 msg = result.fail_msg
3580 if msg:
3581 logging.error("Instance pre-migration failed, trying to revert"
3582 " disk status: %s", msg)
3583 self.feedback_fn("Pre-migration failed, aborting")
3584 self._AbortMigration()
3585 self._RevertDiskStatus()
3586 raise errors.OpExecError("Could not pre-migrate instance %s: %s" %
3587 (instance.name, msg))
3588
3589 self.feedback_fn("* migrating instance to %s" % target_node)
3590 result = self.rpc.call_instance_migrate(source_node, instance,
3591 self.nodes_ip[target_node],
3592 self.live)
3593 msg = result.fail_msg
3594 if msg:
3595 logging.error("Instance migration failed, trying to revert"
3596 " disk status: %s", msg)
3597 self.feedback_fn("Migration failed, aborting")
3598 self._AbortMigration()
3599 self._RevertDiskStatus()
3600 raise errors.OpExecError("Could not migrate instance %s: %s" %
3601 (instance.name, msg))
3602
3603 self.feedback_fn("* starting memory transfer")
3604 last_feedback = time.time()
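# poll the source node until the hypervisor no longer reports the
# migration as active, aborting and reverting on any failure status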
3605 while True:
3606 result = self.rpc.call_instance_get_migration_status(source_node,
3607 instance)
3608 msg = result.fail_msg
3609 ms = result.payload # MigrationStatus instance
3610 if msg or (ms.status in constants.HV_MIGRATION_FAILED_STATUSES):
3611 logging.error("Instance migration failed, trying to revert"
3612 " disk status: %s", msg)
3613 self.feedback_fn("Migration failed, aborting")
3614 self._AbortMigration()
3615 self._RevertDiskStatus()
3616 if not msg:
3617 msg = "hypervisor returned failure"
3618 raise errors.OpExecError("Could not migrate instance %s: %s" %
3619 (instance.name, msg))
3620
3621 if ms.status != constants.HV_MIGRATION_ACTIVE:
3622 self.feedback_fn("* memory transfer complete")
3623 break
3624
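# report memory transfer progress at most once per feedback interval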
3625 if (utils.TimeoutExpired(last_feedback,
3626 self._MIGRATION_FEEDBACK_INTERVAL) and
3627 ms.transferred_ram is not None):
3628 mem_progress = 100 * float(ms.transferred_ram) / float(ms.total_ram)
3629 self.feedback_fn("* memory transfer progress: %.2f %%" % mem_progress)