1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, _QueryBase, LogicalUnit, \
53 ResultWithJobs
54 from ganeti.cmdlib.common import _ShareAll, _RunPostHook, \
55 _ComputeAncillaryFiles, _RedistributeAncillaryFiles, _UploadHelper, \
56 _GetWantedInstances, _MergeAndVerifyHvState, _MergeAndVerifyDiskState, \
57 _GetUpdatedIPolicy, _ComputeNewInstanceViolations, _GetUpdatedParams, \
58 _CheckOSParams, _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
59 _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, \
60 _SupportsOob
61
62 import ganeti.masterd.instance
63
64
65 class LUClusterActivateMasterIp(NoHooksLU):
66 """Activate the master IP on the master node.
67
68 """
69 def Exec(self, feedback_fn):
70 """Activate the master IP.
71
72 """
73 master_params = self.cfg.GetMasterNetworkParameters()
74 ems = self.cfg.GetUseExternalMipScript()
75 result = self.rpc.call_node_activate_master_ip(master_params.name,
76 master_params, ems)
77 result.Raise("Could not activate the master IP")
78
79
80 class LUClusterDeactivateMasterIp(NoHooksLU):
81 """Deactivate the master IP on the master node.
82
83 """
84 def Exec(self, feedback_fn):
85 """Deactivate the master IP.
86
87 """
88 master_params = self.cfg.GetMasterNetworkParameters()
89 ems = self.cfg.GetUseExternalMipScript()
90 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
91 master_params, ems)
92 result.Raise("Could not deactivate the master IP")
93
94
95 class LUClusterConfigQuery(NoHooksLU):
96 """Return configuration values.
97
98 """
99 REQ_BGL = False
100
101 def CheckArguments(self):
102 self.cq = _ClusterQuery(None, self.op.output_fields, False)
103
104 def ExpandNames(self):
105 self.cq.ExpandNames(self)
106
107 def DeclareLocks(self, level):
108 self.cq.DeclareLocks(self, level)
109
110 def Exec(self, feedback_fn):
111 result = self.cq.OldStyleQuery(self)
112
113 assert len(result) == 1
114
115 return result[0]
116
117
118 class LUClusterDestroy(LogicalUnit):
119 """Logical unit for destroying the cluster.
120
121 """
122 HPATH = "cluster-destroy"
123 HTYPE = constants.HTYPE_CLUSTER
124
125 def BuildHooksEnv(self):
126 """Build hooks env.
127
128 """
129 return {
130 "OP_TARGET": self.cfg.GetClusterName(),
131 }
132
133 def BuildHooksNodes(self):
134 """Build hooks nodes.
135
136 """
137 return ([], [])
138
139 def CheckPrereq(self):
140 """Check prerequisites.
141
142 This checks whether the cluster is empty.
143
144 Any errors are signaled by raising errors.OpPrereqError.
145
146 """
147 master = self.cfg.GetMasterNode()
148
149 nodelist = self.cfg.GetNodeList()
150 if len(nodelist) != 1 or nodelist[0] != master:
151 raise errors.OpPrereqError("There are still %d node(s) in"
152 " this cluster." % (len(nodelist) - 1),
153 errors.ECODE_INVAL)
154 instancelist = self.cfg.GetInstanceList()
155 if instancelist:
156 raise errors.OpPrereqError("There are still %d instance(s) in"
157 " this cluster." % len(instancelist),
158 errors.ECODE_INVAL)
159
160 def Exec(self, feedback_fn):
161 """Destroys the cluster.
162
163 """
164 master_params = self.cfg.GetMasterNetworkParameters()
165
166 # Run post hooks on master node before it's removed
167 _RunPostHook(self, master_params.name)
168
169 ems = self.cfg.GetUseExternalMipScript()
170 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
171 master_params, ems)
172 if result.fail_msg:
173 self.LogWarning("Error disabling the master IP address: %s",
174 result.fail_msg)
175
176 return master_params.name
177
178
179 class LUClusterPostInit(LogicalUnit):
180 """Logical unit for running hooks after cluster initialization.
181
182 """
183 HPATH = "cluster-init"
184 HTYPE = constants.HTYPE_CLUSTER
185
186 def BuildHooksEnv(self):
187 """Build hooks env.
188
189 """
190 return {
191 "OP_TARGET": self.cfg.GetClusterName(),
192 }
193
194 def BuildHooksNodes(self):
195 """Build hooks nodes.
196
197 """
198 return ([], [self.cfg.GetMasterNode()])
199
200 def Exec(self, feedback_fn):
201 """Nothing to do.
202
203 """
204 return True
205
206
207 class _ClusterQuery(_QueryBase):
208 FIELDS = query.CLUSTER_FIELDS
209
210 #: Do not sort (there is only one item)
211 SORT_FIELD = None
212
213 def ExpandNames(self, lu):
214 lu.needed_locks = {}
215
216 # The following variables interact with _QueryBase._GetNames
217 self.wanted = locking.ALL_SET
218 self.do_locking = self.use_locking
219
220 if self.do_locking:
221 raise errors.OpPrereqError("Can not use locking for cluster queries",
222 errors.ECODE_INVAL)
223
224 def DeclareLocks(self, lu, level):
225 pass
226
227 def _GetQueryData(self, lu):
228 """Computes the cluster data for the query.
229
230 """
231 # Locking is not used
232 assert not (compat.any(lu.glm.is_owned(level)
233 for level in locking.LEVELS
234 if level != locking.LEVEL_CLUSTER) or
235 self.do_locking or self.use_locking)
236
237 if query.CQ_CONFIG in self.requested_data:
238 cluster = lu.cfg.GetClusterInfo()
239 else:
240 cluster = NotImplemented
241
242 if query.CQ_QUEUE_DRAINED in self.requested_data:
243 drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
244 else:
245 drain_flag = NotImplemented
246
247 if query.CQ_WATCHER_PAUSE in self.requested_data:
248 master_name = lu.cfg.GetMasterNode()
249
250 result = lu.rpc.call_get_watcher_pause(master_name)
251 result.Raise("Can't retrieve watcher pause from master node '%s'" %
252 master_name)
253
254 watcher_pause = result.payload
255 else:
256 watcher_pause = NotImplemented
257
258 return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
259
260
261 class LUClusterQuery(NoHooksLU):
262 """Query cluster configuration.
263
264 """
265 REQ_BGL = False
266
267 def ExpandNames(self):
268 self.needed_locks = {}
269
270 def Exec(self, feedback_fn):
271 """Return cluster config.
272
273 """
274 cluster = self.cfg.GetClusterInfo()
275 os_hvp = {}
276
277 # Filter just for enabled hypervisors
278 for os_name, hv_dict in cluster.os_hvp.items():
279 os_hvp[os_name] = {}
280 for hv_name, hv_params in hv_dict.items():
281 if hv_name in cluster.enabled_hypervisors:
282 os_hvp[os_name][hv_name] = hv_params
283
284 # Convert ip_family to ip_version
285 primary_ip_version = constants.IP4_VERSION
286 if cluster.primary_ip_family == netutils.IP6Address.family:
287 primary_ip_version = constants.IP6_VERSION
288
289 result = {
290 "software_version": constants.RELEASE_VERSION,
291 "protocol_version": constants.PROTOCOL_VERSION,
292 "config_version": constants.CONFIG_VERSION,
293 "os_api_version": max(constants.OS_API_VERSIONS),
294 "export_version": constants.EXPORT_VERSION,
295 "architecture": runtime.GetArchInfo(),
296 "name": cluster.cluster_name,
297 "master": cluster.master_node,
298 "default_hypervisor": cluster.primary_hypervisor,
299 "enabled_hypervisors": cluster.enabled_hypervisors,
300 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301 for hypervisor_name in cluster.enabled_hypervisors]),
302 "os_hvp": os_hvp,
303 "beparams": cluster.beparams,
304 "osparams": cluster.osparams,
305 "ipolicy": cluster.ipolicy,
306 "nicparams": cluster.nicparams,
307 "ndparams": cluster.ndparams,
308 "diskparams": cluster.diskparams,
309 "candidate_pool_size": cluster.candidate_pool_size,
310 "master_netdev": cluster.master_netdev,
311 "master_netmask": cluster.master_netmask,
312 "use_external_mip_script": cluster.use_external_mip_script,
313 "volume_group_name": cluster.volume_group_name,
314 "drbd_usermode_helper": cluster.drbd_usermode_helper,
315 "file_storage_dir": cluster.file_storage_dir,
316 "shared_file_storage_dir": cluster.shared_file_storage_dir,
317 "maintain_node_health": cluster.maintain_node_health,
318 "ctime": cluster.ctime,
319 "mtime": cluster.mtime,
320 "uuid": cluster.uuid,
321 "tags": list(cluster.GetTags()),
322 "uid_pool": cluster.uid_pool,
323 "default_iallocator": cluster.default_iallocator,
324 "reserved_lvs": cluster.reserved_lvs,
325 "primary_ip_version": primary_ip_version,
326 "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327 "hidden_os": cluster.hidden_os,
328 "blacklisted_os": cluster.blacklisted_os,
329 }
330
331 return result
332
333
334 class LUClusterRedistConf(NoHooksLU):
335 """Force the redistribution of cluster configuration.
336
337 This is a very simple LU.
338
339 """
340 REQ_BGL = False
341
342 def ExpandNames(self):
343 self.needed_locks = {
344 locking.LEVEL_NODE: locking.ALL_SET,
345 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
346 }
347 self.share_locks = _ShareAll()
348
349 def Exec(self, feedback_fn):
350 """Redistribute the configuration.
351
352 """
353 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
354 _RedistributeAncillaryFiles(self)
355
356
357 class LUClusterRename(LogicalUnit):
358 """Rename the cluster.
359
360 """
361 HPATH = "cluster-rename"
362 HTYPE = constants.HTYPE_CLUSTER
363
364 def BuildHooksEnv(self):
365 """Build hooks env.
366
367 """
368 return {
369 "OP_TARGET": self.cfg.GetClusterName(),
370 "NEW_NAME": self.op.name,
371 }
372
373 def BuildHooksNodes(self):
374 """Build hooks nodes.
375
376 """
377 return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
378
379 def CheckPrereq(self):
380 """Verify that the passed name is a valid one.
381
382 """
383 hostname = netutils.GetHostname(name=self.op.name,
384 family=self.cfg.GetPrimaryIPFamily())
385
386 new_name = hostname.name
387 self.ip = new_ip = hostname.ip
388 old_name = self.cfg.GetClusterName()
389 old_ip = self.cfg.GetMasterIP()
390 if new_name == old_name and new_ip == old_ip:
391 raise errors.OpPrereqError("Neither the name nor the IP address of the"
392 " cluster has changed",
393 errors.ECODE_INVAL)
394 if new_ip != old_ip:
395 if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396 raise errors.OpPrereqError("The given cluster IP address (%s) is"
397 " reachable on the network" %
398 new_ip, errors.ECODE_NOTUNIQUE)
399
400 self.op.name = new_name
401
402 def Exec(self, feedback_fn):
403 """Rename the cluster.
404
405 """
406 clustername = self.op.name
407 new_ip = self.ip
408
409 # shutdown the master IP
410 master_params = self.cfg.GetMasterNetworkParameters()
411 ems = self.cfg.GetUseExternalMipScript()
412 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
413 master_params, ems)
414 result.Raise("Could not disable the master role")
415
416 try:
417 cluster = self.cfg.GetClusterInfo()
418 cluster.cluster_name = clustername
419 cluster.master_ip = new_ip
420 self.cfg.Update(cluster, feedback_fn)
421
422 # update the known hosts file
423 ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424 node_list = self.cfg.GetOnlineNodeList()
425 try:
426 node_list.remove(master_params.name)
427 except ValueError:
428 pass
429 _UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
430 finally:
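# re-activate the master IP even if updating the configuration failed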
431 master_params.ip = new_ip
432 result = self.rpc.call_node_activate_master_ip(master_params.name,
433 master_params, ems)
434 msg = result.fail_msg
435 if msg:
436 self.LogWarning("Could not re-enable the master role on"
437 " the master, please restart manually: %s", msg)
438
439 return clustername
440
441
442 class LUClusterRepairDiskSizes(NoHooksLU):
443 """Verifies the cluster disk sizes.
444
445 """
446 REQ_BGL = False
447
448 def ExpandNames(self):
449 if self.op.instances:
450 self.wanted_names = _GetWantedInstances(self, self.op.instances)
451 # Not getting the node allocation lock as only a specific set of
452 # instances (and their nodes) is going to be acquired
453 self.needed_locks = {
454 locking.LEVEL_NODE_RES: [],
455 locking.LEVEL_INSTANCE: self.wanted_names,
456 }
457 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
458 else:
459 self.wanted_names = None
460 self.needed_locks = {
461 locking.LEVEL_NODE_RES: locking.ALL_SET,
462 locking.LEVEL_INSTANCE: locking.ALL_SET,
463
464 # This opcode acquires the node locks for all instances
465 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
466 }
467
468 self.share_locks = {
469 locking.LEVEL_NODE_RES: 1,
470 locking.LEVEL_INSTANCE: 0,
471 locking.LEVEL_NODE_ALLOC: 1,
472 }
473
474 def DeclareLocks(self, level):
475 if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
476 self._LockInstancesNodes(primary_only=True, level=level)
477
478 def CheckPrereq(self):
479 """Check prerequisites.
480
481 This only checks the optional instance list against the existing names.
482
483 """
484 if self.wanted_names is None:
485 self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
486
487 self.wanted_instances = \
488 map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
489
490 def _EnsureChildSizes(self, disk):
491 """Ensure children of the disk have the needed disk size.
492
493 This is valid mainly for DRBD8 and fixes an issue where the
494 children have smaller disk size.
495
496 @param disk: an L{ganeti.objects.Disk} object
497
498 """
499 if disk.dev_type == constants.LD_DRBD8:
500 assert disk.children, "Empty children for DRBD8?"
501 fchild = disk.children[0]
502 mismatch = fchild.size < disk.size
503 if mismatch:
504 self.LogInfo("Child disk has size %d, parent %d, fixing",
505 fchild.size, disk.size)
506 fchild.size = disk.size
507
508 # and we recurse on this child only, not on the metadev
509 return self._EnsureChildSizes(fchild) or mismatch
510 else:
511 return False
512
513 def Exec(self, feedback_fn):
514 """Verify the size of cluster disks.
515
516 """
517 # TODO: check child disks too
518 # TODO: check differences in size between primary/secondary nodes
519 per_node_disks = {}
520 for instance in self.wanted_instances:
521 pnode = instance.primary_node
522 if pnode not in per_node_disks:
523 per_node_disks[pnode] = []
524 for idx, disk in enumerate(instance.disks):
525 per_node_disks[pnode].append((instance, idx, disk))
526
527 assert not (frozenset(per_node_disks.keys()) -
528 self.owned_locks(locking.LEVEL_NODE_RES)), \
529 "Not owning correct locks"
530 assert not self.owned_locks(locking.LEVEL_NODE)
531
532 changed = []
533 for node, dskl in per_node_disks.items():
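# work on copies of the disk objects so that SetDiskID below does not modify the objects referenced from the configuration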
534 newl = [v[2].Copy() for v in dskl]
535 for dsk in newl:
536 self.cfg.SetDiskID(dsk, node)
537 result = self.rpc.call_blockdev_getsize(node, newl)
538 if result.fail_msg:
539 self.LogWarning("Failure in blockdev_getsize call to node"
540 " %s, ignoring", node)
541 continue
542 if len(result.payload) != len(dskl):
543 logging.warning("Invalid result from node %s: len(dskl)=%d,"
544 " result.payload=%s", node, len(dskl), result.payload)
545 self.LogWarning("Invalid result from node %s, ignoring node results",
546 node)
547 continue
548 for ((instance, idx, disk), size) in zip(dskl, result.payload):
549 if size is None:
550 self.LogWarning("Disk %d of instance %s did not return size"
551 " information, ignoring", idx, instance.name)
552 continue
553 if not isinstance(size, (int, long)):
554 self.LogWarning("Disk %d of instance %s did not return valid"
555 " size information, ignoring", idx, instance.name)
556 continue
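# the RPC reports the size in bytes; shift by 20 to convert it to MiB, the unit used for disk sizes in the configuration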
557 size = size >> 20
558 if size != disk.size:
559 self.LogInfo("Disk %d of instance %s has mismatched size,"
560 " correcting: recorded %d, actual %d", idx,
561 instance.name, disk.size, size)
562 disk.size = size
563 self.cfg.Update(instance, feedback_fn)
564 changed.append((instance.name, idx, size))
565 if self._EnsureChildSizes(disk):
566 self.cfg.Update(instance, feedback_fn)
567 changed.append((instance.name, idx, disk.size))
568 return changed
569
570
571 def _ValidateNetmask(cfg, netmask):
572 """Checks if a netmask is valid.
573
574 @type cfg: L{config.ConfigWriter}
575 @param cfg: The cluster configuration
576 @type netmask: int
577 @param netmask: the netmask to be verified, given as a CIDR prefix length
578 @raise errors.OpPrereqError: if the validation fails
579
580 """
581 ip_family = cfg.GetPrimaryIPFamily()
582 try:
583 ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
584 except errors.ProgrammerError:
585 raise errors.OpPrereqError("Invalid primary ip family: %s." %
586 ip_family, errors.ECODE_INVAL)
587 if not ipcls.ValidateNetmask(netmask):
588 raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
589 (netmask), errors.ECODE_INVAL)
590
591
592 class LUClusterSetParams(LogicalUnit):
593 """Change the parameters of the cluster.
594
595 """
596 HPATH = "cluster-modify"
597 HTYPE = constants.HTYPE_CLUSTER
598 REQ_BGL = False
599
600 def CheckArguments(self):
601 """Check parameters
602
603 """
604 if self.op.uid_pool:
605 uidpool.CheckUidPool(self.op.uid_pool)
606
607 if self.op.add_uids:
608 uidpool.CheckUidPool(self.op.add_uids)
609
610 if self.op.remove_uids:
611 uidpool.CheckUidPool(self.op.remove_uids)
612
613 if self.op.master_netmask is not None:
614 _ValidateNetmask(self.cfg, self.op.master_netmask)
615
616 if self.op.diskparams:
617 for dt_params in self.op.diskparams.values():
618 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
619 try:
620 utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
621 except errors.OpPrereqError, err:
622 raise errors.OpPrereqError("While verifying diskparams options: %s" % err,
623 errors.ECODE_INVAL)
624
625 def ExpandNames(self):
626 # FIXME: in the future maybe other cluster params won't require checking on
627 # all nodes to be modified.
628 # FIXME: This opcode changes cluster-wide settings. Is acquiring all
629 # resource locks the right thing, shouldn't it be the BGL instead?
630 self.needed_locks = {
631 locking.LEVEL_NODE: locking.ALL_SET,
632 locking.LEVEL_INSTANCE: locking.ALL_SET,
633 locking.LEVEL_NODEGROUP: locking.ALL_SET,
634 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
635 }
636 self.share_locks = _ShareAll()
637
638 def BuildHooksEnv(self):
639 """Build hooks env.
640
641 """
642 return {
643 "OP_TARGET": self.cfg.GetClusterName(),
644 "NEW_VG_NAME": self.op.vg_name,
645 }
646
647 def BuildHooksNodes(self):
648 """Build hooks nodes.
649
650 """
651 mn = self.cfg.GetMasterNode()
652 return ([mn], [mn])
653
654 def CheckPrereq(self):
655 """Check prerequisites.
656
657 This checks whether the given params don't conflict and
658 if the given volume group is valid.
659
660 """
661 if self.op.vg_name is not None and not self.op.vg_name:
662 if self.cfg.HasAnyDiskOfType(constants.LD_LV):
663 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
664 " instances exist", errors.ECODE_INVAL)
665
666 if self.op.drbd_helper is not None and not self.op.drbd_helper:
667 if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
668 raise errors.OpPrereqError("Cannot disable drbd helper while"
669 " drbd-based instances exist",
670 errors.ECODE_INVAL)
671
672 node_list = self.owned_locks(locking.LEVEL_NODE)
673
674 vm_capable_nodes = [node.name
675 for node in self.cfg.GetAllNodesInfo().values()
676 if node.name in node_list and node.vm_capable]
677
678 # if vg_name is not None, check the given volume group on all nodes
679 if self.op.vg_name:
680 vglist = self.rpc.call_vg_list(vm_capable_nodes)
681 for node in vm_capable_nodes:
682 msg = vglist[node].fail_msg
683 if msg:
684 # ignoring down node
685 self.LogWarning("Error while gathering data on node %s"
686 " (ignoring node): %s", node, msg)
687 continue
688 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
689 self.op.vg_name,
690 constants.MIN_VG_SIZE)
691 if vgstatus:
692 raise errors.OpPrereqError("Error on node '%s': %s" %
693 (node, vgstatus), errors.ECODE_ENVIRON)
694
695 if self.op.drbd_helper:
696 # check the given drbd helper on all nodes
697 helpers = self.rpc.call_drbd_helper(node_list)
698 for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
699 if ninfo.offline:
700 self.LogInfo("Not checking drbd helper on offline node %s", node)
701 continue
702 msg = helpers[node].fail_msg
703 if msg:
704 raise errors.OpPrereqError("Error checking drbd helper on node"
705 " '%s': %s" % (node, msg),
706 errors.ECODE_ENVIRON)
707 node_helper = helpers[node].payload
708 if node_helper != self.op.drbd_helper:
709 raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
710 (node, node_helper), errors.ECODE_ENVIRON)
711
712 self.cluster = cluster = self.cfg.GetClusterInfo()
713 # validate params changes
714 if self.op.beparams:
715 objects.UpgradeBeParams(self.op.beparams)
716 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
717 self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
718
719 if self.op.ndparams:
720 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
721 self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
722
723 # TODO: we need a more general way to handle resetting
724 # cluster-level parameters to default values
725 if self.new_ndparams["oob_program"] == "":
726 self.new_ndparams["oob_program"] = \
727 constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
728
729 if self.op.hv_state:
730 new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
731 self.cluster.hv_state_static)
732 self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
733 for hv, values in new_hv_state.items())
734
735 if self.op.disk_state:
736 new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state,
737 self.cluster.disk_state_static)
738 self.new_disk_state = \
739 dict((storage, dict((name, cluster.SimpleFillDiskState(values))
740 for name, values in svalues.items()))
741 for storage, svalues in new_disk_state.items())
742
743 if self.op.ipolicy:
744 self.new_ipolicy = _GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
745 group_policy=False)
746
747 all_instances = self.cfg.GetAllInstancesInfo().values()
748 violations = set()
749 for group in self.cfg.GetAllNodeGroupsInfo().values():
750 instances = frozenset([inst for inst in all_instances
751 if compat.any(node in group.members
752 for node in inst.all_nodes)])
753 new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
754 ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
755 new = _ComputeNewInstanceViolations(ipol,
756 new_ipolicy, instances, self.cfg)
757 if new:
758 violations.update(new)
759
760 if violations:
761 self.LogWarning("After the ipolicy change the following instances"
762 " violate them: %s",
763 utils.CommaJoin(utils.NiceSort(violations)))
764
765 if self.op.nicparams:
766 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
767 self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
768 objects.NIC.CheckParameterSyntax(self.new_nicparams)
769 nic_errors = []
770
771 # check all instances for consistency
772 for instance in self.cfg.GetAllInstancesInfo().values():
773 for nic_idx, nic in enumerate(instance.nics):
774 params_copy = copy.deepcopy(nic.nicparams)
775 params_filled = objects.FillDict(self.new_nicparams, params_copy)
776
777 # check parameter syntax
778 try:
779 objects.NIC.CheckParameterSyntax(params_filled)
780 except errors.ConfigurationError, err:
781 nic_errors.append("Instance %s, nic/%d: %s" %
782 (instance.name, nic_idx, err))
783
784 # if we're moving instances to routed, check that they have an ip
785 target_mode = params_filled[constants.NIC_MODE]
786 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
787 nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
788 " address" % (instance.name, nic_idx))
789 if nic_errors:
790 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
791 "\n".join(nic_errors), errors.ECODE_INVAL)
792
793 # hypervisor list/parameters
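# filling with an empty override dict yields a copy of the current cluster-level parameters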
794 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
795 if self.op.hvparams:
796 for hv_name, hv_dict in self.op.hvparams.items():
797 if hv_name not in self.new_hvparams:
798 self.new_hvparams[hv_name] = hv_dict
799 else:
800 self.new_hvparams[hv_name].update(hv_dict)
801
802 # disk template parameters
803 self.new_diskparams = objects.FillDict(cluster.diskparams, {})
804 if self.op.diskparams:
805 for dt_name, dt_params in self.op.diskparams.items():
806 if dt_name not in self.new_diskparams:
807 self.new_diskparams[dt_name] = dt_params
808 else:
809 self.new_diskparams[dt_name].update(dt_params)
810
811 # os hypervisor parameters
812 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
813 if self.op.os_hvp:
814 for os_name, hvs in self.op.os_hvp.items():
815 if os_name not in self.new_os_hvp:
816 self.new_os_hvp[os_name] = hvs
817 else:
818 for hv_name, hv_dict in hvs.items():
819 if hv_dict is None:
820 # Delete if it exists
821 self.new_os_hvp[os_name].pop(hv_name, None)
822 elif hv_name not in self.new_os_hvp[os_name]:
823 self.new_os_hvp[os_name][hv_name] = hv_dict
824 else:
825 self.new_os_hvp[os_name][hv_name].update(hv_dict)
826
827 # os parameters
828 self.new_osp = objects.FillDict(cluster.osparams, {})
829 if self.op.osparams:
830 for os_name, osp in self.op.osparams.items():
831 if os_name not in self.new_osp:
832 self.new_osp[os_name] = {}
833
834 self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
835 use_none=True)
836
837 if not self.new_osp[os_name]:
838 # we removed all parameters
839 del self.new_osp[os_name]
840 else:
841 # check the parameter validity (remote check)
842 _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
843 os_name, self.new_osp[os_name])
844
845 # changes to the hypervisor list
846 if self.op.enabled_hypervisors is not None:
847 self.hv_list = self.op.enabled_hypervisors
848 for hv in self.hv_list:
849 # if the hypervisor doesn't already exist in the cluster
850 # hvparams, we initialize it to empty, and then (in both
851 # cases) we make sure to fill the defaults, as we might not
852 # have a complete defaults list if the hypervisor wasn't
853 # enabled before
854 if hv not in new_hvp:
855 new_hvp[hv] = {}
856 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
857 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
858 else:
859 self.hv_list = cluster.enabled_hypervisors
860
861 if self.op.hvparams or self.op.enabled_hypervisors is not None:
862 # either the enabled list has changed, or the parameters have, validate
863 for hv_name, hv_params in self.new_hvparams.items():
864 if ((self.op.hvparams and hv_name in self.op.hvparams) or
865 (self.op.enabled_hypervisors and
866 hv_name in self.op.enabled_hypervisors)):
867 # either this is a new hypervisor, or its parameters have changed
868 hv_class = hypervisor.GetHypervisorClass(hv_name)
869 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
870 hv_class.CheckParameterSyntax(hv_params)
871 _CheckHVParams(self, node_list, hv_name, hv_params)
872
873 self._CheckDiskTemplateConsistency()
874
875 if self.op.os_hvp:
876 # no need to check any newly-enabled hypervisors, since the
877 # defaults have already been checked in the above code-block
878 for os_name, os_hvp in self.new_os_hvp.items():
879 for hv_name, hv_params in os_hvp.items():
880 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
881 # we need to fill in the new os_hvp on top of the actual hv_p
882 cluster_defaults = self.new_hvparams.get(hv_name, {})
883 new_osp = objects.FillDict(cluster_defaults, hv_params)
884 hv_class = hypervisor.GetHypervisorClass(hv_name)
885 hv_class.CheckParameterSyntax(new_osp)
886 _CheckHVParams(self, node_list, hv_name, new_osp)
887
888 if self.op.default_iallocator:
889 alloc_script = utils.FindFile(self.op.default_iallocator,
890 constants.IALLOCATOR_SEARCH_PATH,
891 os.path.isfile)
892 if alloc_script is None:
893 raise errors.OpPrereqError("Invalid default iallocator script '%s'"
894 " specified" % self.op.default_iallocator,
895 errors.ECODE_INVAL)
896
897 def _CheckDiskTemplateConsistency(self):
898 """Check whether the disk templates that are going to be disabled
899 are still in use by some instances.
900
901 """
902 if self.op.enabled_disk_templates:
903 cluster = self.cfg.GetClusterInfo()
904 instances = self.cfg.GetAllInstancesInfo()
905
906 disk_templates_to_remove = set(cluster.enabled_disk_templates) \
907 - set(self.op.enabled_disk_templates)
908 for instance in instances.itervalues():
909 if instance.disk_template in disk_templates_to_remove:
910 raise errors.OpPrereqError("Cannot disable disk template '%s',"
911 " because instance '%s' is using it." %
912 (instance.disk_template, instance.name), errors.ECODE_INVAL)
913
914 def Exec(self, feedback_fn):
915 """Change the parameters of the cluster.
916
917 """
918 if self.op.vg_name is not None:
919 new_volume = self.op.vg_name
920 if not new_volume:
921 new_volume = None
922 if new_volume != self.cfg.GetVGName():
923 self.cfg.SetVGName(new_volume)
924 else:
925 feedback_fn("Cluster LVM configuration already in desired"
926 " state, not changing")
927 if self.op.drbd_helper is not None:
928 new_helper = self.op.drbd_helper
929 if not new_helper:
930 new_helper = None
931 if new_helper != self.cfg.GetDRBDHelper():
932 self.cfg.SetDRBDHelper(new_helper)
933 else:
934 feedback_fn("Cluster DRBD helper already in desired state,"
935 " not changing")
936 if self.op.hvparams:
937 self.cluster.hvparams = self.new_hvparams
938 if self.op.os_hvp:
939 self.cluster.os_hvp = self.new_os_hvp
940 if self.op.enabled_hypervisors is not None:
941 self.cluster.hvparams = self.new_hvparams
942 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
943 if self.op.enabled_disk_templates:
944 self.cluster.enabled_disk_templates = \
945 list(set(self.op.enabled_disk_templates))
946 if self.op.beparams:
947 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
948 if self.op.nicparams:
949 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
950 if self.op.ipolicy:
951 self.cluster.ipolicy = self.new_ipolicy
952 if self.op.osparams:
953 self.cluster.osparams = self.new_osp
954 if self.op.ndparams:
955 self.cluster.ndparams = self.new_ndparams
956 if self.op.diskparams:
957 self.cluster.diskparams = self.new_diskparams
958 if self.op.hv_state:
959 self.cluster.hv_state_static = self.new_hv_state
960 if self.op.disk_state:
961 self.cluster.disk_state_static = self.new_disk_state
962
963 if self.op.candidate_pool_size is not None:
964 self.cluster.candidate_pool_size = self.op.candidate_pool_size
965 # we need to update the pool size here, otherwise the save will fail
966 _AdjustCandidatePool(self, [])
967
968 if self.op.maintain_node_health is not None:
969 if self.op.maintain_node_health and not constants.ENABLE_CONFD:
970 feedback_fn("Note: CONFD was disabled at build time, node health"
971 " maintenance is not useful (still enabling it)")
972 self.cluster.maintain_node_health = self.op.maintain_node_health
973
974 if self.op.prealloc_wipe_disks is not None:
975 self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
976
977 if self.op.add_uids is not None:
978 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
979
980 if self.op.remove_uids is not None:
981 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
982
983 if self.op.uid_pool is not None:
984 self.cluster.uid_pool = self.op.uid_pool
985
986 if self.op.default_iallocator is not None:
987 self.cluster.default_iallocator = self.op.default_iallocator
988
989 if self.op.reserved_lvs is not None:
990 self.cluster.reserved_lvs = self.op.reserved_lvs
991
992 if self.op.use_external_mip_script is not None:
993 self.cluster.use_external_mip_script = self.op.use_external_mip_script
994
995 def helper_os(aname, mods, desc):
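# mods is a list of (DDM_ADD/DDM_REMOVE, OS name) pairs applied to the cluster attribute named by aname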
996 desc += " OS list"
997 lst = getattr(self.cluster, aname)
998 for key, val in mods:
999 if key == constants.DDM_ADD:
1000 if val in lst:
1001 feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1002 else:
1003 lst.append(val)
1004 elif key == constants.DDM_REMOVE:
1005 if val in lst:
1006 lst.remove(val)
1007 else:
1008 feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1009 else:
1010 raise errors.ProgrammerError("Invalid modification '%s'" % key)
1011
1012 if self.op.hidden_os:
1013 helper_os("hidden_os", self.op.hidden_os, "hidden")
1014
1015 if self.op.blacklisted_os:
1016 helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1017
1018 if self.op.master_netdev:
1019 master_params = self.cfg.GetMasterNetworkParameters()
1020 ems = self.cfg.GetUseExternalMipScript()
1021 feedback_fn("Shutting down master ip on the current netdev (%s)" %
1022 self.cluster.master_netdev)
1023 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1024 master_params, ems)
1025 result.Raise("Could not disable the master ip")
1026 feedback_fn("Changing master_netdev from %s to %s" %
1027 (master_params.netdev, self.op.master_netdev))
1028 self.cluster.master_netdev = self.op.master_netdev
1029
1030 if self.op.master_netmask:
1031 master_params = self.cfg.GetMasterNetworkParameters()
1032 feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1033 result = self.rpc.call_node_change_master_netmask(master_params.name,
1034 master_params.netmask,
1035 self.op.master_netmask,
1036 master_params.ip,
1037 master_params.netdev)
1038 if result.fail_msg:
1039 msg = "Could not change the master IP netmask: %s" % result.fail_msg
1040 feedback_fn(msg)
1041
1042 self.cluster.master_netmask = self.op.master_netmask
1043
1044 self.cfg.Update(self.cluster, feedback_fn)
1045
1046 if self.op.master_netdev:
1047 master_params = self.cfg.GetMasterNetworkParameters()
1048 feedback_fn("Starting the master ip on the new master netdev (%s)" %
1049 self.op.master_netdev)
1050 ems = self.cfg.GetUseExternalMipScript()
1051 result = self.rpc.call_node_activate_master_ip(master_params.name,
1052 master_params, ems)
1053 if result.fail_msg:
1054 self.LogWarning("Could not re-enable the master ip on"
1055 " the master, please restart manually: %s",
1056 result.fail_msg)
1057
1058
1059 class LUClusterVerify(NoHooksLU):
1060 """Submits all jobs necessary to verify the cluster.
1061
1062 """
1063 REQ_BGL = False
1064
1065 def ExpandNames(self):
1066 self.needed_locks = {}
1067
1068 def Exec(self, feedback_fn):
1069 jobs = []
1070
1071 if self.op.group_name:
1072 groups = [self.op.group_name]
1073 depends_fn = lambda: None
1074 else:
1075 groups = self.cfg.GetNodeGroupList()
1076
1077 # Verify global configuration
1078 jobs.append([
1079 opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1080 ])
1081
1082 # Always depend on global verification
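# job dependencies may use negative, relative job IDs; -len(jobs) points back at the config verification job appended above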
1083 depends_fn = lambda: [(-len(jobs), [])]
1084
1085 jobs.extend(
1086 [opcodes.OpClusterVerifyGroup(group_name=group,
1087 ignore_errors=self.op.ignore_errors,
1088 depends=depends_fn())]
1089 for group in groups)
1090
1091 # Fix up all parameters
1092 for op in itertools.chain(*jobs): # pylint: disable=W0142
1093 op.debug_simulate_errors = self.op.debug_simulate_errors
1094 op.verbose = self.op.verbose
1095 op.error_codes = self.op.error_codes
1096 try:
1097 op.skip_checks = self.op.skip_checks
1098 except AttributeError:
1099 assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1100
1101 return ResultWithJobs(jobs)
1102
1103
1104 class _VerifyErrors(object):
1105 """Mix-in for cluster/group verify LUs.
1106
1107 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1108 self.op and self._feedback_fn to be available.)
1109
1110 """
1111
1112 ETYPE_FIELD = "code"
1113 ETYPE_ERROR = "ERROR"
1114 ETYPE_WARNING = "WARNING"
1115
1116 def _Error(self, ecode, item, msg, *args, **kwargs):
1117 """Format an error message.
1118
1119 Based on the opcode's error_codes parameter, either format a
1120 parseable error code, or a simpler error string.
1121
1122 This must be called only from Exec and functions called from Exec.
1123
1124 """
1125 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1126 itype, etxt, _ = ecode
1127 # If the error code is in the list of ignored errors, demote the error to a
1128 # warning
1129 if etxt in self.op.ignore_errors: # pylint: disable=E1101
1130 ltype = self.ETYPE_WARNING
1131 # first complete the msg
1132 if args:
1133 msg = msg % args
1134 # then format the whole message
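# with error_codes set this produces a machine-parseable line such as "ERROR:ECLUSTERCFG:cluster::<message>"; otherwise a plain human-readable message is emitted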
1135 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1136 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1137 else:
1138 if item:
1139 item = " " + item
1140 else:
1141 item = ""
1142 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1143 # and finally report it via the feedback_fn
1144 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
1145 # warnings do not mark the operation as failed; only errors do
1146 if ltype == self.ETYPE_ERROR:
1147 self.bad = True
1148
1149 def _ErrorIf(self, cond, *args, **kwargs):
1150 """Log an error message if the passed condition is True.
1151
1152 """
1153 if (bool(cond)
1154 or self.op.debug_simulate_errors): # pylint: disable=E1101
1155 self._Error(*args, **kwargs)
1156
1157
1158 def _VerifyCertificate(filename):
1159 """Verifies a certificate for L{LUClusterVerifyConfig}.
1160
1161 @type filename: string
1162 @param filename: Path to PEM file
1163
1164 """
1165 try:
1166 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1167 utils.ReadFile(filename))
1168 except Exception, err: # pylint: disable=W0703
1169 return (LUClusterVerifyConfig.ETYPE_ERROR,
1170 "Failed to load X509 certificate %s: %s" % (filename, err))
1171
1172 (errcode, msg) = \
1173 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1174 constants.SSL_CERT_EXPIRATION_ERROR)
1175
1176 if msg:
1177 fnamemsg = "While verifying %s: %s" % (filename, msg)
1178 else:
1179 fnamemsg = None
1180
1181 if errcode is None:
1182 return (None, fnamemsg)
1183 elif errcode == utils.CERT_WARNING:
1184 return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1185 elif errcode == utils.CERT_ERROR:
1186 return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1187
1188 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1189
1190
1191 def _GetAllHypervisorParameters(cluster, instances):
1192 """Compute the set of all hypervisor parameters.
1193
1194 @type cluster: L{objects.Cluster}
1195 @param cluster: the cluster object
1196 @type instances: list of L{objects.Instance}
1197 @param instances: additional instances from which to obtain parameters
1198 @rtype: list of (origin, hypervisor, parameters)
1199 @return: a list with all parameters found, indicating the hypervisor they
1200 apply to, and the origin (can be "cluster", "os X", or "instance Y")
1201
1202 """
1203 hvp_data = []
1204
1205 for hv_name in cluster.enabled_hypervisors:
1206 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1207
1208 for os_name, os_hvp in cluster.os_hvp.items():
1209 for hv_name, hv_params in os_hvp.items():
1210 if hv_params:
1211 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1212 hvp_data.append(("os %s" % os_name, hv_name, full_params))
1213
1214 # TODO: collapse identical parameter values in a single one
1215 for instance in instances:
1216 if instance.hvparams:
1217 hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1218 cluster.FillHV(instance)))
1219
1220 return hvp_data
1221
1222
1223 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1224 """Verifies the cluster config.
1225
1226 """
1227 REQ_BGL = False
1228
1229 def _VerifyHVP(self, hvp_data):
1230 """Verifies locally the syntax of the hypervisor parameters.
1231
1232 """
1233 for item, hv_name, hv_params in hvp_data:
1234 msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1235 (item, hv_name))
1236 try:
1237 hv_class = hypervisor.GetHypervisorClass(hv_name)
1238 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1239 hv_class.CheckParameterSyntax(hv_params)
1240 except errors.GenericError, err:
1241 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1242
1243 def ExpandNames(self):
1244 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1245 self.share_locks = _ShareAll()
1246
1247 def CheckPrereq(self):
1248 """Check prerequisites.
1249
1250 """
1251 # Retrieve all information
1252 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1253 self.all_node_info = self.cfg.GetAllNodesInfo()
1254 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1255
1256 def Exec(self, feedback_fn):
1257 """Verify integrity of cluster, performing various tests on nodes.
1258
1259 """
1260 self.bad = False
1261 self._feedback_fn = feedback_fn
1262
1263 feedback_fn("* Verifying cluster config")
1264
1265 for msg in self.cfg.VerifyConfig():
1266 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1267
1268 feedback_fn("* Verifying cluster certificate files")
1269
1270 for cert_filename in pathutils.ALL_CERT_FILES:
1271 (errcode, msg) = _VerifyCertificate(cert_filename)
1272 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1273
1274 feedback_fn("* Verifying hypervisor parameters")
1275
1276 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1277 self.all_inst_info.values()))
1278
1279 feedback_fn("* Verifying all nodes belong to an existing group")
1280
1281 # We do this verification here because, should this bogus circumstance
1282 # occur, it would never be caught by VerifyGroup, which only acts on
1283 # nodes/instances reachable from existing node groups.
1284
1285 dangling_nodes = set(node.name for node in self.all_node_info.values()
1286 if node.group not in self.all_group_info)
1287
1288 dangling_instances = {}
1289 no_node_instances = []
1290
1291 for inst in self.all_inst_info.values():
1292 if inst.primary_node in dangling_nodes:
1293 dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1294 elif inst.primary_node not in self.all_node_info:
1295 no_node_instances.append(inst.name)
1296
1297 pretty_dangling = [
1298 "%s (%s)" %
1299 (node.name,
1300 utils.CommaJoin(dangling_instances.get(node.name,
1301 ["no instances"])))
1302 for node in dangling_nodes]
1303
1304 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1305 None,
1306 "the following nodes (and their instances) belong to a non"
1307 " existing group: %s", utils.CommaJoin(pretty_dangling))
1308
1309 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1310 None,
1311 "the following instances have a non-existing primary-node:"
1312 " %s", utils.CommaJoin(no_node_instances))
1313
1314 return not self.bad
1315
1316
1317 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1318 """Verifies the status of a node group.
1319
1320 """
1321 HPATH = "cluster-verify"
1322 HTYPE = constants.HTYPE_CLUSTER
1323 REQ_BGL = False
1324
1325 _HOOKS_INDENT_RE = re.compile("^", re.M)
1326
1327 class NodeImage(object):
1328 """A class representing the logical and physical status of a node.
1329
1330 @type name: string
1331 @ivar name: the node name to which this object refers
1332 @ivar volumes: a structure as returned from
1333 L{ganeti.backend.GetVolumeList} (runtime)
1334 @ivar instances: a list of running instances (runtime)
1335 @ivar pinst: list of configured primary instances (config)
1336 @ivar sinst: list of configured secondary instances (config)
1337 @ivar sbp: dictionary of {primary-node: list of instances} for all
1338 instances for which this node is secondary (config)
1339 @ivar mfree: free memory, as reported by hypervisor (runtime)
1340 @ivar dfree: free disk, as reported by the node (runtime)
1341 @ivar offline: the offline status (config)
1342 @type rpc_fail: boolean
1343 @ivar rpc_fail: whether the RPC verify call was successful (overall,
1344 not whether the individual keys were correct) (runtime)
1345 @type lvm_fail: boolean
1346 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1347 @type hyp_fail: boolean
1348 @ivar hyp_fail: whether the RPC call didn't return the instance list
1349 @type ghost: boolean
1350 @ivar ghost: whether this is a known node or not (config)
1351 @type os_fail: boolean
1352 @ivar os_fail: whether the RPC call didn't return valid OS data
1353 @type oslist: list
1354 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1355 @type vm_capable: boolean
1356 @ivar vm_capable: whether the node can host instances
1357 @type pv_min: float
1358 @ivar pv_min: size in MiB of the smallest PVs
1359 @type pv_max: float
1360 @ivar pv_max: size in MiB of the biggest PVs
1361
1362 """
1363 def __init__(self, offline=False, name=None, vm_capable=True):
1364 self.name = name
1365 self.volumes = {}
1366 self.instances = []
1367 self.pinst = []
1368 self.sinst = []
1369 self.sbp = {}
1370 self.mfree = 0
1371 self.dfree = 0
1372 self.offline = offline
1373 self.vm_capable = vm_capable
1374 self.rpc_fail = False
1375 self.lvm_fail = False
1376 self.hyp_fail = False
1377 self.ghost = False
1378 self.os_fail = False
1379 self.oslist = {}
1380 self.pv_min = None
1381 self.pv_max = None
1382
1383 def ExpandNames(self):
1384 # This raises errors.OpPrereqError on its own:
1385 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1386
1387 # Get instances in node group; this is unsafe and needs verification later
1388 inst_names = \
1389 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1390
1391 self.needed_locks = {
1392 locking.LEVEL_INSTANCE: inst_names,
1393 locking.LEVEL_NODEGROUP: [self.group_uuid],
1394 locking.LEVEL_NODE: [],
1395
1396 # This opcode is run by watcher every five minutes and acquires all nodes
1397 # for a group. It doesn't run for a long time, so it's better to acquire
1398 # the node allocation lock as well.
1399 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1400 }
1401
1402 self.share_locks = _ShareAll()
1403
1404 def DeclareLocks(self, level):
1405 if level == locking.LEVEL_NODE:
1406 # Get members of node group; this is unsafe and needs verification later
1407 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1408
1409 all_inst_info = self.cfg.GetAllInstancesInfo()
1410
1411 # In Exec(), we warn about mirrored instances that have primary and
1412 # secondary living in separate node groups. To fully verify that
1413 # volumes for these instances are healthy, we will need to do an
1414 # extra call to their secondaries. We ensure here those nodes will
1415 # be locked.
1416 for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1417 # Important: access only the instances whose lock is owned
1418 if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1419 nodes.update(all_inst_info[inst].secondary_nodes)
1420
1421 self.needed_locks[locking.LEVEL_NODE] = nodes
1422
1423 def CheckPrereq(self):
1424 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1425 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1426
1427 group_nodes = set(self.group_info.members)
1428 group_instances = \
1429 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1430
1431 unlocked_nodes = \
1432 group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1433
1434 unlocked_instances = \
1435 group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1436
1437 if unlocked_nodes:
1438 raise errors.OpPrereqError("Missing lock for nodes: %s" %
1439 utils.CommaJoin(unlocked_nodes),
1440 errors.ECODE_STATE)
1441
1442 if unlocked_instances:
1443 raise errors.OpPrereqError("Missing lock for instances: %s" %
1444 utils.CommaJoin(unlocked_instances),
1445 errors.ECODE_STATE)
1446
1447 self.all_node_info = self.cfg.GetAllNodesInfo()
1448 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1449
1450 self.my_node_names = utils.NiceSort(group_nodes)
1451 self.my_inst_names = utils.NiceSort(group_instances)
1452
1453 self.my_node_info = dict((name, self.all_node_info[name])
1454 for name in self.my_node_names)
1455
1456 self.my_inst_info = dict((name, self.all_inst_info[name])
1457 for name in self.my_inst_names)
1458
1459 # We detect here the nodes that will need the extra RPC calls for verifying
1460 # split LV volumes; they should be locked.
1461 extra_lv_nodes = set()
1462
1463 for inst in self.my_inst_info.values():
1464 if inst.disk_template in constants.DTS_INT_MIRROR:
1465 for nname in inst.all_nodes:
1466 if self.all_node_info[nname].group != self.group_uuid:
1467 extra_lv_nodes.add(nname)
1468
1469 unlocked_lv_nodes = \
1470 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1471
1472 if unlocked_lv_nodes:
1473 raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1474 utils.CommaJoin(unlocked_lv_nodes),
1475 errors.ECODE_STATE)
1476 self.extra_lv_nodes = list(extra_lv_nodes)
1477
1478 def _VerifyNode(self, ninfo, nresult):
1479 """Perform some basic validation on data returned from a node.
1480
1481 - check the result data structure is well formed and has all the
1482 mandatory fields
1483 - check ganeti version
1484
1485 @type ninfo: L{objects.Node}
1486 @param ninfo: the node to check
1487 @param nresult: the results from the node
1488 @rtype: boolean
1489 @return: whether overall this call was successful (and we can expect
1490 reasonable values in the response)
1491
1492 """
1493 node = ninfo.name
1494 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1495
1496 # main result, nresult should be a non-empty dict
1497 test = not nresult or not isinstance(nresult, dict)
1498 _ErrorIf(test, constants.CV_ENODERPC, node,
1499 "unable to verify node: no data returned")
1500 if test:
1501 return False
1502
1503 # compares ganeti version
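# the remote side reports its version as a (protocol version, release version) pair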
1504 local_version = constants.PROTOCOL_VERSION
1505 remote_version = nresult.get("version", None)
1506 test = not (remote_version and
1507 isinstance(remote_version, (list, tuple)) and
1508 len(remote_version) == 2)
1509 _ErrorIf(test, constants.CV_ENODERPC, node,
1510 "connection to node returned invalid data")
1511 if test:
1512 return False
1513
1514 test = local_version != remote_version[0]
1515 _ErrorIf(test, constants.CV_ENODEVERSION, node,
1516 "incompatible protocol versions: master %s,"
1517 " node %s", local_version, remote_version[0])
1518 if test:
1519 return False
1520
1521 # node seems compatible, we can actually try to look into its results
1522
1523 # full package version
1524 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1525 constants.CV_ENODEVERSION, node,
1526 "software version mismatch: master %s, node %s",
1527 constants.RELEASE_VERSION, remote_version[1],
1528 code=self.ETYPE_WARNING)
1529
1530 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1531 if ninfo.vm_capable and isinstance(hyp_result, dict):
1532 for hv_name, hv_result in hyp_result.iteritems():
1533 test = hv_result is not None
1534 _ErrorIf(test, constants.CV_ENODEHV, node,
1535 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1536
1537 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1538 if ninfo.vm_capable and isinstance(hvp_result, list):
1539 for item, hv_name, hv_result in hvp_result:
1540 _ErrorIf(True, constants.CV_ENODEHV, node,
1541 "hypervisor %s parameter verify failure (source %s): %s",
1542 hv_name, item, hv_result)
1543
1544 test = nresult.get(constants.NV_NODESETUP,
1545 ["Missing NODESETUP results"])
1546 _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1547 "; ".join(test))
1548
1549 return True
1550
1551 def _VerifyNodeTime(self, ninfo, nresult,
1552 nvinfo_starttime, nvinfo_endtime):
1553 """Check the node time.
1554
1555 @type ninfo: L{objects.Node}
1556 @param ninfo: the node to check
1557 @param nresult: the remote results for the node
1558 @param nvinfo_starttime: the start time of the RPC call
1559 @param nvinfo_endtime: the end time of the RPC call
1560
1561 """
1562 node = ninfo.name
1563 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1564
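# NV_TIME is reported as a (seconds, microseconds) pair; MergeTime converts it to a float timestamp comparable with the RPC start/end times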
1565 ntime = nresult.get(constants.NV_TIME, None)
1566 try:
1567 ntime_merged = utils.MergeTime(ntime)
1568 except (ValueError, TypeError):
1569 _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1570 return
1571
1572 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1573 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1574 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1575 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1576 else:
1577 ntime_diff = None
1578
1579 _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1580 "Node time diverges by at least %s from master node time",
1581 ntime_diff)
1582
1583 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1584 """Check the node LVM results and update info for cross-node checks.
1585
1586 @type ninfo: L{objects.Node}
1587 @param ninfo: the node to check
1588 @param nresult: the remote results for the node
1589 @param vg_name: the configured VG name
1590 @type nimg: L{NodeImage}
1591 @param nimg: node image
1592
1593 """
1594 if vg_name is None:
1595 return
1596
1597 node = ninfo.name
1598 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1599
1600 # checks vg existence and size > 20G
1601 vglist = nresult.get(constants.NV_VGLIST, None)
1602 test = not vglist
1603 _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1604 if not test:
1605 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1606 constants.MIN_VG_SIZE)
1607 _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1608
1609 # Check PVs
1610 (errmsgs, pvminmax) = _CheckNodePVs(nresult, self._exclusive_storage)
1611 for em in errmsgs:
1612 self._Error(constants.CV_ENODELVM, node, em)
1613 if pvminmax is not None:
1614 (nimg.pv_min, nimg.pv_max) = pvminmax
1615
1616 def _VerifyGroupLVM(self, node_image, vg_name):
1617 """Check cross-node consistency in LVM.
1618
1619 @type node_image: dict
1620 @param node_image: info about nodes, mapping from node names to
1621 L{NodeImage} objects
1622 @param vg_name: the configured VG name
1623
1624 """
1625 if vg_name is None:
1626 return
1627
1628 # Only exclusive storage needs this kind of check
1629 if not self._exclusive_storage:
1630 return
1631
1632 # exclusive_storage wants all PVs to have the same size (approximately),
1633 # if the smallest and the biggest ones are okay, everything is fine.
1634 # pv_min is None iff pv_max is None
1635 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1636 if not vals:
1637 return
1638 (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1639 (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1640 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1641 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1642 "PV sizes differ too much in the group; smallest (%s MB) is"
1643 " on %s, biggest (%s MB) is on %s",
1644 pvmin, minnode, pvmax, maxnode)
1645
1646 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1647 """Check the node bridges.
1648
1649 @type ninfo: L{objects.Node}
1650 @param ninfo: the node to check
1651 @param nresult: the remote results for the node
1652 @param bridges: the expected list of bridges
1653
1654 """
1655 if not bridges:
1656 return
1657
1658 node = ninfo.name
1659 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1660
1661 missing = nresult.get(constants.NV_BRIDGES, None)
1662 test = not isinstance(missing, list)
1663 _ErrorIf(test, constants.CV_ENODENET, node,
1664 "did not return valid bridge information")
1665 if not test:
1666 _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1667 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1668
1669 def _VerifyNodeUserScripts(self, ninfo, nresult):
1670 """Check the results of user script presence and executability on the node.
1671
1672 @type ninfo: L{objects.Node}
1673 @param ninfo: the node to check
1674 @param nresult: the remote results for the node
1675
1676 """
1677 node = ninfo.name
1678
1679 test = constants.NV_USERSCRIPTS not in nresult
1680 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1681 "did not return user scripts information")
1682
1683 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1684 if not test:
1685 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1686 "user scripts not present or not executable: %s" %
1687 utils.CommaJoin(sorted(broken_scripts)))
1688
1689 def _VerifyNodeNetwork(self, ninfo, nresult):
1690 """Check the node network connectivity results.
1691
1692 @type ninfo: L{objects.Node}
1693 @param ninfo: the node to check
1694 @param nresult: the remote results for the node
1695
1696 """
1697 node = ninfo.name
1698 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1699
1700 test = constants.NV_NODELIST not in nresult
1701 _ErrorIf(test, constants.CV_ENODESSH, node,
1702 "node hasn't returned node ssh connectivity data")
1703 if not test:
1704 if nresult[constants.NV_NODELIST]:
1705 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1706 _ErrorIf(True, constants.CV_ENODESSH, node,
1707 "ssh communication with node '%s': %s", a_node, a_msg)
1708
1709 test = constants.NV_NODENETTEST not in nresult
1710 _ErrorIf(test, constants.CV_ENODENET, node,
1711 "node hasn't returned node tcp connectivity data")
1712 if not test:
1713 if nresult[constants.NV_NODENETTEST]:
1714 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1715 for anode in nlist:
1716 _ErrorIf(True, constants.CV_ENODENET, node,
1717 "tcp communication with node '%s': %s",
1718 anode, nresult[constants.NV_NODENETTEST][anode])
1719
1720 test = constants.NV_MASTERIP not in nresult
1721 _ErrorIf(test, constants.CV_ENODENET, node,
1722 "node hasn't returned node master IP reachability data")
1723 if not test:
1724 if not nresult[constants.NV_MASTERIP]:
1725 if node == self.master_node:
1726 msg = "the master node cannot reach the master IP (not configured?)"
1727 else:
1728 msg = "cannot reach the master IP"
1729 _ErrorIf(True, constants.CV_ENODENET, node, msg)
1730
1731 def _VerifyInstance(self, instance, inst_config, node_image,
1732 diskstatus):
1733 """Verify an instance.
1734
1735 This function checks that the required block devices are available
1736 on the instance's node and that the nodes are in the correct
1737 state.
1738
1739 """
1740 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1741 pnode = inst_config.primary_node
1742 pnode_img = node_image[pnode]
1743 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1744
1745 node_vol_should = {}
1746 inst_config.MapLVsByNode(node_vol_should)
1747
1748 cluster = self.cfg.GetClusterInfo()
1749 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1750 self.group_info)
1751 err = _ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1752 _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1753 code=self.ETYPE_WARNING)
1754
1755 for node in node_vol_should:
1756 n_img = node_image[node]
1757 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1758 # ignore missing volumes on offline or broken nodes
1759 continue
1760 for volume in node_vol_should[node]:
1761 test = volume not in n_img.volumes
1762 _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1763 "volume %s missing on node %s", volume, node)
1764
1765 if inst_config.admin_state == constants.ADMINST_UP:
1766 test = instance not in pnode_img.instances and not pnode_img.offline
1767 _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1768 "instance not running on its primary node %s",
1769 pnode)
1770 _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1771 "instance is marked as running and lives on offline node %s",
1772 pnode)
1773
1774 diskdata = [(nname, success, status, idx)
1775 for (nname, disks) in diskstatus.items()
1776 for idx, (success, status) in enumerate(disks)]
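# The comprehension above flattens the per-node diskstatus mapping, e.g.
#   {"node1": [(True, bdev_status0), (False, "timeout")]}
# into a list of (node_name, success, status, disk_index) tuples such as
#   [("node1", True, bdev_status0, 0), ("node1", False, "timeout", 1)]
# so every disk of the instance can be checked individually below.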
1777
1778 for nname, success, bdev_status, idx in diskdata:
1779 # the 'ghost node' construction in Exec() ensures that we have a
1780 # node here
1781 snode = node_image[nname]
1782 bad_snode = snode.ghost or snode.offline
1783 _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
1784 not success and not bad_snode,
1785 constants.CV_EINSTANCEFAULTYDISK, instance,
1786 "couldn't retrieve status for disk/%s on %s: %s",
1787 idx, nname, bdev_status)
1788 _ErrorIf((inst_config.admin_state == constants.ADMINST_UP and
1789 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1790 constants.CV_EINSTANCEFAULTYDISK, instance,
1791 "disk/%s on %s is faulty", idx, nname)
1792
1793 _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1794 constants.CV_ENODERPC, pnode, "instance %s, connection to"
1795 " primary node failed", instance)
1796
1797 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1798 constants.CV_EINSTANCELAYOUT,
1799 instance, "instance has multiple secondary nodes: %s",
1800 utils.CommaJoin(inst_config.secondary_nodes),
1801 code=self.ETYPE_WARNING)
1802
1803 if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1804 # Disk template not compatible with exclusive_storage: no instance
1805 # node should have the flag set
1806 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1807 inst_config.all_nodes)
1808 es_nodes = [n for (n, es) in es_flags.items()
1809 if es]
1810 _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1811 "instance has template %s, which is not supported on nodes"
1812 " that have exclusive storage set: %s",
1813 inst_config.disk_template, utils.CommaJoin(es_nodes))
1814
1815 if inst_config.disk_template in constants.DTS_INT_MIRROR:
1816 instance_nodes = utils.NiceSort(inst_config.all_nodes)
1817 instance_groups = {}
1818
1819 for node in instance_nodes:
1820 instance_groups.setdefault(self.all_node_info[node].group,
1821 []).append(node)
1822
1823 pretty_list = [
1824 "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1825 # Sort so that we always list the primary node first.
1826 for group, nodes in sorted(instance_groups.items(),
1827 key=lambda (_, nodes): pnode in nodes,
1828 reverse=True)]
1829
1830 self._ErrorIf(len(instance_groups) > 1,
1831 constants.CV_EINSTANCESPLITGROUPS,
1832 instance, "instance has primary and secondary nodes in"
1833 " different groups: %s", utils.CommaJoin(pretty_list),
1834 code=self.ETYPE_WARNING)
1835
1836 inst_nodes_offline = []
1837 for snode in inst_config.secondary_nodes:
1838 s_img = node_image[snode]
1839 _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1840 snode, "instance %s, connection to secondary node failed",
1841 instance)
1842
1843 if s_img.offline:
1844 inst_nodes_offline.append(snode)
1845
1846 # warn that the instance lives on offline nodes
1847 _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1848 "instance has offline secondary node(s) %s",
1849 utils.CommaJoin(inst_nodes_offline))
1850 # ... or ghost/non-vm_capable nodes
1851 for node in inst_config.all_nodes:
1852 _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1853 instance, "instance lives on ghost node %s", node)
1854 _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1855 instance, "instance lives on non-vm_capable node %s", node)
1856
1857 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1858 """Verify if there are any unknown volumes in the cluster.
1859
1860 The .os, .swap and backup volumes are ignored. All other volumes are
1861 reported as unknown.
1862
1863 @type reserved: L{ganeti.utils.FieldSet}
1864 @param reserved: a FieldSet of reserved volume names
1865
1866 """
1867 for node, n_img in node_image.items():
1868 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1869 self.all_node_info[node].group != self.group_uuid):
1870 # skip non-healthy nodes
1871 continue
1872 for volume in n_img.volumes:
1873 test = ((node not in node_vol_should or
1874 volume not in node_vol_should[node]) and
1875 not reserved.Matches(volume))
1876 self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1877 "volume %s is unknown", volume)
1878
1879 def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1880 """Verify N+1 Memory Resilience.
1881
1882 Check that if one single node dies we can still start all the
1883 instances it was primary for.
1884
1885 """
1886 cluster_info = self.cfg.GetClusterInfo()
1887 for node, n_img in node_image.items():
1888 # This code checks that every node which is now listed as
1889 # secondary has enough memory to host all instances it is
1890 # supposed to, should a single other node in the cluster fail.
1891 # FIXME: not ready for failover to an arbitrary node
1892 # FIXME: does not support file-backed instances
1893 # WARNING: we currently take into account down instances as well
1894 # as up ones, considering that even if they're down someone
1895 # might want to start them even in the event of a node failure.
1896 if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1897 # we're skipping nodes marked offline and nodes in other groups from
1898 # the N+1 warning, since most likely we don't have good memory
1899 # information from them; we already list instances living on such
1900 # nodes, and that's enough warning
1901 continue
1902 #TODO(dynmem): also consider ballooning out other instances
1903 for prinode, instances in n_img.sbp.items():
1904 needed_mem = 0
1905 for instance in instances:
1906 bep = cluster_info.FillBE(instance_cfg[instance])
1907 if bep[constants.BE_AUTO_BALANCE]:
1908 needed_mem += bep[constants.BE_MINMEM]
1909 test = n_img.mfree < needed_mem
1910 self._ErrorIf(test, constants.CV_ENODEN1, node,
1911 "not enough memory to accomodate instance failovers"
1912 " should node %s fail (%dMiB needed, %dMiB available)",
1913 prinode, needed_mem, n_img.mfree)
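# Worked example (figures are illustrative): if this node is secondary for
# instances A (BE_MINMEM=1024) and B (BE_MINMEM=2048) whose common primary is
# prinode, and both have BE_AUTO_BALANCE set, needed_mem is 3072 MiB and the
# error fires when this node's free memory (n_img.mfree) is below that.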
1914
1915 @classmethod
1916 def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1917 (files_all, files_opt, files_mc, files_vm)):
1918 """Verifies file checksums collected from all nodes.
1919
1920 @param errorif: Callback for reporting errors
1921 @param nodeinfo: List of L{objects.Node} objects
1922 @param master_node: Name of master node
1923 @param all_nvinfo: RPC results
1924
1925 """
1926 # Define functions determining which nodes to consider for a file
1927 files2nodefn = [
1928 (files_all, None),
1929 (files_mc, lambda node: (node.master_candidate or
1930 node.name == master_node)),
1931 (files_vm, lambda node: node.vm_capable),
1932 ]
1933
1934 # Build mapping from filename to list of nodes which should have the file
1935 nodefiles = {}
1936 for (files, fn) in files2nodefn:
1937 if fn is None:
1938 filenodes = nodeinfo
1939 else:
1940 filenodes = filter(fn, nodeinfo)
1941 nodefiles.update((filename,
1942 frozenset(map(operator.attrgetter("name"), filenodes)))
1943 for filename in files)
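# The resulting nodefiles mapping associates each expected filename with the
# frozenset of node names that should hold it, e.g. (example paths, assuming
# the usual /var/lib/ganeti layout):
#   {"/var/lib/ganeti/known_hosts": frozenset(["node1", "node2", "node3"]),
#    "/var/lib/ganeti/config.data": frozenset(["node1", "node2"])}
# with files_all entries mapped to every node, files_mc to master candidates
# (plus the master) and files_vm to vm_capable nodes.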
1944
1945 assert set(nodefiles) == (files_all | files_mc | files_vm)
1946
1947 fileinfo = dict((filename, {}) for filename in nodefiles)
1948 ignore_nodes = set()
1949
1950 for node in nodeinfo:
1951 if node.offline:
1952 ignore_nodes.add(node.name)
1953 continue
1954
1955 nresult = all_nvinfo[node.name]
1956
1957 if nresult.fail_msg or not nresult.payload:
1958 node_files = None
1959 else:
1960 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1961 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1962 for (key, value) in fingerprints.items())
1963 del fingerprints
1964
1965 test = not (node_files and isinstance(node_files, dict))
1966 errorif(test, constants.CV_ENODEFILECHECK, node.name,
1967 "Node did not return file checksum data")
1968 if test:
1969 ignore_nodes.add(node.name)
1970 continue
1971
1972 # Build per-checksum mapping from filename to nodes having it
1973 for (filename, checksum) in node_files.items():
1974 assert filename in nodefiles
1975 fileinfo[filename].setdefault(checksum, set()).add(node.name)
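# After this loop, fileinfo maps each filename to a dict keyed by checksum,
# whose values are the sets of node names reporting that checksum, e.g.
#   {"abc123...": set(["node1", "node2"]), "def456...": set(["node3"])}
# (checksums shortened for illustration); more than one key means the file
# differs between nodes, which is reported further down.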
1976
1977 for (filename, checksums) in fileinfo.items():
1978 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1979
1980 # Nodes having the file
1981 with_file = frozenset(node_name
1982 for nodes in fileinfo[filename].values()
1983 for node_name in nodes) - ignore_nodes
1984
1985 expected_nodes = nodefiles[filename] - ignore_nodes
1986
1987 # Nodes missing file
1988 missing_file = expected_nodes - with_file
1989
1990 if filename in files_opt:
1991 # All or no nodes
1992 errorif(missing_file and missing_file != expected_nodes,
1993 constants.CV_ECLUSTERFILECHECK, None,
1994 "File %s is optional, but it must exist on all or no"
1995 " nodes (not found on %s)",
1996 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
1997 else:
1998 errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
1999 "File %s is missing from node(s) %s", filename,
2000 utils.CommaJoin(utils.NiceSort(missing_file)))
2001
2002 # Warn if a node has a file it shouldn't
2003 unexpected = with_file - expected_nodes
2004 errorif(unexpected,
2005 constants.CV_ECLUSTERFILECHECK, None,
2006 "File %s should not exist on node(s) %s",
2007 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2008
2009 # See if there are multiple versions of the file
2010 test = len(checksums) > 1
2011 if test:
2012 variants = ["variant %s on %s" %
2013 (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2014 for (idx, (checksum, nodes)) in
2015 enumerate(sorted(checksums.items()))]
2016 else:
2017 variants = []
2018
2019 errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2020 "File %s found with %s different checksums (%s)",
2021 filename, len(checksums), "; ".join(variants))
2022
2023 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2024 drbd_map):
2025 """Verifies and the node DRBD status.
2026
2027 @type ninfo: L{objects.Node}
2028 @param ninfo: the node to check
2029 @param nresult: the remote results for the node
2030 @param instanceinfo: the dict of instances
2031 @param drbd_helper: the configured DRBD usermode helper
2032 @param drbd_map: the DRBD map as returned by
2033 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2034
2035 """
2036 node = ninfo.name
2037 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2038
2039 if drbd_helper:
2040 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2041 test = (helper_result is None)
2042 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2043 "no drbd usermode helper returned")
2044 if helper_result:
2045 status, payload = helper_result
2046 test = not status
2047 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2048 "drbd usermode helper check unsuccessful: %s", payload)
2049 test = status and (payload != drbd_helper)
2050 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2051 "wrong drbd usermode helper: %s", payload)
2052
2053 # compute the DRBD minors
2054 node_drbd = {}
2055 for minor, instance in drbd_map[node].items():
2056 test = instance not in instanceinfo
2057 _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2058 "ghost instance '%s' in temporary DRBD map", instance)
2059 # ghost instance should not be running, but otherwise we
2060 # don't give double warnings (both ghost instance and
2061 # unallocated minor in use)
2062 if test:
2063 node_drbd[minor] = (instance, False)
2064 else:
2065 instance = instanceinfo[instance]
2066 node_drbd[minor] = (instance.name,
2067 instance.admin_state == constants.ADMINST_UP)
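# node_drbd now maps every DRBD minor reserved for this node in the cluster
# configuration to (instance_name, should_be_active), e.g. (illustrative):
#   {0: ("inst1.example.com", True), 1: ("inst2.example.com", False)}
# where the boolean reflects whether the owning instance is ADMINST_UP (ghost
# instances always get False).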
2068
2069 # and now check them
2070 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2071 test = not isinstance(used_minors, (tuple, list))
2072 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2073 "cannot parse drbd status file: %s", str(used_minors))
2074 if test:
2075 # we cannot check drbd status
2076 return
2077
2078 for minor, (iname, must_exist) in node_drbd.items():
2079 test = minor not in used_minors and must_exist
2080 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2081 "drbd minor %d of instance %s is not active", minor, iname)
2082 for minor in used_minors:
2083 test = minor not in node_drbd
2084 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2085 "unallocated drbd minor %d is in use", minor)
2086
2087 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2088 """Builds the node OS structures.
2089
2090 @type ninfo: L{objects.Node}
2091 @param ninfo: the node to check
2092 @param nresult: the remote results for the node
2093 @param nimg: the node image object
2094
2095 """
2096 node = ninfo.name
2097 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2098
2099 remote_os = nresult.get(constants.NV_OSLIST, None)
2100 test = (not isinstance(remote_os, list) or
2101 not compat.all(isinstance(v, list) and len(v) == 7
2102 for v in remote_os))
2103
2104 _ErrorIf(test, constants.CV_ENODEOS, node,
2105 "node hasn't returned valid OS data")
2106
2107 nimg.os_fail = test
2108
2109 if test:
2110 return
2111
2112 os_dict = {}
2113
2114 for (name, os_path, status, diagnose,
2115 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2116
2117 if name not in os_dict:
2118 os_dict[name] = []
2119
2120 # parameters is a list of lists instead of list of tuples due to
2121 # JSON lacking a real tuple type, fix it:
2122 parameters = [tuple(v) for v in parameters]
2123 os_dict[name].append((os_path, status, diagnose,
2124 set(variants), set(parameters), set(api_ver)))
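# Each NV_OSLIST entry is a 7-element list
#   [name, os_path, status, diagnose, variants, parameters, api_versions]
# and os_dict groups the entries by OS name, e.g. (made-up values):
#   {"debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
#                     set(["default"]), set(), set([20]))]}
# so duplicate definitions of one OS on different search paths show up as
# multiple tuples under the same key.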
2125
2126 nimg.oslist = os_dict
2127
2128 def _VerifyNodeOS(self, ninfo, nimg, base):
2129 """Verifies the node OS list.
2130
2131 @type ninfo: L{objects.Node}
2132 @param ninfo: the node to check
2133 @param nimg: the node image object
2134 @param base: the 'template' node we match against (e.g. from the master)
2135
2136 """
2137 node = ninfo.name
2138 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2139
2140 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2141
2142 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2143 for os_name, os_data in nimg.oslist.items():
2144 assert os_data, "Empty OS status for OS %s?!" % os_name
2145 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2146 _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2147 "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2148 _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2149 "OS '%s' has multiple entries (first one shadows the rest): %s",
2150 os_name, utils.CommaJoin([v[0] for v in os_data]))
2151 # comparisons with the 'base' image
2152 test = os_name not in base.oslist
2153 _ErrorIf(test, constants.CV_ENODEOS, node,
2154 "Extra OS %s not present on reference node (%s)",
2155 os_name, base.name)
2156 if test:
2157 continue
2158 assert base.oslist[os_name], "Base node has empty OS status?"
2159 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2160 if not b_status:
2161 # base OS is invalid, skipping
2162 continue
2163 for kind, a, b in [("API version", f_api, b_api),
2164 ("variants list", f_var, b_var),
2165 ("parameters", beautify_params(f_param),
2166 beautify_params(b_param))]:
2167 _ErrorIf(a != b, constants.CV_ENODEOS, node,
2168 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2169 kind, os_name, base.name,
2170 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2171
2172 # check any missing OSes
2173 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2174 _ErrorIf(missing, constants.CV_ENODEOS, node,
2175 "OSes present on reference node %s but missing on this node: %s",
2176 base.name, utils.CommaJoin(missing))
2177
2178 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2179 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2180
2181 @type ninfo: L{objects.Node}
2182 @param ninfo: the node to check
2183 @param nresult: the remote results for the node
2184 @type is_master: bool
2185 @param is_master: Whether node is the master node
2186
2187 """
2188 node = ninfo.name
2189
2190 if (is_master and
2191 (constants.ENABLE_FILE_STORAGE or
2192 constants.ENABLE_SHARED_FILE_STORAGE)):
2193 try:
2194 fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2195 except KeyError:
2196 # This should never happen
2197 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2198 "Node did not return forbidden file storage paths")
2199 else:
2200 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2201 "Found forbidden file storage paths: %s",
2202 utils.CommaJoin(fspaths))
2203 else:
2204 self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2205 constants.CV_ENODEFILESTORAGEPATHS, node,
2206 "Node should not have returned forbidden file storage"
2207 " paths")
2208
2209 def _VerifyOob(self, ninfo, nresult):
2210 """Verifies out of band functionality of a node.
2211
2212 @type ninfo: L{objects.Node}
2213 @param ninfo: the node to check
2214 @param nresult: the remote results for the node
2215
2216 """
2217 node = ninfo.name
2218 # We just have to verify the paths on master and/or master candidates
2219 # as the oob helper is invoked on the master
2220 if ((ninfo.master_candidate or ninfo.master_capable) and
2221 constants.NV_OOB_PATHS in nresult):
2222 for path_result in nresult[constants.NV_OOB_PATHS]:
2223 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2224
2225 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2226 """Verifies and updates the node volume data.
2227
2228 This function will update a L{NodeImage}'s internal structures
2229 with data from the remote call.
2230
2231 @type ninfo: L{objects.Node}
2232 @param ninfo: the node to check
2233 @param nresult: the remote results for the node
2234 @param nimg: the node image object
2235 @param vg_name: the configured VG name
2236
2237 """
2238 node = ninfo.name
2239 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2240
2241 nimg.lvm_fail = True
2242 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2243 if vg_name is None:
2244 pass
2245 elif isinstance(lvdata, basestring):
2246 _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2247 utils.SafeEncode(lvdata))
2248 elif not isinstance(lvdata, dict):
2249 _ErrorIf(True, constants.CV_ENODELVM, node,
2250 "rpc call to node failed (lvlist)")
2251 else:
2252 nimg.volumes = lvdata
2253 nimg.lvm_fail = False
2254
2255 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2256 """Verifies and updates the node instance list.
2257
2258 If the listing was successful, then updates this node's instance
2259 list. Otherwise, it marks the RPC call as failed for the instance
2260 list key.
2261
2262 @type ninfo: L{objects.Node}
2263 @param ninfo: the node to check
2264 @param nresult: the remote results for the node
2265 @param nimg: the node image object
2266
2267 """
2268 idata = nresult.get(constants.NV_INSTANCELIST, None)
2269 test = not isinstance(idata, list)
2270 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2271 "rpc call to node failed (instancelist): %s",
2272 utils.SafeEncode(str(idata)))
2273 if test:
2274 nimg.hyp_fail = True
2275 else:
2276 nimg.instances = idata
2277
2278 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2279 """Verifies and computes a node information map
2280
2281 @type ninfo: L{objects.Node}
2282 @param ninfo: the node to check
2283 @param nresult: the remote results for the node
2284 @param nimg: the node image object
2285 @param vg_name: the configured VG name
2286
2287 """
2288 node = ninfo.name
2289 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2290
2291 # try to read free memory (from the hypervisor)
2292 hv_info = nresult.get(constants.NV_HVINFO, None)
2293 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2294 _ErrorIf(test, constants.CV_ENODEHV, node,
2295 "rpc call to node failed (hvinfo)")
2296 if not test:
2297 try:
2298 nimg.mfree = int(hv_info["memory_free"])
2299 except (ValueError, TypeError):
2300 _ErrorIf(True, constants.CV_ENODERPC, node,
2301 "node returned invalid nodeinfo, check hypervisor")
2302
2303 # FIXME: devise a free space model for file based instances as well
2304 if vg_name is not None:
2305 test = (constants.NV_VGLIST not in nresult or
2306 vg_name not in nresult[constants.NV_VGLIST])
2307 _ErrorIf(test, constants.CV_ENODELVM, node,
2308 "node didn't return data for the volume group '%s'"
2309 " - it is either missing or broken", vg_name)
2310 if not test:
2311 try:
2312 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2313 except (ValueError, TypeError):
2314 _ErrorIf(True, constants.CV_ENODERPC, node,
2315 "node returned invalid LVM info, check LVM status")
2316
2317 def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2318 """Gets per-disk status information for all instances.
2319
2320 @type nodelist: list of strings
2321 @param nodelist: Node names
2322 @type node_image: dict of (name, L{objects.Node})
2323 @param node_image: Node objects
2324 @type instanceinfo: dict of (name, L{objects.Instance})
2325 @param instanceinfo: Instance objects
2326 @rtype: {instance: {node: [(success, payload)]}}
2327 @return: a dictionary of per-instance dictionaries with nodes as
2328 keys and disk information as values; the disk information is a
2329 list of tuples (success, payload)
2330
2331 """
2332 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2333
2334 node_disks = {}
2335 node_disks_devonly = {}
2336 diskless_instances = set()
2337 diskless = constants.DT_DISKLESS
2338
2339 for nname in nodelist:
2340 node_instances = list(itertools.chain(node_image[nname].pinst,
2341 node_image[nname].sinst))
2342 diskless_instances.update(inst for inst in node_instances
2343 if instanceinfo[inst].disk_template == diskless)
2344 disks = [(inst, disk)
2345 for inst in node_instances
2346 for disk in instanceinfo[inst].disks]
2347
2348 if not disks:
2349 # No need to collect data
2350 continue
2351
2352 node_disks[nname] = disks
2353
2354 # _AnnotateDiskParams already makes copies of the disks
2355 devonly = []
2356 for (inst, dev) in disks:
2357 (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2358 self.cfg.SetDiskID(anno_disk, nname)
2359 devonly.append(anno_disk)
2360
2361 node_disks_devonly[nname] = devonly
2362
2363 assert len(node_disks) == len(node_disks_devonly)
2364
2365 # Collect data from all nodes with disks
2366 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2367 node_disks_devonly)
2368
2369 assert len(result) == len(node_disks)
2370
2371 instdisk = {}
2372
2373 for (nname, nres) in result.items():
2374 disks = node_disks[nname]
2375
2376 if nres.offline:
2377 # No data from this node
2378 data = len(disks) * [(False, "node offline")]
2379 else:
2380 msg = nres.fail_msg
2381 _ErrorIf(msg, constants.CV_ENODERPC, nname,
2382 "while getting disk information: %s", msg)
2383 if msg:
2384 # No data from this node
2385 data = len(disks) * [(False, msg)]
2386 else:
2387 data = []
2388 for idx, i in enumerate(nres.payload):
2389 if isinstance(i, (tuple, list)) and len(i) == 2:
2390 data.append(i)
2391 else:
2392 logging.warning("Invalid result from node %s, entry %d: %s",
2393 nname, idx, i)
2394 data.append((False, "Invalid result from the remote node"))
2395
2396 for ((inst, _), status) in zip(disks, data):
2397 instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2398
2399 # Add empty entries for diskless instances.
2400 for inst in diskless_instances:
2401 assert inst not in instdisk
2402 instdisk[inst] = {}
2403
2404 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2405 len(nnames) <= len(instanceinfo[inst].all_nodes) and
2406 compat.all(isinstance(s, (tuple, list)) and
2407 len(s) == 2 for s in statuses)
2408 for inst, nnames in instdisk.items()
2409 for nname, statuses in nnames.items())
2410 if __debug__:
2411 instdisk_keys = set(instdisk)
2412 instanceinfo_keys = set(instanceinfo)
2413 assert instdisk_keys == instanceinfo_keys, \
2414 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2415 (instdisk_keys, instanceinfo_keys))
2416
2417 return instdisk
2418
2419 @staticmethod
2420 def _SshNodeSelector(group_uuid, all_nodes):
2421 """Create endless iterators for all potential SSH check hosts.
2422
2423 """
2424 nodes = [node for node in all_nodes
2425 if (node.group != group_uuid and
2426 not node.offline)]
2427 keyfunc = operator.attrgetter("group")
2428
2429 return map(itertools.cycle,
2430 [sorted(map(operator.attrgetter("name"), names))
2431 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2432 keyfunc)])
2433
2434 @classmethod
2435 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2436 """Choose which nodes should talk to which other nodes.
2437
2438 We will make nodes contact all nodes in their group, and one node from
2439 every other group.
2440
2441 @warning: This algorithm has a known issue if one node group is much
2442 smaller than others (e.g. just one node). In such a case all other
2443 nodes will talk to the single node.
2444
2445 """
2446 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2447 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2448
2449 return (online_nodes,
2450 dict((name, sorted([i.next() for i in sel]))
2451 for name in online_nodes))
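# Example of the selection (names are made up): with online nodes
# ["n1", "n2", "n3"] in this group and two other groups contributing
# candidates ["a1", "a2"] and ["b1"], the cycling selector assigns one
# foreign node per group to each local node, roughly
#   {"n1": ["a1", "b1"], "n2": ["a2", "b1"], "n3": ["a1", "b1"]}
# while the first element of the returned tuple (all online nodes of this
# group) covers the intra-group checks described in the docstring.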
2452
2453 def BuildHooksEnv(self):
2454 """Build hooks env.
2455
2456 Cluster-Verify hooks run only in the post phase; a hook failure causes its
2457 output to be logged in the verify output and the verification to fail.
2458
2459 """
2460 env = {
2461 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2462 }
2463
2464 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2465 for node in self.my_node_info.values())
2466
2467 return env
2468
2469 def BuildHooksNodes(self):
2470 """Build hooks nodes.
2471
2472 """
2473 return ([], self.my_node_names)
2474
2475 def Exec(self, feedback_fn):
2476 """Verify integrity of the node group, performing various test on nodes.
2477
2478 """
2479 # This method has too many local variables. pylint: disable=R0914
2480 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2481
2482 if not self.my_node_names:
2483 # empty node group
2484 feedback_fn("* Empty node group, skipping verification")
2485 return True
2486
2487 self.bad = False
2488 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2489 verbose = self.op.verbose
2490 self._feedback_fn = feedback_fn
2491
2492 vg_name = self.cfg.GetVGName()
2493 drbd_helper = self.cfg.GetDRBDHelper()
2494 cluster = self.cfg.GetClusterInfo()
2495 hypervisors = cluster.enabled_hypervisors
2496 node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2497
2498 i_non_redundant = [] # Non redundant instances
2499 i_non_a_balanced = [] # Non auto-balanced instances
2500 i_offline = 0 # Count of offline instances
2501 n_offline = 0 # Count of offline nodes
2502 n_drained = 0 # Count of nodes being drained
2503 node_vol_should = {}
2504
2505 # FIXME: verify OS list
2506
2507 # File verification
2508 filemap = _ComputeAncillaryFiles(cluster, False)
2509
2510 # do local checksums
2511 master_node = self.master_node = self.cfg.GetMasterNode()
2512 master_ip = self.cfg.GetMasterIP()
2513
2514 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2515
2516 user_scripts = []
2517 if self.cfg.GetUseExternalMipScript():
2518 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2519
2520 node_verify_param = {
2521 constants.NV_FILELIST:
2522 map(vcluster.MakeVirtualPath,
2523 utils.UniqueSequence(filename
2524 for files in filemap
2525 for filename in files)),
2526 constants.NV_NODELIST:
2527 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2528 self.all_node_info.values()),
2529 constants.NV_HYPERVISOR: hypervisors,
2530 constants.NV_HVPARAMS:
2531 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2532 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2533 for node in node_data_list
2534 if not node.offline],
2535 constants.NV_INSTANCELIST: hypervisors,
2536 constants.NV_VERSION: None,
2537 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2538 constants.NV_NODESETUP: None,
2539 constants.NV_TIME: None,
2540 constants.NV_MASTERIP: (master_node, master_ip),
2541 constants.NV_OSLIST: None,
2542 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2543 constants.NV_USERSCRIPTS: user_scripts,
2544 }
2545
2546 if vg_name is not None:
2547 node_verify_param[constants.NV_VGLIST] = None
2548 node_verify_param[constants.NV_LVLIST] = vg_name
2549 node_verify_param[constants.NV_PVLIST] = [vg_name]
2550
2551 if drbd_helper:
2552 node_verify_param[constants.NV_DRBDLIST] = None
2553 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2554
2555 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2556 # Load file storage paths only from master node
2557 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2558
2559 # bridge checks
2560 # FIXME: this needs to be changed per node-group, not cluster-wide
2561 bridges = set()
2562 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2563 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2564 bridges.add(default_nicpp[constants.NIC_LINK])
2565 for instance in self.my_inst_info.values():
2566 for nic in instance.nics:
2567 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2568 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2569 bridges.add(full_nic[constants.NIC_LINK])
2570
2571 if bridges:
2572 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2573
2574 # Build our expected cluster state
2575 node_image = dict((node.name, self.NodeImage(offline=node.offline,
2576 name=node.name,
2577 vm_capable=node.vm_capable))
2578 for node in node_data_list)
2579
2580 # Gather OOB paths
2581 oob_paths = []
2582 for node in self.all_node_info.values():
2583 path = _SupportsOob(self.cfg, node)
2584 if path and path not in oob_paths:
2585 oob_paths.append(path)
2586
2587 if oob_paths:
2588 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2589
2590 for instance in self.my_inst_names:
2591 inst_config = self.my_inst_info[instance]
2592 if inst_config.admin_state == constants.ADMINST_OFFLINE:
2593 i_offline += 1
2594
2595 for nname in inst_config.all_nodes:
2596 if nname not in node_image:
2597 gnode = self.NodeImage(name=nname)
2598 gnode.ghost = (nname not in self.all_node_info)
2599 node_image[nname] = gnode
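# "Ghost" nodes are nodes referenced by an instance but absent from the
# cluster's node list (self.all_node_info); they get a placeholder NodeImage
# here so the per-instance checks below can flag them instead of failing on
# a missing key.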
2600
2601 inst_config.MapLVsByNode(node_vol_should)
2602
2603 pnode = inst_config.primary_node
2604 node_image[pnode].pinst.append(instance)
2605
2606 for snode in inst_config.secondary_nodes:
2607 nimg = node_image[snode]
2608 nimg.sinst.append(instance)
2609 if pnode not in nimg.sbp:
2610 nimg.sbp[pnode] = []
2611 nimg.sbp[pnode].append(instance)
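# nimg.sbp ("secondary instances by primary node", assuming the NodeImage
# field naming) maps each primary node to the instances for which this node
# acts as secondary, e.g. (illustrative):
#   node_image["node2"].sbp == {"node1": ["inst1", "inst2"]}
# which is exactly the shape _VerifyNPlusOneMemory iterates over.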
2612
2613 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2614 # The value of exclusive_storage should be the same across the group, so if
2615 # it's True for at least a node, we act as if it were set for all the nodes
2616 self._exclusive_storage = compat.any(es_flags.values())
2617 if self._exclusive_storage:
2618 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2619
2620 # At this point, we have the in-memory data structures complete,
2621 # except for the runtime information, which we'll gather next
2622
2623 # Due to the way our RPC system works, exact response times cannot be
2624 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2625 # time before and after executing the request, we can at least have a time
2626 # window.
2627 nvinfo_starttime = time.time()
2628 all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2629 node_verify_param,
2630 self.cfg.GetClusterName())
2631 nvinfo_endtime = time.time()
2632
2633 if self.extra_lv_nodes and vg_name is not None:
2634 extra_lv_nvinfo = \
2635 self.rpc.call_node_verify(self.extra_lv_nodes,
2636 {constants.NV_LVLIST: vg_name},
2637 self.cfg.GetClusterName())
2638 else:
2639 extra_lv_nvinfo = {}
2640
2641 all_drbd_map = self.cfg.ComputeDRBDMap()
2642
2643 feedback_fn("* Gathering disk information (%s nodes)" %
2644 len(self.my_node_names))
2645 instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2646 self.my_inst_info)
2647
2648 feedback_fn("* Verifying configuration file consistency")
2649
2650 # If not all nodes are being checked, we need to make sure the master node
2651 # and a non-checked vm_capable node are in the list.
2652 absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2653 if absent_nodes:
2654 vf_nvinfo = all_nvinfo.copy()
2655 vf_node_info = list(self.my_node_info.values())
2656 additional_nodes = []
2657 if master_node not in self.my_node_info:
2658 additional_nodes.append(master_node)
2659 vf_node_info.append(self.all_node_info[master_node])
2660 # Add the first vm_capable node we find which is not included,
2661 # excluding the master node (which we already have)
2662 for node in absent_nodes:
2663 nodeinfo = self.all_node_info[node]
2664 if (nodeinfo.vm_capable and not nodeinfo.offline and
2665 node != master_node):
2666 additional_nodes.append(node)
2667 vf_node_info.append(self.all_node_info[node])
2668 break
2669 key = constants.NV_FILELIST
2670 vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2671 {key: node_verify_param[key]},
2672 self.cfg.GetClusterName()))
2673 else:
2674 vf_nvinfo = all_nvinfo
2675 vf_node_info = self.my_node_info.values()
2676
2677 self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2678
2679 feedback_fn("* Verifying node status")
2680
2681 refos_img = None
2682
2683 for node_i in node_data_list:
2684 node = node_i.name
2685 nimg = node_image[node]
2686
2687 if node_i.offline:
2688 if verbose:
2689 feedback_fn("* Skipping offline node %s" % (node,))
2690 n_offline += 1
2691 continue
2692
2693 if node == master_node:
2694 ntype = "master"
2695 elif node_i.master_candidate:
2696 ntype = "master candidate"
2697 elif node_i.drained:
2698 ntype = "drained"
2699 n_drained += 1
2700 else:
2701 ntype = "regular"
2702 if verbose:
2703 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2704
2705 msg = all_nvinfo[node].fail_msg
2706 _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2707 msg)
2708 if msg:
2709 nimg.rpc_fail = True
2710 continue
2711
2712 nresult = all_nvinfo[node].payload
2713
2714 nimg.call_ok = self._VerifyNode(node_i, nresult)
2715 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2716 self._VerifyNodeNetwork(node_i, nresult)
2717 self._VerifyNodeUserScripts(node_i, nresult)
2718 self._VerifyOob(node_i, nresult)
2719 self._VerifyFileStoragePaths(node_i, nresult,
2720 node == master_node)
2721
2722 if nimg.vm_capable:
2723 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2724 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2725 all_drbd_map)
2726
2727 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2728 self._UpdateNodeInstances(node_i, nresult, nimg)
2729 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2730 self._UpdateNodeOS(node_i, nresult, nimg)
2731
2732 if not nimg.os_fail:
2733 if refos_img is None:
2734 refos_img = nimg
2735 self._VerifyNodeOS(node_i, nimg, refos_img)
2736 self._VerifyNodeBridges(node_i, nresult, bridges)
2737
2738 # Check whether all running instances are primary for the node. (This
2739 # can no longer be done from _VerifyInstance below, since some of the
2740 # wrong instances could be from other node groups.)
2741 non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2742
2743 for inst in non_primary_inst:
2744 test = inst in self.all_inst_info
2745 _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2746 "instance should not run on node %s", node_i.name)
2747 _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2748 "node is running unknown instance %s", inst)
2749
2750 self._VerifyGroupLVM(node_image, vg_name)
2751
2752 for node, result in extra_lv_nvinfo.items():
2753 self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2754 node_image[node], vg_name)
2755
2756 feedback_fn("* Verifying instance status")
2757 for instance in self.my_inst_names:
2758 if verbose:
2759 feedback_fn("* Verifying instance %s" % instance)
2760 inst_config = self.my_inst_info[instance]
2761 self._VerifyInstance(instance, inst_config, node_image,
2762 instdisk[instance])
2763
2764 # If the instance is non-redundant we cannot survive losing its primary
2765 # node, so we are not N+1 compliant.
2766 if inst_config.disk_template not in constants.DTS_MIRRORED:
2767 i_non_redundant.append(instance)
2768
2769 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2770 i_non_a_balanced.append(instance)
2771
2772 feedback_fn("* Verifying orphan volumes")
2773 reserved = utils.FieldSet(*cluster.reserved_lvs)
2774
2775 # We will get spurious "unknown volume" warnings if any node of this group
2776 # is secondary for an instance whose primary is in another group. To avoid
2777 # them, we find these instances and add their volumes to node_vol_should.
2778 for inst in self.all_inst_info.values():
2779 for secondary in inst.secondary_nodes:
2780 if (secondary in self.my_node_info
2781 and inst.name not in self.my_inst_info):
2782 inst.MapLVsByNode(node_vol_should)
2783 break
2784
2785 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2786
2787 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2788 feedback_fn("* Verifying N+1 Memory redundancy")
2789 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2790
2791 feedback_fn("* Other Notes")
2792 if i_non_redundant:
2793 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
2794 % len(i_non_redundant))
2795
2796 if i_non_a_balanced:
2797 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
2798 % len(i_non_a_balanced))
2799
2800 if i_offline:
2801 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
2802
2803 if n_offline:
2804 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
2805
2806 if n_drained:
2807 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
2808
2809 return not self.bad
2810
2811 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2812 """Analyze the post-hooks' result
2813
2814 This method analyses the hook result, handles it, and sends some
2815 nicely-formatted feedback back to the user.
2816
2817 @param phase: one of L{constants.HOOKS_PHASE_POST} or
2818 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2819 @param hooks_results: the results of the multi-node hooks rpc call
2820 @param feedback_fn: function used to send feedback back to the caller
2821 @param lu_result: previous Exec result
2822 @return: the new Exec result, based on the previous result
2823 and hook results
2824
2825 """
2826 # We only really run POST phase hooks, only for non-empty groups,
2827 # and are only interested in their results
2828 if not self.my_node_names:
2829 # empty node group
2830 pass
2831 elif phase == constants.HOOKS_PHASE_POST:
2832 # Used to change hooks' output to proper indentation
2833 feedback_fn("* Hooks Results")
2834 assert hooks_results, "invalid result from hooks"
2835
2836 for node_name in hooks_results:
2837 res = hooks_results[node_name]
2838 msg = res.fail_msg
2839 test = msg and not res.offline
2840 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2841 "Communication failure in hooks execution: %s", msg)
2842 if res.offline or msg:
2843 # No need to investigate payload if node is offline or gave
2844 # an error.
2845 continue
2846 for script, hkr, output in res.payload:
2847 test = hkr == constants.HKR_FAIL
2848 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2849 "Script %s failed, output:", script)
2850 if test:
2851 output = self._HOOKS_INDENT_RE.sub(" ", output)
2852 feedback_fn("%s" % output)
2853 lu_result = False
2854
2855 return lu_result
2856
2857
2858 class LUClusterVerifyDisks(NoHooksLU):
2859 """Verifies the cluster disks status.
2860
2861 """
2862 REQ_BGL = False
2863
2864 def ExpandNames(self):
2865 self.share_locks = _ShareAll()
2866 self.needed_locks = {
2867 locking.LEVEL_NODEGROUP: locking.ALL_SET,
2868 }
2869
2870 def Exec(self, feedback_fn):
2871 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2872
2873 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2874 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2875 for group in group_names])