cmdlib: Cleanup public/private functions
[ganeti-github.git] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53 ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55 ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56 GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57 GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58 CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59 ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65 """Activate the master IP on the master node.
66
67 """
68 def Exec(self, feedback_fn):
69 """Activate the master IP.
70
71 """
72 master_params = self.cfg.GetMasterNetworkParameters()
73 ems = self.cfg.GetUseExternalMipScript()
74 result = self.rpc.call_node_activate_master_ip(master_params.name,
75 master_params, ems)
76 result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80 """Deactivate the master IP on the master node.
81
82 """
83 def Exec(self, feedback_fn):
84 """Deactivate the master IP.
85
86 """
87 master_params = self.cfg.GetMasterNetworkParameters()
88 ems = self.cfg.GetUseExternalMipScript()
89 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90 master_params, ems)
91 result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95 """Return configuration values.
96
97 """
98 REQ_BGL = False
99
100 def CheckArguments(self):
101 self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103 def ExpandNames(self):
104 self.cq.ExpandNames(self)
105
106 def DeclareLocks(self, level):
107 self.cq.DeclareLocks(self, level)
108
109 def Exec(self, feedback_fn):
110 result = self.cq.OldStyleQuery(self)
111
112 assert len(result) == 1
113
114 return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118 """Logical unit for destroying the cluster.
119
120 """
121 HPATH = "cluster-destroy"
122 HTYPE = constants.HTYPE_CLUSTER
123
124 def BuildHooksEnv(self):
125 """Build hooks env.
126
127 """
128 return {
129 "OP_TARGET": self.cfg.GetClusterName(),
130 }
131
132 def BuildHooksNodes(self):
133 """Build hooks nodes.
134
135 """
136 return ([], [])
137
138 def CheckPrereq(self):
139 """Check prerequisites.
140
141 This checks whether the cluster is empty.
142
143 Any errors are signaled by raising errors.OpPrereqError.
144
145 """
146 master = self.cfg.GetMasterNode()
147
148 nodelist = self.cfg.GetNodeList()
149 if len(nodelist) != 1 or nodelist[0] != master:
150 raise errors.OpPrereqError("There are still %d node(s) in"
151 " this cluster." % (len(nodelist) - 1),
152 errors.ECODE_INVAL)
153 instancelist = self.cfg.GetInstanceList()
154 if instancelist:
155 raise errors.OpPrereqError("There are still %d instance(s) in"
156 " this cluster." % len(instancelist),
157 errors.ECODE_INVAL)
158
159 def Exec(self, feedback_fn):
160 """Destroys the cluster.
161
162 """
163 master_params = self.cfg.GetMasterNetworkParameters()
164
165 # Run post hooks on master node before it's removed
166 RunPostHook(self, master_params.name)
167
168 ems = self.cfg.GetUseExternalMipScript()
169 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170 master_params, ems)
171 if result.fail_msg:
172 self.LogWarning("Error disabling the master IP address: %s",
173 result.fail_msg)
174
175 return master_params.name
176
177
178 class LUClusterPostInit(LogicalUnit):
179 """Logical unit for running hooks after cluster initialization.
180
181 """
182 HPATH = "cluster-init"
183 HTYPE = constants.HTYPE_CLUSTER
184
185 def BuildHooksEnv(self):
186 """Build hooks env.
187
188 """
189 return {
190 "OP_TARGET": self.cfg.GetClusterName(),
191 }
192
193 def BuildHooksNodes(self):
194 """Build hooks nodes.
195
196 """
197 return ([], [self.cfg.GetMasterNode()])
198
199 def Exec(self, feedback_fn):
200 """Nothing to do.
201
202 """
203 return True
204
205
206 class ClusterQuery(QueryBase):
207 FIELDS = query.CLUSTER_FIELDS
208
209 #: Do not sort (there is only one item)
210 SORT_FIELD = None
211
212 def ExpandNames(self, lu):
213 lu.needed_locks = {}
214
215 # The following variables interact with QueryBase._GetNames
216 self.wanted = locking.ALL_SET
217 self.do_locking = self.use_locking
218
219 if self.do_locking:
220 raise errors.OpPrereqError("Can not use locking for cluster queries",
221 errors.ECODE_INVAL)
222
223 def DeclareLocks(self, lu, level):
224 pass
225
226 def _GetQueryData(self, lu):
227 """Computes the list of nodes and their attributes.
228
229 """
230 # Locking is not used
231 assert not (compat.any(lu.glm.is_owned(level)
232 for level in locking.LEVELS
233 if level != locking.LEVEL_CLUSTER) or
234 self.do_locking or self.use_locking)
235
236 if query.CQ_CONFIG in self.requested_data:
237 cluster = lu.cfg.GetClusterInfo()
238 else:
239 cluster = NotImplemented
240
241 if query.CQ_QUEUE_DRAINED in self.requested_data:
242 drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243 else:
244 drain_flag = NotImplemented
245
246 if query.CQ_WATCHER_PAUSE in self.requested_data:
247 master_name = lu.cfg.GetMasterNode()
248
249 result = lu.rpc.call_get_watcher_pause(master_name)
250 result.Raise("Can't retrieve watcher pause from master node '%s'" %
251 master_name)
252
253 watcher_pause = result.payload
254 else:
255 watcher_pause = NotImplemented
256
257 return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261 """Query cluster configuration.
262
263 """
264 REQ_BGL = False
265
266 def ExpandNames(self):
267 self.needed_locks = {}
268
269 def Exec(self, feedback_fn):
270 """Return cluster config.
271
272 """
273 cluster = self.cfg.GetClusterInfo()
274 os_hvp = {}
275
276 # Filter just for enabled hypervisors
277 for os_name, hv_dict in cluster.os_hvp.items():
278 os_hvp[os_name] = {}
279 for hv_name, hv_params in hv_dict.items():
280 if hv_name in cluster.enabled_hypervisors:
281 os_hvp[os_name][hv_name] = hv_params
282
283 # Convert ip_family to ip_version
284 primary_ip_version = constants.IP4_VERSION
285 if cluster.primary_ip_family == netutils.IP6Address.family:
286 primary_ip_version = constants.IP6_VERSION
287
288 result = {
289 "software_version": constants.RELEASE_VERSION,
290 "protocol_version": constants.PROTOCOL_VERSION,
291 "config_version": constants.CONFIG_VERSION,
292 "os_api_version": max(constants.OS_API_VERSIONS),
293 "export_version": constants.EXPORT_VERSION,
294 "architecture": runtime.GetArchInfo(),
295 "name": cluster.cluster_name,
296 "master": cluster.master_node,
297 "default_hypervisor": cluster.primary_hypervisor,
298 "enabled_hypervisors": cluster.enabled_hypervisors,
299 "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300 for hypervisor_name in cluster.enabled_hypervisors]),
301 "os_hvp": os_hvp,
302 "beparams": cluster.beparams,
303 "osparams": cluster.osparams,
304 "ipolicy": cluster.ipolicy,
305 "nicparams": cluster.nicparams,
306 "ndparams": cluster.ndparams,
307 "diskparams": cluster.diskparams,
308 "candidate_pool_size": cluster.candidate_pool_size,
309 "master_netdev": cluster.master_netdev,
310 "master_netmask": cluster.master_netmask,
311 "use_external_mip_script": cluster.use_external_mip_script,
312 "volume_group_name": cluster.volume_group_name,
313 "drbd_usermode_helper": cluster.drbd_usermode_helper,
314 "file_storage_dir": cluster.file_storage_dir,
315 "shared_file_storage_dir": cluster.shared_file_storage_dir,
316 "maintain_node_health": cluster.maintain_node_health,
317 "ctime": cluster.ctime,
318 "mtime": cluster.mtime,
319 "uuid": cluster.uuid,
320 "tags": list(cluster.GetTags()),
321 "uid_pool": cluster.uid_pool,
322 "default_iallocator": cluster.default_iallocator,
323 "reserved_lvs": cluster.reserved_lvs,
324 "primary_ip_version": primary_ip_version,
325 "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326 "hidden_os": cluster.hidden_os,
327 "blacklisted_os": cluster.blacklisted_os,
328 }
329
330 return result
331
332
333 class LUClusterRedistConf(NoHooksLU):
334 """Force the redistribution of cluster configuration.
335
336 This is a very simple LU.
337
338 """
339 REQ_BGL = False
340
341 def ExpandNames(self):
342 self.needed_locks = {
343 locking.LEVEL_NODE: locking.ALL_SET,
344 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345 }
346 self.share_locks = ShareAll()
347
348 def Exec(self, feedback_fn):
349 """Redistribute the configuration.
350
351 """
352 self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353 RedistributeAncillaryFiles(self)
354
355
356 class LUClusterRename(LogicalUnit):
357 """Rename the cluster.
358
359 """
360 HPATH = "cluster-rename"
361 HTYPE = constants.HTYPE_CLUSTER
362
363 def BuildHooksEnv(self):
364 """Build hooks env.
365
366 """
367 return {
368 "OP_TARGET": self.cfg.GetClusterName(),
369 "NEW_NAME": self.op.name,
370 }
371
372 def BuildHooksNodes(self):
373 """Build hooks nodes.
374
375 """
376 return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377
378 def CheckPrereq(self):
379 """Verify that the passed name is a valid one.
380
381 """
382 hostname = netutils.GetHostname(name=self.op.name,
383 family=self.cfg.GetPrimaryIPFamily())
384
385 new_name = hostname.name
386 self.ip = new_ip = hostname.ip
387 old_name = self.cfg.GetClusterName()
388 old_ip = self.cfg.GetMasterIP()
389 if new_name == old_name and new_ip == old_ip:
390 raise errors.OpPrereqError("Neither the name nor the IP address of the"
391 " cluster has changed",
392 errors.ECODE_INVAL)
393 if new_ip != old_ip:
394 if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395 raise errors.OpPrereqError("The given cluster IP address (%s) is"
396 " reachable on the network" %
397 new_ip, errors.ECODE_NOTUNIQUE)
398
399 self.op.name = new_name
400
401 def Exec(self, feedback_fn):
402 """Rename the cluster.
403
404 """
405 clustername = self.op.name
406 new_ip = self.ip
407
408 # shutdown the master IP
409 master_params = self.cfg.GetMasterNetworkParameters()
410 ems = self.cfg.GetUseExternalMipScript()
411 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
412 master_params, ems)
413 result.Raise("Could not disable the master role")
414
415 try:
416 cluster = self.cfg.GetClusterInfo()
417 cluster.cluster_name = clustername
418 cluster.master_ip = new_ip
419 self.cfg.Update(cluster, feedback_fn)
420
421 # update the known hosts file
422 ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
423 node_list = self.cfg.GetOnlineNodeList()
424 try:
425 node_list.remove(master_params.name)
426 except ValueError:
427 pass
428 UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429 finally:
430 master_params.ip = new_ip
431 result = self.rpc.call_node_activate_master_ip(master_params.name,
432 master_params, ems)
433 msg = result.fail_msg
434 if msg:
435 self.LogWarning("Could not re-enable the master role on"
436 " the master, please restart manually: %s", msg)
437
438 return clustername
439
440
441 class LUClusterRepairDiskSizes(NoHooksLU):
442 """Verifies the cluster disks sizes.
443
444 """
445 REQ_BGL = False
446
447 def ExpandNames(self):
448 if self.op.instances:
449 self.wanted_names = GetWantedInstances(self, self.op.instances)
450 # Not getting the node allocation lock as only a specific set of
451 # instances (and their nodes) is going to be acquired
452 self.needed_locks = {
453 locking.LEVEL_NODE_RES: [],
454 locking.LEVEL_INSTANCE: self.wanted_names,
455 }
456 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457 else:
458 self.wanted_names = None
459 self.needed_locks = {
460 locking.LEVEL_NODE_RES: locking.ALL_SET,
461 locking.LEVEL_INSTANCE: locking.ALL_SET,
462
463 # This opcode acquires the node locks for all instances
464 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465 }
466
467 self.share_locks = {
468 locking.LEVEL_NODE_RES: 1,
469 locking.LEVEL_INSTANCE: 0,
470 locking.LEVEL_NODE_ALLOC: 1,
471 }
472
473 def DeclareLocks(self, level):
474 if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475 self._LockInstancesNodes(primary_only=True, level=level)
476
477 def CheckPrereq(self):
478 """Check prerequisites.
479
480 This only checks the optional instance list against the existing names.
481
482 """
483 if self.wanted_names is None:
484 self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485
486 self.wanted_instances = \
487 map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
488
489 def _EnsureChildSizes(self, disk):
490 """Ensure children of the disk have the needed disk size.
491
492 This is valid mainly for DRBD8 and fixes an issue where the
493 children have smaller disk size.
494
495 @param disk: an L{ganeti.objects.Disk} object
496
497 """
498 if disk.dev_type == constants.LD_DRBD8:
499 assert disk.children, "Empty children for DRBD8?"
500 fchild = disk.children[0]
501 mismatch = fchild.size < disk.size
502 if mismatch:
503 self.LogInfo("Child disk has size %d, parent %d, fixing",
504 fchild.size, disk.size)
505 fchild.size = disk.size
506
507 # and we recurse on this child only, not on the metadev
508 return self._EnsureChildSizes(fchild) or mismatch
509 else:
510 return False
511
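# Illustrative walk-through of _EnsureChildSizes above (sizes are made up):
# for a DRBD8 disk recorded at 10240 MiB whose first (data) child is recorded
# at 10112 MiB, the child's size is bumped to 10240 MiB and the method returns
# True, so Exec() below writes the updated instance back to the configuration.
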
512 def Exec(self, feedback_fn):
513 """Verify the size of cluster disks.
514
515 """
516 # TODO: check child disks too
517 # TODO: check differences in size between primary/secondary nodes
518 per_node_disks = {}
519 for instance in self.wanted_instances:
520 pnode = instance.primary_node
521 if pnode not in per_node_disks:
522 per_node_disks[pnode] = []
523 for idx, disk in enumerate(instance.disks):
524 per_node_disks[pnode].append((instance, idx, disk))
525
526 assert not (frozenset(per_node_disks.keys()) -
527 self.owned_locks(locking.LEVEL_NODE_RES)), \
528 "Not owning correct locks"
529 assert not self.owned_locks(locking.LEVEL_NODE)
530
531 changed = []
532 for node, dskl in per_node_disks.items():
533 newl = [v[2].Copy() for v in dskl]
534 for dsk in newl:
535 self.cfg.SetDiskID(dsk, node)
536 result = self.rpc.call_blockdev_getsize(node, newl)
537 if result.fail_msg:
538 self.LogWarning("Failure in blockdev_getsize call to node"
539 " %s, ignoring", node)
540 continue
541 if len(result.payload) != len(dskl):
542 logging.warning("Invalid result from node %s: len(dskl)=%d,"
543 " result.payload=%s", node, len(dskl), result.payload)
544 self.LogWarning("Invalid result from node %s, ignoring node results",
545 node)
546 continue
547 for ((instance, idx, disk), size) in zip(dskl, result.payload):
548 if size is None:
549 self.LogWarning("Disk %d of instance %s did not return size"
550 " information, ignoring", idx, instance.name)
551 continue
552 if not isinstance(size, (int, long)):
553 self.LogWarning("Disk %d of instance %s did not return valid"
554 " size information, ignoring", idx, instance.name)
555 continue
556 size = size >> 20
557 if size != disk.size:
558 self.LogInfo("Disk %d of instance %s has mismatched size,"
559 " correcting: recorded %d, actual %d", idx,
560 instance.name, disk.size, size)
561 disk.size = size
562 self.cfg.Update(instance, feedback_fn)
563 changed.append((instance.name, idx, size))
564 if self._EnsureChildSizes(disk):
565 self.cfg.Update(instance, feedback_fn)
566 changed.append((instance.name, idx, disk.size))
567 return changed
568
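# The list returned by LUClusterRepairDiskSizes.Exec is made of
# (instance_name, disk_index, new_size_in_MiB) tuples, for example
# [("web1", 0, 10240)] -- the instance name here is hypothetical.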
569
570 def _ValidateNetmask(cfg, netmask):
571 """Checks if a netmask is valid.
572
573 @type cfg: L{config.ConfigWriter}
574 @param cfg: The cluster configuration
575 @type netmask: int
576 @param netmask: the netmask to be verified
577 @raise errors.OpPrereqError: if the validation fails
578
579 """
580 ip_family = cfg.GetPrimaryIPFamily()
581 try:
582 ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
583 except errors.ProgrammerError:
584 raise errors.OpPrereqError("Invalid primary ip family: %s." %
585 ip_family, errors.ECODE_INVAL)
586 if not ipcls.ValidateNetmask(netmask):
587 raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
588 (netmask), errors.ECODE_INVAL)
589
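# A minimal sketch of how _ValidateNetmask behaves; the netmask is a CIDR
# prefix length and the accepted range depends on the cluster's primary IP
# family (the values below are illustrative):
#
#   _ValidateNetmask(cfg, 24)   # fine on an IPv4 cluster
#   _ValidateNetmask(cfg, 64)   # fine on an IPv6 cluster
#   _ValidateNetmask(cfg, 33)   # raises errors.OpPrereqError on an IPv4 cluster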
590
591 class LUClusterSetParams(LogicalUnit):
592 """Change the parameters of the cluster.
593
594 """
595 HPATH = "cluster-modify"
596 HTYPE = constants.HTYPE_CLUSTER
597 REQ_BGL = False
598
599 def CheckArguments(self):
600 """Check parameters
601
602 """
603 if self.op.uid_pool:
604 uidpool.CheckUidPool(self.op.uid_pool)
605
606 if self.op.add_uids:
607 uidpool.CheckUidPool(self.op.add_uids)
608
609 if self.op.remove_uids:
610 uidpool.CheckUidPool(self.op.remove_uids)
611
612 if self.op.master_netmask is not None:
613 _ValidateNetmask(self.cfg, self.op.master_netmask)
614
615 if self.op.diskparams:
616 for dt_params in self.op.diskparams.values():
617 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
618 try:
619 utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
620 except errors.OpPrereqError, err:
621 raise errors.OpPrereqError("While verifying diskparams options: %s" % err,
622 errors.ECODE_INVAL)
623
624 def ExpandNames(self):
625 # FIXME: in the future maybe other cluster params won't require checking on
626 # all nodes to be modified.
627 # FIXME: This opcode changes cluster-wide settings. Is acquiring all
628 # resource locks the right thing, shouldn't it be the BGL instead?
629 self.needed_locks = {
630 locking.LEVEL_NODE: locking.ALL_SET,
631 locking.LEVEL_INSTANCE: locking.ALL_SET,
632 locking.LEVEL_NODEGROUP: locking.ALL_SET,
633 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
634 }
635 self.share_locks = ShareAll()
636
637 def BuildHooksEnv(self):
638 """Build hooks env.
639
640 """
641 return {
642 "OP_TARGET": self.cfg.GetClusterName(),
643 "NEW_VG_NAME": self.op.vg_name,
644 }
645
646 def BuildHooksNodes(self):
647 """Build hooks nodes.
648
649 """
650 mn = self.cfg.GetMasterNode()
651 return ([mn], [mn])
652
653 def CheckPrereq(self):
654 """Check prerequisites.
655
656 This checks whether the given params don't conflict and
657 if the given volume group is valid.
658
659 """
660 if self.op.vg_name is not None and not self.op.vg_name:
661 if self.cfg.HasAnyDiskOfType(constants.LD_LV):
662 raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
663 " instances exist", errors.ECODE_INVAL)
664
665 if self.op.drbd_helper is not None and not self.op.drbd_helper:
666 if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
667 raise errors.OpPrereqError("Cannot disable drbd helper while"
668 " drbd-based instances exist",
669 errors.ECODE_INVAL)
670
671 node_list = self.owned_locks(locking.LEVEL_NODE)
672
673 vm_capable_nodes = [node.name
674 for node in self.cfg.GetAllNodesInfo().values()
675 if node.name in node_list and node.vm_capable]
676
677 # if vg_name is not None, check the given volume group on all nodes
678 if self.op.vg_name:
679 vglist = self.rpc.call_vg_list(vm_capable_nodes)
680 for node in vm_capable_nodes:
681 msg = vglist[node].fail_msg
682 if msg:
683 # ignoring down node
684 self.LogWarning("Error while gathering data on node %s"
685 " (ignoring node): %s", node, msg)
686 continue
687 vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
688 self.op.vg_name,
689 constants.MIN_VG_SIZE)
690 if vgstatus:
691 raise errors.OpPrereqError("Error on node '%s': %s" %
692 (node, vgstatus), errors.ECODE_ENVIRON)
693
694 if self.op.drbd_helper:
695 # check the given drbd helper on all nodes
696 helpers = self.rpc.call_drbd_helper(node_list)
697 for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
698 if ninfo.offline:
699 self.LogInfo("Not checking drbd helper on offline node %s", node)
700 continue
701 msg = helpers[node].fail_msg
702 if msg:
703 raise errors.OpPrereqError("Error checking drbd helper on node"
704 " '%s': %s" % (node, msg),
705 errors.ECODE_ENVIRON)
706 node_helper = helpers[node].payload
707 if node_helper != self.op.drbd_helper:
708 raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
709 (node, node_helper), errors.ECODE_ENVIRON)
710
711 self.cluster = cluster = self.cfg.GetClusterInfo()
712 # validate params changes
713 if self.op.beparams:
714 objects.UpgradeBeParams(self.op.beparams)
715 utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
716 self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
717
718 if self.op.ndparams:
719 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
720 self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
721
722 # TODO: we need a more general way to handle resetting
723 # cluster-level parameters to default values
724 if self.new_ndparams["oob_program"] == "":
725 self.new_ndparams["oob_program"] = \
726 constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
727
728 if self.op.hv_state:
729 new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
730 self.cluster.hv_state_static)
731 self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
732 for hv, values in new_hv_state.items())
733
734 if self.op.disk_state:
735 new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
736 self.cluster.disk_state_static)
737 self.new_disk_state = \
738 dict((storage, dict((name, cluster.SimpleFillDiskState(values))
739 for name, values in svalues.items()))
740 for storage, svalues in new_disk_state.items())
741
742 if self.op.ipolicy:
743 self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
744 group_policy=False)
745
746 all_instances = self.cfg.GetAllInstancesInfo().values()
747 violations = set()
748 for group in self.cfg.GetAllNodeGroupsInfo().values():
749 instances = frozenset([inst for inst in all_instances
750 if compat.any(node in group.members
751 for node in inst.all_nodes)])
752 new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
753 ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
754 new = ComputeNewInstanceViolations(ipol,
755 new_ipolicy, instances, self.cfg)
756 if new:
757 violations.update(new)
758
759 if violations:
760 self.LogWarning("After the ipolicy change the following instances"
761 " violate them: %s",
762 utils.CommaJoin(utils.NiceSort(violations)))
763
764 if self.op.nicparams:
765 utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
766 self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
767 objects.NIC.CheckParameterSyntax(self.new_nicparams)
768 nic_errors = []
769
770 # check all instances for consistency
771 for instance in self.cfg.GetAllInstancesInfo().values():
772 for nic_idx, nic in enumerate(instance.nics):
773 params_copy = copy.deepcopy(nic.nicparams)
774 params_filled = objects.FillDict(self.new_nicparams, params_copy)
775
776 # check parameter syntax
777 try:
778 objects.NIC.CheckParameterSyntax(params_filled)
779 except errors.ConfigurationError, err:
780 nic_errors.append("Instance %s, nic/%d: %s" %
781 (instance.name, nic_idx, err))
782
783 # if we're moving instances to routed, check that they have an ip
784 target_mode = params_filled[constants.NIC_MODE]
785 if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
786 nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
787 " address" % (instance.name, nic_idx))
788 if nic_errors:
789 raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
790 "\n".join(nic_errors), errors.ECODE_INVAL)
791
792 # hypervisor list/parameters
793 self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
794 if self.op.hvparams:
795 for hv_name, hv_dict in self.op.hvparams.items():
796 if hv_name not in self.new_hvparams:
797 self.new_hvparams[hv_name] = hv_dict
798 else:
799 self.new_hvparams[hv_name].update(hv_dict)
800
801 # disk template parameters
802 self.new_diskparams = objects.FillDict(cluster.diskparams, {})
803 if self.op.diskparams:
804 for dt_name, dt_params in self.op.diskparams.items():
805 if dt_name not in self.new_diskparams:
806 self.new_diskparams[dt_name] = dt_params
807 else:
808 self.new_diskparams[dt_name].update(dt_params)
809
810 # os hypervisor parameters
811 self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
812 if self.op.os_hvp:
813 for os_name, hvs in self.op.os_hvp.items():
814 if os_name not in self.new_os_hvp:
815 self.new_os_hvp[os_name] = hvs
816 else:
817 for hv_name, hv_dict in hvs.items():
818 if hv_dict is None:
819 # Delete if it exists
820 self.new_os_hvp[os_name].pop(hv_name, None)
821 elif hv_name not in self.new_os_hvp[os_name]:
822 self.new_os_hvp[os_name][hv_name] = hv_dict
823 else:
824 self.new_os_hvp[os_name][hv_name].update(hv_dict)
825
826 # os parameters
827 self.new_osp = objects.FillDict(cluster.osparams, {})
828 if self.op.osparams:
829 for os_name, osp in self.op.osparams.items():
830 if os_name not in self.new_osp:
831 self.new_osp[os_name] = {}
832
833 self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
834 use_none=True)
835
836 if not self.new_osp[os_name]:
837 # we removed all parameters
838 del self.new_osp[os_name]
839 else:
840 # check the parameter validity (remote check)
841 CheckOSParams(self, False, [self.cfg.GetMasterNode()],
842 os_name, self.new_osp[os_name])
843
844 # changes to the hypervisor list
845 if self.op.enabled_hypervisors is not None:
846 self.hv_list = self.op.enabled_hypervisors
847 for hv in self.hv_list:
848 # if the hypervisor doesn't already exist in the cluster
849 # hvparams, we initialize it to empty, and then (in both
850 # cases) we make sure to fill the defaults, as we might not
851 # have a complete defaults list if the hypervisor wasn't
852 # enabled before
853 if hv not in new_hvp:
854 new_hvp[hv] = {}
855 new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
856 utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
857 else:
858 self.hv_list = cluster.enabled_hypervisors
859
860 if self.op.hvparams or self.op.enabled_hypervisors is not None:
861 # either the enabled list has changed, or the parameters have, validate
862 for hv_name, hv_params in self.new_hvparams.items():
863 if ((self.op.hvparams and hv_name in self.op.hvparams) or
864 (self.op.enabled_hypervisors and
865 hv_name in self.op.enabled_hypervisors)):
866 # either this is a new hypervisor, or its parameters have changed
867 hv_class = hypervisor.GetHypervisorClass(hv_name)
868 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
869 hv_class.CheckParameterSyntax(hv_params)
870 CheckHVParams(self, node_list, hv_name, hv_params)
871
872 self._CheckDiskTemplateConsistency()
873
874 if self.op.os_hvp:
875 # no need to check any newly-enabled hypervisors, since the
876 # defaults have already been checked in the above code-block
877 for os_name, os_hvp in self.new_os_hvp.items():
878 for hv_name, hv_params in os_hvp.items():
879 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
880 # we need to fill in the new os_hvp on top of the actual hv_p
881 cluster_defaults = self.new_hvparams.get(hv_name, {})
882 new_osp = objects.FillDict(cluster_defaults, hv_params)
883 hv_class = hypervisor.GetHypervisorClass(hv_name)
884 hv_class.CheckParameterSyntax(new_osp)
885 CheckHVParams(self, node_list, hv_name, new_osp)
886
887 if self.op.default_iallocator:
888 alloc_script = utils.FindFile(self.op.default_iallocator,
889 constants.IALLOCATOR_SEARCH_PATH,
890 os.path.isfile)
891 if alloc_script is None:
892 raise errors.OpPrereqError("Invalid default iallocator script '%s'"
893 " specified" % self.op.default_iallocator,
894 errors.ECODE_INVAL)
895
896 def _CheckDiskTemplateConsistency(self):
897 """Check whether the disk templates that are going to be disabled
898 are still in use by some instances.
899
900 """
901 if self.op.enabled_disk_templates:
902 cluster = self.cfg.GetClusterInfo()
903 instances = self.cfg.GetAllInstancesInfo()
904
905 disk_templates_to_remove = set(cluster.enabled_disk_templates) \
906 - set(self.op.enabled_disk_templates)
907 for instance in instances.itervalues():
908 if instance.disk_template in disk_templates_to_remove:
909 raise errors.OpPrereqError("Cannot disable disk template '%s',"
910 " because instance '%s' is using it." %
911 (instance.disk_template, instance.name))
912
913 def Exec(self, feedback_fn):
914 """Change the parameters of the cluster.
915
916 """
917 if self.op.vg_name is not None:
918 new_volume = self.op.vg_name
919 if not new_volume:
920 new_volume = None
921 if new_volume != self.cfg.GetVGName():
922 self.cfg.SetVGName(new_volume)
923 else:
924 feedback_fn("Cluster LVM configuration already in desired"
925 " state, not changing")
926 if self.op.drbd_helper is not None:
927 new_helper = self.op.drbd_helper
928 if not new_helper:
929 new_helper = None
930 if new_helper != self.cfg.GetDRBDHelper():
931 self.cfg.SetDRBDHelper(new_helper)
932 else:
933 feedback_fn("Cluster DRBD helper already in desired state,"
934 " not changing")
935 if self.op.hvparams:
936 self.cluster.hvparams = self.new_hvparams
937 if self.op.os_hvp:
938 self.cluster.os_hvp = self.new_os_hvp
939 if self.op.enabled_hypervisors is not None:
940 self.cluster.hvparams = self.new_hvparams
941 self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
942 if self.op.enabled_disk_templates:
943 self.cluster.enabled_disk_templates = \
944 list(set(self.op.enabled_disk_templates))
945 if self.op.beparams:
946 self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
947 if self.op.nicparams:
948 self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
949 if self.op.ipolicy:
950 self.cluster.ipolicy = self.new_ipolicy
951 if self.op.osparams:
952 self.cluster.osparams = self.new_osp
953 if self.op.ndparams:
954 self.cluster.ndparams = self.new_ndparams
955 if self.op.diskparams:
956 self.cluster.diskparams = self.new_diskparams
957 if self.op.hv_state:
958 self.cluster.hv_state_static = self.new_hv_state
959 if self.op.disk_state:
960 self.cluster.disk_state_static = self.new_disk_state
961
962 if self.op.candidate_pool_size is not None:
963 self.cluster.candidate_pool_size = self.op.candidate_pool_size
964 # we need to update the pool size here, otherwise the save will fail
965 AdjustCandidatePool(self, [])
966
967 if self.op.maintain_node_health is not None:
968 if self.op.maintain_node_health and not constants.ENABLE_CONFD:
969 feedback_fn("Note: CONFD was disabled at build time, node health"
970 " maintenance is not useful (still enabling it)")
971 self.cluster.maintain_node_health = self.op.maintain_node_health
972
973 if self.op.prealloc_wipe_disks is not None:
974 self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
975
976 if self.op.add_uids is not None:
977 uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
978
979 if self.op.remove_uids is not None:
980 uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
981
982 if self.op.uid_pool is not None:
983 self.cluster.uid_pool = self.op.uid_pool
984
985 if self.op.default_iallocator is not None:
986 self.cluster.default_iallocator = self.op.default_iallocator
987
988 if self.op.reserved_lvs is not None:
989 self.cluster.reserved_lvs = self.op.reserved_lvs
990
991 if self.op.use_external_mip_script is not None:
992 self.cluster.use_external_mip_script = self.op.use_external_mip_script
993
994 def helper_os(aname, mods, desc):
995 desc += " OS list"
996 lst = getattr(self.cluster, aname)
997 for key, val in mods:
998 if key == constants.DDM_ADD:
999 if val in lst:
1000 feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1001 else:
1002 lst.append(val)
1003 elif key == constants.DDM_REMOVE:
1004 if val in lst:
1005 lst.remove(val)
1006 else:
1007 feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1008 else:
1009 raise errors.ProgrammerError("Invalid modification '%s'" % key)
1010
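# The "mods" argument of helper_os is a list of (action, os_name) pairs taken
# straight from the opcode, e.g. [(constants.DDM_ADD, "debian-image"),
# (constants.DDM_REMOVE, "old-image")] -- the OS names here are hypothetical.
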
1011 if self.op.hidden_os:
1012 helper_os("hidden_os", self.op.hidden_os, "hidden")
1013
1014 if self.op.blacklisted_os:
1015 helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1016
1017 if self.op.master_netdev:
1018 master_params = self.cfg.GetMasterNetworkParameters()
1019 ems = self.cfg.GetUseExternalMipScript()
1020 feedback_fn("Shutting down master ip on the current netdev (%s)" %
1021 self.cluster.master_netdev)
1022 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1023 master_params, ems)
1024 result.Raise("Could not disable the master ip")
1025 feedback_fn("Changing master_netdev from %s to %s" %
1026 (master_params.netdev, self.op.master_netdev))
1027 self.cluster.master_netdev = self.op.master_netdev
1028
1029 if self.op.master_netmask:
1030 master_params = self.cfg.GetMasterNetworkParameters()
1031 feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1032 result = self.rpc.call_node_change_master_netmask(master_params.name,
1033 master_params.netmask,
1034 self.op.master_netmask,
1035 master_params.ip,
1036 master_params.netdev)
1037 if result.fail_msg:
1038 msg = "Could not change the master IP netmask: %s" % result.fail_msg
1039 feedback_fn(msg)
1040
1041 self.cluster.master_netmask = self.op.master_netmask
1042
1043 self.cfg.Update(self.cluster, feedback_fn)
1044
1045 if self.op.master_netdev:
1046 master_params = self.cfg.GetMasterNetworkParameters()
1047 feedback_fn("Starting the master ip on the new master netdev (%s)" %
1048 self.op.master_netdev)
1049 ems = self.cfg.GetUseExternalMipScript()
1050 result = self.rpc.call_node_activate_master_ip(master_params.name,
1051 master_params, ems)
1052 if result.fail_msg:
1053 self.LogWarning("Could not re-enable the master ip on"
1054 " the master, please restart manually: %s",
1055 result.fail_msg)
1056
1057
1058 class LUClusterVerify(NoHooksLU):
1059 """Submits all jobs necessary to verify the cluster.
1060
1061 """
1062 REQ_BGL = False
1063
1064 def ExpandNames(self):
1065 self.needed_locks = {}
1066
1067 def Exec(self, feedback_fn):
1068 jobs = []
1069
1070 if self.op.group_name:
1071 groups = [self.op.group_name]
1072 depends_fn = lambda: None
1073 else:
1074 groups = self.cfg.GetNodeGroupList()
1075
1076 # Verify global configuration
1077 jobs.append([
1078 opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1079 ])
1080
1081 # Always depend on global verification
1082 depends_fn = lambda: [(-len(jobs), [])]
1083
1084 jobs.extend(
1085 [opcodes.OpClusterVerifyGroup(group_name=group,
1086 ignore_errors=self.op.ignore_errors,
1087 depends=depends_fn())]
1088 for group in groups)
1089
1090 # Fix up all parameters
1091 for op in itertools.chain(*jobs): # pylint: disable=W0142
1092 op.debug_simulate_errors = self.op.debug_simulate_errors
1093 op.verbose = self.op.verbose
1094 op.error_codes = self.op.error_codes
1095 try:
1096 op.skip_checks = self.op.skip_checks
1097 except AttributeError:
1098 assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1099
1100 return ResultWithJobs(jobs)
1101
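# Rough shape of the jobs submitted by LUClusterVerify when no group name is
# given (group names below are hypothetical); each per-group job depends on
# the config-verification job through a negative, relative job id:
#
#   [[OpClusterVerifyConfig(...)],
#    [OpClusterVerifyGroup(group_name="default", depends=[(-1, [])])],
#    [OpClusterVerifyGroup(group_name="rack2", depends=[(-2, [])])]]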
1102
1103 class _VerifyErrors(object):
1104 """Mix-in for cluster/group verify LUs.
1105
1106 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1107 self.op and self._feedback_fn to be available.)
1108
1109 """
1110
1111 ETYPE_FIELD = "code"
1112 ETYPE_ERROR = "ERROR"
1113 ETYPE_WARNING = "WARNING"
1114
1115 def _Error(self, ecode, item, msg, *args, **kwargs):
1116 """Format an error message.
1117
1118 Based on the opcode's error_codes parameter, either format a
1119 parseable error code, or a simpler error string.
1120
1121 This must be called only from Exec and functions called from Exec.
1122
1123 """
1124 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1125 itype, etxt, _ = ecode
1126 # If the error code is in the list of ignored errors, demote the error to a
1127 # warning
1128 if etxt in self.op.ignore_errors: # pylint: disable=E1101
1129 ltype = self.ETYPE_WARNING
1130 # first complete the msg
1131 if args:
1132 msg = msg % args
1133 # then format the whole message
1134 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1135 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1136 else:
1137 if item:
1138 item = " " + item
1139 else:
1140 item = ""
1141 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1142 # and finally report it via the feedback_fn
1143 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
1144 # only mark the operation as failed for ERROR cases, not for warnings
1145 if ltype == self.ETYPE_ERROR:
1146 self.bad = True
1147
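# Example of the two message formats produced above for the same error
# (the node name is hypothetical):
#   with op.error_codes:    ERROR:ENODEVERSION:node:node1.example.com:<msg>
#   without op.error_codes: ERROR: node node1.example.com: <msg>
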
1148 def _ErrorIf(self, cond, *args, **kwargs):
1149 """Log an error message if the passed condition is True.
1150
1151 """
1152 if (bool(cond)
1153 or self.op.debug_simulate_errors): # pylint: disable=E1101
1154 self._Error(*args, **kwargs)
1155
1156
1157 def _VerifyCertificate(filename):
1158 """Verifies a certificate for L{LUClusterVerifyConfig}.
1159
1160 @type filename: string
1161 @param filename: Path to PEM file
1162
1163 """
1164 try:
1165 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1166 utils.ReadFile(filename))
1167 except Exception, err: # pylint: disable=W0703
1168 return (LUClusterVerifyConfig.ETYPE_ERROR,
1169 "Failed to load X509 certificate %s: %s" % (filename, err))
1170
1171 (errcode, msg) = \
1172 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1173 constants.SSL_CERT_EXPIRATION_ERROR)
1174
1175 if msg:
1176 fnamemsg = "While verifying %s: %s" % (filename, msg)
1177 else:
1178 fnamemsg = None
1179
1180 if errcode is None:
1181 return (None, fnamemsg)
1182 elif errcode == utils.CERT_WARNING:
1183 return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1184 elif errcode == utils.CERT_ERROR:
1185 return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1186
1187 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1188
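# Sketch of how the return value of _VerifyCertificate is meant to be read
# (the caller is LUClusterVerifyConfig.Exec below):
#
#   (errcode, msg) = _VerifyCertificate(filename)
#   # errcode is None, ETYPE_WARNING or ETYPE_ERROR; msg is None when the
#   # certificate loaded fine and is not close to expiring.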
1189
1190 def _GetAllHypervisorParameters(cluster, instances):
1191 """Compute the set of all hypervisor parameters.
1192
1193 @type cluster: L{objects.Cluster}
1194 @param cluster: the cluster object
1195 @type instances: list of L{objects.Instance}
1196 @param instances: additional instances from which to obtain parameters
1197 @rtype: list of (origin, hypervisor, parameters)
1198 @return: a list with all parameters found, indicating the hypervisor they
1199 apply to, and the origin (can be "cluster", "os X", or "instance Y")
1200
1201 """
1202 hvp_data = []
1203
1204 for hv_name in cluster.enabled_hypervisors:
1205 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1206
1207 for os_name, os_hvp in cluster.os_hvp.items():
1208 for hv_name, hv_params in os_hvp.items():
1209 if hv_params:
1210 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1211 hvp_data.append(("os %s" % os_name, hv_name, full_params))
1212
1213 # TODO: collapse identical parameter values in a single one
1214 for instance in instances:
1215 if instance.hvparams:
1216 hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1217 cluster.FillHV(instance)))
1218
1219 return hvp_data
1220
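# Example of the structure returned by _GetAllHypervisorParameters (the OS,
# instance and hypervisor names are illustrative):
#
#   [("cluster", "xen-pvm", {...}),
#    ("os debian-installer", "xen-pvm", {...}),
#    ("instance web1", "kvm", {...})]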
1221
1222 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1223 """Verifies the cluster config.
1224
1225 """
1226 REQ_BGL = False
1227
1228 def _VerifyHVP(self, hvp_data):
1229 """Verifies locally the syntax of the hypervisor parameters.
1230
1231 """
1232 for item, hv_name, hv_params in hvp_data:
1233 msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1234 (hv_name, item))
1235 try:
1236 hv_class = hypervisor.GetHypervisorClass(hv_name)
1237 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1238 hv_class.CheckParameterSyntax(hv_params)
1239 except errors.GenericError, err:
1240 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1241
1242 def ExpandNames(self):
1243 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1244 self.share_locks = ShareAll()
1245
1246 def CheckPrereq(self):
1247 """Check prerequisites.
1248
1249 """
1250 # Retrieve all information
1251 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1252 self.all_node_info = self.cfg.GetAllNodesInfo()
1253 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1254
1255 def Exec(self, feedback_fn):
1256 """Verify integrity of cluster, performing various test on nodes.
1257
1258 """
1259 self.bad = False
1260 self._feedback_fn = feedback_fn
1261
1262 feedback_fn("* Verifying cluster config")
1263
1264 for msg in self.cfg.VerifyConfig():
1265 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1266
1267 feedback_fn("* Verifying cluster certificate files")
1268
1269 for cert_filename in pathutils.ALL_CERT_FILES:
1270 (errcode, msg) = _VerifyCertificate(cert_filename)
1271 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1272
1273 feedback_fn("* Verifying hypervisor parameters")
1274
1275 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1276 self.all_inst_info.values()))
1277
1278 feedback_fn("* Verifying all nodes belong to an existing group")
1279
1280 # We do this verification here because, should this bogus circumstance
1281 # occur, it would never be caught by VerifyGroup, which only acts on
1282 # nodes/instances reachable from existing node groups.
1283
1284 dangling_nodes = set(node.name for node in self.all_node_info.values()
1285 if node.group not in self.all_group_info)
1286
1287 dangling_instances = {}
1288 no_node_instances = []
1289
1290 for inst in self.all_inst_info.values():
1291 if inst.primary_node in dangling_nodes:
1292 dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1293 elif inst.primary_node not in self.all_node_info:
1294 no_node_instances.append(inst.name)
1295
1296 pretty_dangling = [
1297 "%s (%s)" %
1298 (node.name,
1299 utils.CommaJoin(dangling_instances.get(node.name,
1300 ["no instances"])))
1301 for node in dangling_nodes]
1302
1303 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1304 None,
1305 "the following nodes (and their instances) belong to a non"
1306 " existing group: %s", utils.CommaJoin(pretty_dangling))
1307
1308 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1309 None,
1310 "the following instances have a non-existing primary-node:"
1311 " %s", utils.CommaJoin(no_node_instances))
1312
1313 return not self.bad
1314
1315
1316 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1317 """Verifies the status of a node group.
1318
1319 """
1320 HPATH = "cluster-verify"
1321 HTYPE = constants.HTYPE_CLUSTER
1322 REQ_BGL = False
1323
1324 _HOOKS_INDENT_RE = re.compile("^", re.M)
1325
1326 class NodeImage(object):
1327 """A class representing the logical and physical status of a node.
1328
1329 @type name: string
1330 @ivar name: the node name to which this object refers
1331 @ivar volumes: a structure as returned from
1332 L{ganeti.backend.GetVolumeList} (runtime)
1333 @ivar instances: a list of running instances (runtime)
1334 @ivar pinst: list of configured primary instances (config)
1335 @ivar sinst: list of configured secondary instances (config)
1336 @ivar sbp: dictionary of {primary-node: list of instances} for all
1337 instances for which this node is secondary (config)
1338 @ivar mfree: free memory, as reported by hypervisor (runtime)
1339 @ivar dfree: free disk, as reported by the node (runtime)
1340 @ivar offline: the offline status (config)
1341 @type rpc_fail: boolean
1342 @ivar rpc_fail: whether the RPC verify call was successful (overall,
1343 not whether the individual keys were correct) (runtime)
1344 @type lvm_fail: boolean
1345 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1346 @type hyp_fail: boolean
1347 @ivar hyp_fail: whether the RPC call didn't return the instance list
1348 @type ghost: boolean
1349 @ivar ghost: whether this is a known node or not (config)
1350 @type os_fail: boolean
1351 @ivar os_fail: whether the RPC call didn't return valid OS data
1352 @type oslist: list
1353 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1354 @type vm_capable: boolean
1355 @ivar vm_capable: whether the node can host instances
1356 @type pv_min: float
1357 @ivar pv_min: size in MiB of the smallest PVs
1358 @type pv_max: float
1359 @ivar pv_max: size in MiB of the biggest PVs
1360
1361 """
1362 def __init__(self, offline=False, name=None, vm_capable=True):
1363 self.name = name
1364 self.volumes = {}
1365 self.instances = []
1366 self.pinst = []
1367 self.sinst = []
1368 self.sbp = {}
1369 self.mfree = 0
1370 self.dfree = 0
1371 self.offline = offline
1372 self.vm_capable = vm_capable
1373 self.rpc_fail = False
1374 self.lvm_fail = False
1375 self.hyp_fail = False
1376 self.ghost = False
1377 self.os_fail = False
1378 self.oslist = {}
1379 self.pv_min = None
1380 self.pv_max = None
1381
1382 def ExpandNames(self):
1383 # This raises errors.OpPrereqError on its own:
1384 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1385
1386 # Get instances in node group; this is unsafe and needs verification later
1387 inst_names = \
1388 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1389
1390 self.needed_locks = {
1391 locking.LEVEL_INSTANCE: inst_names,
1392 locking.LEVEL_NODEGROUP: [self.group_uuid],
1393 locking.LEVEL_NODE: [],
1394
1395 # This opcode is run by the watcher every five minutes and acquires all nodes
1396 # for a group. It doesn't run for a long time, so it's better to acquire
1397 # the node allocation lock as well.
1398 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1399 }
1400
1401 self.share_locks = ShareAll()
1402
1403 def DeclareLocks(self, level):
1404 if level == locking.LEVEL_NODE:
1405 # Get members of node group; this is unsafe and needs verification later
1406 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1407
1408 all_inst_info = self.cfg.GetAllInstancesInfo()
1409
1410 # In Exec(), we warn about mirrored instances that have primary and
1411 # secondary living in separate node groups. To fully verify that
1412 # volumes for these instances are healthy, we will need to do an
1413 # extra call to their secondaries. We ensure here those nodes will
1414 # be locked.
1415 for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1416 # Important: access only the instances whose lock is owned
1417 if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1418 nodes.update(all_inst_info[inst].secondary_nodes)
1419
1420 self.needed_locks[locking.LEVEL_NODE] = nodes
1421
1422 def CheckPrereq(self):
1423 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1424 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1425
1426 group_nodes = set(self.group_info.members)
1427 group_instances = \
1428 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1429
1430 unlocked_nodes = \
1431 group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1432
1433 unlocked_instances = \
1434 group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1435
1436 if unlocked_nodes:
1437 raise errors.OpPrereqError("Missing lock for nodes: %s" %
1438 utils.CommaJoin(unlocked_nodes),
1439 errors.ECODE_STATE)
1440
1441 if unlocked_instances:
1442 raise errors.OpPrereqError("Missing lock for instances: %s" %
1443 utils.CommaJoin(unlocked_instances),
1444 errors.ECODE_STATE)
1445
1446 self.all_node_info = self.cfg.GetAllNodesInfo()
1447 self.all_inst_info = self.cfg.GetAllInstancesInfo()
1448
1449 self.my_node_names = utils.NiceSort(group_nodes)
1450 self.my_inst_names = utils.NiceSort(group_instances)
1451
1452 self.my_node_info = dict((name, self.all_node_info[name])
1453 for name in self.my_node_names)
1454
1455 self.my_inst_info = dict((name, self.all_inst_info[name])
1456 for name in self.my_inst_names)
1457
1458 # We detect here the nodes that will need the extra RPC calls for verifying
1459 # split LV volumes; they should be locked.
1460 extra_lv_nodes = set()
1461
1462 for inst in self.my_inst_info.values():
1463 if inst.disk_template in constants.DTS_INT_MIRROR:
1464 for nname in inst.all_nodes:
1465 if self.all_node_info[nname].group != self.group_uuid:
1466 extra_lv_nodes.add(nname)
1467
1468 unlocked_lv_nodes = \
1469 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1470
1471 if unlocked_lv_nodes:
1472 raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1473 utils.CommaJoin(unlocked_lv_nodes),
1474 errors.ECODE_STATE)
1475 self.extra_lv_nodes = list(extra_lv_nodes)
1476
1477 def _VerifyNode(self, ninfo, nresult):
1478 """Perform some basic validation on data returned from a node.
1479
1480 - check the result data structure is well formed and has all the
1481 mandatory fields
1482 - check ganeti version
1483
1484 @type ninfo: L{objects.Node}
1485 @param ninfo: the node to check
1486 @param nresult: the results from the node
1487 @rtype: boolean
1488 @return: whether overall this call was successful (and we can expect
1489 reasonable values in the respose)
1490
1491 """
1492 node = ninfo.name
1493 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1494
1495 # main result, nresult should be a non-empty dict
1496 test = not nresult or not isinstance(nresult, dict)
1497 _ErrorIf(test, constants.CV_ENODERPC, node,
1498 "unable to verify node: no data returned")
1499 if test:
1500 return False
1501
1502 # compares ganeti version
1503 local_version = constants.PROTOCOL_VERSION
1504 remote_version = nresult.get("version", None)
1505 test = not (remote_version and
1506 isinstance(remote_version, (list, tuple)) and
1507 len(remote_version) == 2)
1508 _ErrorIf(test, constants.CV_ENODERPC, node,
1509 "connection to node returned invalid data")
1510 if test:
1511 return False
1512
1513 test = local_version != remote_version[0]
1514 _ErrorIf(test, constants.CV_ENODEVERSION, node,
1515 "incompatible protocol versions: master %s,"
1516 " node %s", local_version, remote_version[0])
1517 if test:
1518 return False
1519
1520 # node seems compatible, we can actually try to look into its results
1521
1522 # full package version
1523 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1524 constants.CV_ENODEVERSION, node,
1525 "software version mismatch: master %s, node %s",
1526 constants.RELEASE_VERSION, remote_version[1],
1527 code=self.ETYPE_WARNING)
1528
1529 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1530 if ninfo.vm_capable and isinstance(hyp_result, dict):
1531 for hv_name, hv_result in hyp_result.iteritems():
1532 test = hv_result is not None
1533 _ErrorIf(test, constants.CV_ENODEHV, node,
1534 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1535
1536 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1537 if ninfo.vm_capable and isinstance(hvp_result, list):
1538 for item, hv_name, hv_result in hvp_result:
1539 _ErrorIf(True, constants.CV_ENODEHV, node,
1540 "hypervisor %s parameter verify failure (source %s): %s",
1541 hv_name, item, hv_result)
1542
1543 test = nresult.get(constants.NV_NODESETUP,
1544 ["Missing NODESETUP results"])
1545 _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1546 "; ".join(test))
1547
1548 return True
1549
1550 def _VerifyNodeTime(self, ninfo, nresult,
1551 nvinfo_starttime, nvinfo_endtime):
1552 """Check the node time.
1553
1554 @type ninfo: L{objects.Node}
1555 @param ninfo: the node to check
1556 @param nresult: the remote results for the node
1557 @param nvinfo_starttime: the start time of the RPC call
1558 @param nvinfo_endtime: the end time of the RPC call
1559
1560 """
1561 node = ninfo.name
1562 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1563
1564 ntime = nresult.get(constants.NV_TIME, None)
1565 try:
1566 ntime_merged = utils.MergeTime(ntime)
1567 except (ValueError, TypeError):
1568 _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1569 return
1570
1571 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1572 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1573 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1574 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1575 else:
1576 ntime_diff = None
1577
1578 _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1579 "Node time diverges by at least %s from master node time",
1580 ntime_diff)
1581
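# A worked example for the skew check above (times in seconds, made up):
# if the verify RPC ran from t=1000 to t=1002 and NODE_MAX_CLOCK_SKEW is 150,
# a node reporting t=1200 exceeds 1002 + 150 and is flagged with a diff of
# abs(1200 - 1002) = 198.0s, while a node reporting t=1100 is within bounds.
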
1582 def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1583 """Check the node LVM results and update info for cross-node checks.
1584
1585 @type ninfo: L{objects.Node}
1586 @param ninfo: the node to check
1587 @param nresult: the remote results for the node
1588 @param vg_name: the configured VG name
1589 @type nimg: L{NodeImage}
1590 @param nimg: node image
1591
1592 """
1593 if vg_name is None:
1594 return
1595
1596 node = ninfo.name
1597 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1598
1599 # checks vg existence and size > 20G
1600 vglist = nresult.get(constants.NV_VGLIST, None)
1601 test = not vglist
1602 _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1603 if not test:
1604 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1605 constants.MIN_VG_SIZE)
1606 _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1607
1608 # Check PVs
1609 (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1610 for em in errmsgs:
1611 self._Error(constants.CV_ENODELVM, node, em)
1612 if pvminmax is not None:
1613 (nimg.pv_min, nimg.pv_max) = pvminmax
1614
1615 def _VerifyGroupLVM(self, node_image, vg_name):
1616 """Check cross-node consistency in LVM.
1617
1618 @type node_image: dict
1619 @param node_image: info about nodes, mapping from node to names to
1620 L{NodeImage} objects
1621 @param vg_name: the configured VG name
1622
1623 """
1624 if vg_name is None:
1625 return
1626
1627 # Only exclusive storage needs this kind of check
1628 if not self._exclusive_storage:
1629 return
1630
1631 # exclusive_storage wants all PVs to have the same size (approximately),
1632 # if the smallest and the biggest ones are okay, everything is fine.
1633 # pv_min is None iff pv_max is None
1634 vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1635 if not vals:
1636 return
1637 (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1638 (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1639 bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1640 self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1641 "PV sizes differ too much in the group; smallest (%s MB) is"
1642 " on %s, biggest (%s MB) is on %s",
1643 pvmin, minnode, pvmax, maxnode)
1644
1645 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1646 """Check the node bridges.
1647
1648 @type ninfo: L{objects.Node}
1649 @param ninfo: the node to check
1650 @param nresult: the remote results for the node
1651 @param bridges: the expected list of bridges
1652
1653 """
1654 if not bridges:
1655 return
1656
1657 node = ninfo.name
1658 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1659
1660 missing = nresult.get(constants.NV_BRIDGES, None)
1661 test = not isinstance(missing, list)
1662 _ErrorIf(test, constants.CV_ENODENET, node,
1663 "did not return valid bridge information")
1664 if not test:
1665 _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1666 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1667
1668 def _VerifyNodeUserScripts(self, ninfo, nresult):
1669 """Check the results of user scripts presence and executability on the node
1670
1671 @type ninfo: L{objects.Node}
1672 @param ninfo: the node to check
1673 @param nresult: the remote results for the node
1674
1675 """
1676 node = ninfo.name
1677
1678 test = constants.NV_USERSCRIPTS not in nresult
1679 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1680 "did not return user scripts information")
1681
1682 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1683 if not test:
1684 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1685 "user scripts not present or not executable: %s" %
1686 utils.CommaJoin(sorted(broken_scripts)))
1687
1688 def _VerifyNodeNetwork(self, ninfo, nresult):
1689 """Check the node network connectivity results.
1690
1691 @type ninfo: L{objects.Node}
1692 @param ninfo: the node to check
1693 @param nresult: the remote results for the node
1694
1695 """
1696 node = ninfo.name
1697 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1698
1699 test = constants.NV_NODELIST not in nresult
1700 _ErrorIf(test, constants.CV_ENODESSH, node,
1701 "node hasn't returned node ssh connectivity data")
1702 if not test:
1703 if nresult[constants.NV_NODELIST]:
1704 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1705 _ErrorIf(True, constants.CV_ENODESSH, node,
1706 "ssh communication with node '%s': %s", a_node, a_msg)
1707
1708 test = constants.NV_NODENETTEST not in nresult
1709 _ErrorIf(test, constants.CV_ENODENET, node,
1710 "node hasn't returned node tcp connectivity data")
1711 if not test:
1712 if nresult[constants.NV_NODENETTEST]:
1713 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1714 for anode in nlist:
1715 _ErrorIf(True, constants.CV_ENODENET, node,
1716 "tcp communication with node '%s': %s",
1717 anode, nresult[constants.NV_NODENETTEST][anode])
1718
1719 test = constants.NV_MASTERIP not in nresult
1720 _ErrorIf(test, constants.CV_ENODENET, node,
1721 "node hasn't returned node master IP reachability data")
1722 if not test:
1723 if not nresult[constants.NV_MASTERIP]:
1724 if node == self.master_node:
1725 msg = "the master node cannot reach the master IP (not configured?)"
1726 else:
1727 msg = "cannot reach the master IP"
1728 _ErrorIf(True, constants.CV_ENODENET, node, msg)
1729
1730 def _VerifyInstance(self, instance, inst_config, node_image,
1731 diskstatus):
1732 """Verify an instance.
1733
1734 This function checks to see if the required block devices are
1735 available on the instance's node, and that the nodes are in the correct
1736 state.
1737
1738 """
1739 _ErrorIf = self._ErrorIf # pylint: disable=C0103
1740 pnode = inst_config.primary_node
1741 pnode_img = node_image[pnode]
1742 groupinfo = self.cfg.GetAllNodeGroupsInfo()
1743
1744 node_vol_should = {}
1745 inst_config.MapLVsByNode(node_vol_should)
1746
1747 cluster = self.cfg.GetClusterInfo()
1748 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1749 self.group_info)
1750 err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1751 _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1752 code=self.ETYPE_WARNING)
1753
1754 for node in node_vol_should:
1755 n_img = node_image[node]
1756 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1757 # ignore missing volumes on offline or broken nodes
1758 continue
1759 for volume in node_vol_should[node]:
1760 test = volume not in n_img.volumes
1761 _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1762 "volume %s missing on node %s", volume, node)
1763
1764 if inst_config.admin_state == constants.ADMINST_UP:
1765 test = instance not in pnode_img.instances and not pnode_img.offline
1766 _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1767 "instance not running on its primary node %s",
1768 pnode)
1769 _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1770 "instance is marked as running and lives on offline node %s",
1771 pnode)
1772
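# Flatten the per-node disk status into (node, success, status, disk index)
# tuples so each disk can be checked individually below.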
1773 diskdata = [(nname, success, status, idx)
1774 for (nname, disks) in diskstatus.items()
1775 for idx, (success, status) in enumerate(disks)]
1776
1777 for nname, success, bdev_status, idx in diskdata:
1778 # the 'ghost node' construction in Exec() ensures that we have a
1779 # node here
1780 snode = node_image[nname]
1781 bad_snode = snode.ghost or snode.offline
1782 _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
1783 not success and not bad_snode,
1784 constants.CV_EINSTANCEFAULTYDISK, instance,
1785 "couldn't retrieve status for disk/%s on %s: %s",
1786 idx, nname, bdev_status)
1787 _ErrorIf((inst_config.admin_state == constants.ADMINST_UP and
1788 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1789 constants.CV_EINSTANCEFAULTYDISK, instance,
1790 "disk/%s on %s is faulty", idx, nname)
1791
1792 _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1793 constants.CV_ENODERPC, pnode, "instance %s, connection to"
1794 " primary node failed", instance)
1795
1796 _ErrorIf(len(inst_config.secondary_nodes) > 1,
1797 constants.CV_EINSTANCELAYOUT,
1798 instance, "instance has multiple secondary nodes: %s",
1799 utils.CommaJoin(inst_config.secondary_nodes),
1800 code=self.ETYPE_WARNING)
1801
1802 if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1803 # Disk template not compatible with exclusive_storage: no instance
1804 # node should have the flag set
1805 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1806 inst_config.all_nodes)
1807 es_nodes = [n for (n, es) in es_flags.items()
1808 if es]
1809 _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1810 "instance has template %s, which is not supported on nodes"
1811 " that have exclusive storage set: %s",
1812 inst_config.disk_template, utils.CommaJoin(es_nodes))
1813
1814 if inst_config.disk_template in constants.DTS_INT_MIRROR:
1815 instance_nodes = utils.NiceSort(inst_config.all_nodes)
1816 instance_groups = {}
1817
1818 for node in instance_nodes:
1819 instance_groups.setdefault(self.all_node_info[node].group,
1820 []).append(node)
1821
1822 pretty_list = [
1823 "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1824 # Sort so that we always list the primary node first.
1825 for group, nodes in sorted(instance_groups.items(),
1826 key=lambda (_, nodes): pnode in nodes,
1827 reverse=True)]
1828
1829 self._ErrorIf(len(instance_groups) > 1,
1830 constants.CV_EINSTANCESPLITGROUPS,
1831 instance, "instance has primary and secondary nodes in"
1832 " different groups: %s", utils.CommaJoin(pretty_list),
1833 code=self.ETYPE_WARNING)
1834
1835 inst_nodes_offline = []
1836 for snode in inst_config.secondary_nodes:
1837 s_img = node_image[snode]
1838 _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1839 snode, "instance %s, connection to secondary node failed",
1840 instance)
1841
1842 if s_img.offline:
1843 inst_nodes_offline.append(snode)
1844
1845 # warn that the instance lives on offline nodes
1846 _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1847 "instance has offline secondary node(s) %s",
1848 utils.CommaJoin(inst_nodes_offline))
1849 # ... or ghost/non-vm_capable nodes
1850 for node in inst_config.all_nodes:
1851 _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1852 instance, "instance lives on ghost node %s", node)
1853 _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1854 instance, "instance lives on non-vm_capable node %s", node)
1855
1856 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1857 """Verify if there are any unknown volumes in the cluster.
1858
1859 The .os, .swap and backup volumes are ignored. All other volumes are
1860 reported as unknown.
1861
1862 @type reserved: L{ganeti.utils.FieldSet}
1863 @param reserved: a FieldSet of reserved volume names
1864
1865 """
1866 for node, n_img in node_image.items():
1867 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1868 self.all_node_info[node].group != self.group_uuid):
1869 # skip non-healthy nodes
1870 continue
1871 for volume in n_img.volumes:
1872 test = ((node not in node_vol_should or
1873 volume not in node_vol_should[node]) and
1874 not reserved.Matches(volume))
1875 self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1876 "volume %s is unknown", volume)
1877
1878 def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1879 """Verify N+1 Memory Resilience.
1880
1881 Check that if one single node dies we can still start all the
1882 instances it was primary for.
1883
1884 """
1885 cluster_info = self.cfg.GetClusterInfo()
1886 for node, n_img in node_image.items():
1887 # This code checks that every node which is now listed as
1888 # secondary has enough memory to host all instances it is
1889 # supposed to host, should any single other node in the cluster fail.
1890 # FIXME: not ready for failover to an arbitrary node
1891 # FIXME: does not support file-backed instances
1892 # WARNING: we currently take into account down instances as well
1893 # as up ones, considering that even if they're down someone
1894 # might want to start them even in the event of a node failure.
1895 if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1896 # we're skipping nodes marked offline and nodes in other groups from
1897 # the N+1 warning, since most likely we don't have good memory
1898 # information from them; we already list instances living on such
1899 # nodes, and that's enough warning
1900 continue
1901 #TODO(dynmem): also consider ballooning out other instances
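# n_img.sbp maps each primary node name to the instances that use the
# current node as a secondary, i.e. the instances that would have to fail
# over to this node if that primary node died.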
1902 for prinode, instances in n_img.sbp.items():
1903 needed_mem = 0
1904 for instance in instances:
1905 bep = cluster_info.FillBE(instance_cfg[instance])
1906 if bep[constants.BE_AUTO_BALANCE]:
1907 needed_mem += bep[constants.BE_MINMEM]
1908 test = n_img.mfree < needed_mem
1909 self._ErrorIf(test, constants.CV_ENODEN1, node,
1910 "not enough memory to accomodate instance failovers"
1911 " should node %s fail (%dMiB needed, %dMiB available)",
1912 prinode, needed_mem, n_img.mfree)
1913
1914 @classmethod
1915 def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1916 (files_all, files_opt, files_mc, files_vm)):
1917 """Verifies file checksums collected from all nodes.
1918
1919 @param errorif: Callback for reporting errors
1920 @param nodeinfo: List of L{objects.Node} objects
1921 @param master_node: Name of master node
1922 @param all_nvinfo: RPC results
1923
1924 """
1925 # Define functions determining which nodes to consider for a file
1926 files2nodefn = [
1927 (files_all, None),
1928 (files_mc, lambda node: (node.master_candidate or
1929 node.name == master_node)),
1930 (files_vm, lambda node: node.vm_capable),
1931 ]
1932
1933 # Build mapping from filename to list of nodes which should have the file
1934 nodefiles = {}
1935 for (files, fn) in files2nodefn:
1936 if fn is None:
1937 filenodes = nodeinfo
1938 else:
1939 filenodes = filter(fn, nodeinfo)
1940 nodefiles.update((filename,
1941 frozenset(map(operator.attrgetter("name"), filenodes)))
1942 for filename in files)
1943
1944 assert set(nodefiles) == (files_all | files_mc | files_vm)
1945
1946 fileinfo = dict((filename, {}) for filename in nodefiles)
1947 ignore_nodes = set()
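# fileinfo maps each filename to a dict of checksum -> set of node names
# reporting that checksum; nodes whose file data cannot be used are
# collected in ignore_nodes and skipped in the comparisons below.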
1948
1949 for node in nodeinfo:
1950 if node.offline:
1951 ignore_nodes.add(node.name)
1952 continue
1953
1954 nresult = all_nvinfo[node.name]
1955
1956 if nresult.fail_msg or not nresult.payload:
1957 node_files = None
1958 else:
1959 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1960 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1961 for (key, value) in fingerprints.items())
1962 del fingerprints
1963
1964 test = not (node_files and isinstance(node_files, dict))
1965 errorif(test, constants.CV_ENODEFILECHECK, node.name,
1966 "Node did not return file checksum data")
1967 if test:
1968 ignore_nodes.add(node.name)
1969 continue
1970
1971 # Build per-checksum mapping from filename to nodes having it
1972 for (filename, checksum) in node_files.items():
1973 assert filename in nodefiles
1974 fileinfo[filename].setdefault(checksum, set()).add(node.name)
1975
1976 for (filename, checksums) in fileinfo.items():
1977 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1978
1979 # Nodes having the file
1980 with_file = frozenset(node_name
1981 for nodes in fileinfo[filename].values()
1982 for node_name in nodes) - ignore_nodes
1983
1984 expected_nodes = nodefiles[filename] - ignore_nodes
1985
1986 # Nodes missing file
1987 missing_file = expected_nodes - with_file
1988
1989 if filename in files_opt:
1990 # All or no nodes
1991 errorif(missing_file and missing_file != expected_nodes,
1992 constants.CV_ECLUSTERFILECHECK, None,
1993 "File %s is optional, but it must exist on all or no"
1994 " nodes (not found on %s)",
1995 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
1996 else:
1997 errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
1998 "File %s is missing from node(s) %s", filename,
1999 utils.CommaJoin(utils.NiceSort(missing_file)))
2000
2001 # Warn if a node has a file it shouldn't
2002 unexpected = with_file - expected_nodes
2003 errorif(unexpected,
2004 constants.CV_ECLUSTERFILECHECK, None,
2005 "File %s should not exist on node(s) %s",
2006 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2007
2008 # See if there are multiple versions of the file
2009 test = len(checksums) > 1
2010 if test:
2011 variants = ["variant %s on %s" %
2012 (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2013 for (idx, (checksum, nodes)) in
2014 enumerate(sorted(checksums.items()))]
2015 else:
2016 variants = []
2017
2018 errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2019 "File %s found with %s different checksums (%s)",
2020 filename, len(checksums), "; ".join(variants))
2021
2022 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2023 drbd_map):
2024 """Verifies and the node DRBD status.
2025
2026 @type ninfo: L{objects.Node}
2027 @param ninfo: the node to check
2028 @param nresult: the remote results for the node
2029 @param instanceinfo: the dict of instances
2030 @param drbd_helper: the configured DRBD usermode helper
2031 @param drbd_map: the DRBD map as returned by
2032 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2033
2034 """
2035 node = ninfo.name
2036 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2037
2038 if drbd_helper:
2039 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2040 test = (helper_result is None)
2041 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2042 "no drbd usermode helper returned")
2043 if helper_result:
2044 status, payload = helper_result
2045 test = not status
2046 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2047 "drbd usermode helper check unsuccessful: %s", payload)
2048 test = status and (payload != drbd_helper)
2049 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2050 "wrong drbd usermode helper: %s", payload)
2051
2052 # compute the DRBD minors
2053 node_drbd = {}
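# node_drbd will map each configured minor to a tuple of
# (instance name, whether the instance should be running), e.g.
# (illustrative values only) {0: ("instance1.example.com", True)}.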
2054 for minor, instance in drbd_map[node].items():
2055 test = instance not in instanceinfo
2056 _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2057 "ghost instance '%s' in temporary DRBD map", instance)
2058 # ghost instance should not be running, but otherwise we
2059 # don't give double warnings (both ghost instance and
2060 # unallocated minor in use)
2061 if test:
2062 node_drbd[minor] = (instance, False)
2063 else:
2064 instance = instanceinfo[instance]
2065 node_drbd[minor] = (instance.name,
2066 instance.admin_state == constants.ADMINST_UP)
2067
2068 # and now check them
2069 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2070 test = not isinstance(used_minors, (tuple, list))
2071 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2072 "cannot parse drbd status file: %s", str(used_minors))
2073 if test:
2074 # we cannot check drbd status
2075 return
2076
2077 for minor, (iname, must_exist) in node_drbd.items():
2078 test = minor not in used_minors and must_exist
2079 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2080 "drbd minor %d of instance %s is not active", minor, iname)
2081 for minor in used_minors:
2082 test = minor not in node_drbd
2083 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2084 "unallocated drbd minor %d is in use", minor)
2085
2086 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2087 """Builds the node OS structures.
2088
2089 @type ninfo: L{objects.Node}
2090 @param ninfo: the node to check
2091 @param nresult: the remote results for the node
2092 @param nimg: the node image object
2093
2094 """
2095 node = ninfo.name
2096 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2097
2098 remote_os = nresult.get(constants.NV_OSLIST, None)
2099 test = (not isinstance(remote_os, list) or
2100 not compat.all(isinstance(v, list) and len(v) == 7
2101 for v in remote_os))
2102
2103 _ErrorIf(test, constants.CV_ENODEOS, node,
2104 "node hasn't returned valid OS data")
2105
2106 nimg.os_fail = test
2107
2108 if test:
2109 return
2110
2111 os_dict = {}
2112
2113 for (name, os_path, status, diagnose,
2114 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2115
2116 if name not in os_dict:
2117 os_dict[name] = []
2118
2119 # parameters is a list of lists instead of a list of tuples due to
2120 # JSON lacking a real tuple type, fix it:
2121 parameters = [tuple(v) for v in parameters]
2122 os_dict[name].append((os_path, status, diagnose,
2123 set(variants), set(parameters), set(api_ver)))
2124
2125 nimg.oslist = os_dict
2126
2127 def _VerifyNodeOS(self, ninfo, nimg, base):
2128 """Verifies the node OS list.
2129
2130 @type ninfo: L{objects.Node}
2131 @param ninfo: the node to check
2132 @param nimg: the node image object
2133 @param base: the 'template' node we match against (e.g. from the master)
2134
2135 """
2136 node = ninfo.name
2137 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2138
2139 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2140
2141 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2142 for os_name, os_data in nimg.oslist.items():
2143 assert os_data, "Empty OS status for OS %s?!" % os_name
2144 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2145 _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2146 "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2147 _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2148 "OS '%s' has multiple entries (first one shadows the rest): %s",
2149 os_name, utils.CommaJoin([v[0] for v in os_data]))
2150 # comparisons with the 'base' image
2151 test = os_name not in base.oslist
2152 _ErrorIf(test, constants.CV_ENODEOS, node,
2153 "Extra OS %s not present on reference node (%s)",
2154 os_name, base.name)
2155 if test:
2156 continue
2157 assert base.oslist[os_name], "Base node has empty OS status?"
2158 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2159 if not b_status:
2160 # base OS is invalid, skipping
2161 continue
2162 for kind, a, b in [("API version", f_api, b_api),
2163 ("variants list", f_var, b_var),
2164 ("parameters", beautify_params(f_param),
2165 beautify_params(b_param))]:
2166 _ErrorIf(a != b, constants.CV_ENODEOS, node,
2167 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2168 kind, os_name, base.name,
2169 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2170
2171 # check any missing OSes
2172 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2173 _ErrorIf(missing, constants.CV_ENODEOS, node,
2174 "OSes present on reference node %s but missing on this node: %s",
2175 base.name, utils.CommaJoin(missing))
2176
2177 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2178 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2179
2180 @type ninfo: L{objects.Node}
2181 @param ninfo: the node to check
2182 @param nresult: the remote results for the node
2183 @type is_master: bool
2184 @param is_master: Whether node is the master node
2185
2186 """
2187 node = ninfo.name
2188
2189 if (is_master and
2190 (constants.ENABLE_FILE_STORAGE or
2191 constants.ENABLE_SHARED_FILE_STORAGE)):
2192 try:
2193 fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2194 except KeyError:
2195 # This should never happen
2196 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2197 "Node did not return forbidden file storage paths")
2198 else:
2199 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2200 "Found forbidden file storage paths: %s",
2201 utils.CommaJoin(fspaths))
2202 else:
2203 self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2204 constants.CV_ENODEFILESTORAGEPATHS, node,
2205 "Node should not have returned forbidden file storage"
2206 " paths")
2207
2208 def _VerifyOob(self, ninfo, nresult):
2209 """Verifies out of band functionality of a node.
2210
2211 @type ninfo: L{objects.Node}
2212 @param ninfo: the node to check
2213 @param nresult: the remote results for the node
2214
2215 """
2216 node = ninfo.name
2217 # We just have to verify the paths on master and/or master candidates
2218 # as the oob helper is invoked on the master
2219 if ((ninfo.master_candidate or ninfo.master_capable) and
2220 constants.NV_OOB_PATHS in nresult):
2221 for path_result in nresult[constants.NV_OOB_PATHS]:
2222 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2223
2224 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2225 """Verifies and updates the node volume data.
2226
2227 This function will update a L{NodeImage}'s internal structures
2228 with data from the remote call.
2229
2230 @type ninfo: L{objects.Node}
2231 @param ninfo: the node to check
2232 @param nresult: the remote results for the node
2233 @param nimg: the node image object
2234 @param vg_name: the configured VG name
2235
2236 """
2237 node = ninfo.name
2238 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2239
2240 nimg.lvm_fail = True
2241 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2242 if vg_name is None:
2243 pass
2244 elif isinstance(lvdata, basestring):
2245 _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2246 utils.SafeEncode(lvdata))
2247 elif not isinstance(lvdata, dict):
2248 _ErrorIf(True, constants.CV_ENODELVM, node,
2249 "rpc call to node failed (lvlist)")
2250 else:
2251 nimg.volumes = lvdata
2252 nimg.lvm_fail = False
2253
2254 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2255 """Verifies and updates the node instance list.
2256
2257 If the listing was successful, this node's instance list is updated.
2258 Otherwise, the RPC call is marked as failed for the instance list
2259 key.
2260
2261 @type ninfo: L{objects.Node}
2262 @param ninfo: the node to check
2263 @param nresult: the remote results for the node
2264 @param nimg: the node image object
2265
2266 """
2267 idata = nresult.get(constants.NV_INSTANCELIST, None)
2268 test = not isinstance(idata, list)
2269 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2270 "rpc call to node failed (instancelist): %s",
2271 utils.SafeEncode(str(idata)))
2272 if test:
2273 nimg.hyp_fail = True
2274 else:
2275 nimg.instances = idata
2276
2277 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2278 """Verifies and computes a node information map
2279
2280 @type ninfo: L{objects.Node}
2281 @param ninfo: the node to check
2282 @param nresult: the remote results for the node
2283 @param nimg: the node image object
2284 @param vg_name: the configured VG name
2285
2286 """
2287 node = ninfo.name
2288 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2289
2290 # try to read free memory (from the hypervisor)
2291 hv_info = nresult.get(constants.NV_HVINFO, None)
2292 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2293 _ErrorIf(test, constants.CV_ENODEHV, node,
2294 "rpc call to node failed (hvinfo)")
2295 if not test:
2296 try:
2297 nimg.mfree = int(hv_info["memory_free"])
2298 except (ValueError, TypeError):
2299 _ErrorIf(True, constants.CV_ENODERPC, node,
2300 "node returned invalid nodeinfo, check hypervisor")
2301
2302 # FIXME: devise a free space model for file based instances as well
2303 if vg_name is not None:
2304 test = (constants.NV_VGLIST not in nresult or
2305 vg_name not in nresult[constants.NV_VGLIST])
2306 _ErrorIf(test, constants.CV_ENODELVM, node,
2307 "node didn't return data for the volume group '%s'"
2308 " - it is either missing or broken", vg_name)
2309 if not test:
2310 try:
2311 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2312 except (ValueError, TypeError):
2313 _ErrorIf(True, constants.CV_ENODERPC, node,
2314 "node returned invalid LVM info, check LVM status")
2315
2316 def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2317 """Gets per-disk status information for all instances.
2318
2319 @type nodelist: list of strings
2320 @param nodelist: Node names
2321 @type node_image: dict of (name, L{objects.Node})
2322 @param node_image: Node objects
2323 @type instanceinfo: dict of (name, L{objects.Instance})
2324 @param instanceinfo: Instance objects
2325 @rtype: {instance: {node: [(success, payload)]}}
2326 @return: a dictionary of per-instance dictionaries with nodes as
2327 keys and disk information as values; the disk information is a
2328 list of tuples (success, payload)
2329
2330 """
2331 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2332
2333 node_disks = {}
2334 node_disks_devonly = {}
2335 diskless_instances = set()
2336 diskless = constants.DT_DISKLESS
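# node_disks keeps the (instance, disk) pairs per node for reporting,
# while node_disks_devonly holds the annotated disk objects that are
# actually passed to the blockdev RPC call below.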
2337
2338 for nname in nodelist:
2339 node_instances = list(itertools.chain(node_image[nname].pinst,
2340 node_image[nname].sinst))
2341 diskless_instances.update(inst for inst in node_instances
2342 if instanceinfo[inst].disk_template == diskless)
2343 disks = [(inst, disk)
2344 for inst in node_instances
2345 for disk in instanceinfo[inst].disks]
2346
2347 if not disks:
2348 # No need to collect data
2349 continue
2350
2351 node_disks[nname] = disks
2352
2353 # AnnotateDiskParams already makes copies of the disks
2354 devonly = []
2355 for (inst, dev) in disks:
2356 (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2357 self.cfg.SetDiskID(anno_disk, nname)
2358 devonly.append(anno_disk)
2359
2360 node_disks_devonly[nname] = devonly
2361
2362 assert len(node_disks) == len(node_disks_devonly)
2363
2364 # Collect data from all nodes with disks
2365 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2366 node_disks_devonly)
2367
2368 assert len(result) == len(node_disks)
2369
2370 instdisk = {}
2371
2372 for (nname, nres) in result.items():
2373 disks = node_disks[nname]
2374
2375 if nres.offline:
2376 # No data from this node
2377 data = len(disks) * [(False, "node offline")]
2378 else:
2379 msg = nres.fail_msg
2380 _ErrorIf(msg, constants.CV_ENODERPC, nname,
2381 "while getting disk information: %s", msg)
2382 if msg:
2383 # No data from this node
2384 data = len(disks) * [(False, msg)]
2385 else:
2386 data = []
2387 for idx, i in enumerate(nres.payload):
2388 if isinstance(i, (tuple, list)) and len(i) == 2:
2389 data.append(i)
2390 else:
2391 logging.warning("Invalid result from node %s, entry %d: %s",
2392 nname, idx, i)
2393 data.append((False, "Invalid result from the remote node"))
2394
2395 for ((inst, _), status) in zip(disks, data):
2396 instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2397
2398 # Add empty entries for diskless instances.
2399 for inst in diskless_instances:
2400 assert inst not in instdisk
2401 instdisk[inst] = {}
2402
2403 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2404 len(nnames) <= len(instanceinfo[inst].all_nodes) and
2405 compat.all(isinstance(s, (tuple, list)) and
2406 len(s) == 2 for s in statuses)
2407 for inst, nnames in instdisk.items()
2408 for nname, statuses in nnames.items())
2409 if __debug__:
2410 instdisk_keys = set(instdisk)
2411 instanceinfo_keys = set(instanceinfo)
2412 assert instdisk_keys == instanceinfo_keys, \
2413 ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2414 (instdisk_keys, instanceinfo_keys))
2415
2416 return instdisk
2417
2418 @staticmethod
2419 def _SshNodeSelector(group_uuid, all_nodes):
2420 """Create endless iterators for all potential SSH check hosts.
2421
2422 """
2423 nodes = [node for node in all_nodes
2424 if (node.group != group_uuid and
2425 not node.offline)]
2426 keyfunc = operator.attrgetter("group")
2427
2428 return map(itertools.cycle,
2429 [sorted(map(operator.attrgetter("name"), names))
2430 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2431 keyfunc)])
2432
2433 @classmethod
2434 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2435 """Choose which nodes should talk to which other nodes.
2436
2437 We will make nodes contact all nodes in their group, and one node from
2438 every other group.
2439
2440 @warning: This algorithm has a known issue if one node group is much
2441 smaller than others (e.g. just one node). In such a case all other
2442 nodes will talk to the single node.
2443
2444 """
2445 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2446 sel = cls._SshNodeSelector(group_uuid, all_nodes)
2447
2448 return (online_nodes,
2449 dict((name, sorted([i.next() for i in sel]))
2450 for name in online_nodes))
2451
2452 def BuildHooksEnv(self):
2453 """Build hooks env.
2454
2455 Cluster-Verify hooks are run only in the post phase; if they fail, their
2456 output is logged in the verify output and the verification fails.
2457
2458 """
2459 env = {
2460 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2461 }
2462
2463 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2464 for node in self.my_node_info.values())
2465
2466 return env
2467
2468 def BuildHooksNodes(self):
2469 """Build hooks nodes.
2470
2471 """
2472 return ([], self.my_node_names)
2473
2474 def Exec(self, feedback_fn):
2475 """Verify integrity of the node group, performing various test on nodes.
2476
2477 """
2478 # This method has too many local variables. pylint: disable=R0914
2479 feedback_fn("* Verifying group '%s'" % self.group_info.name)
2480
2481 if not self.my_node_names:
2482 # empty node group
2483 feedback_fn("* Empty node group, skipping verification")
2484 return True
2485
2486 self.bad = False
2487 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2488 verbose = self.op.verbose
2489 self._feedback_fn = feedback_fn
2490
2491 vg_name = self.cfg.GetVGName()
2492 drbd_helper = self.cfg.GetDRBDHelper()
2493 cluster = self.cfg.GetClusterInfo()
2494 hypervisors = cluster.enabled_hypervisors
2495 node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2496
2497 i_non_redundant = [] # Non redundant instances
2498 i_non_a_balanced = [] # Non auto-balanced instances
2499 i_offline = 0 # Count of offline instances
2500 n_offline = 0 # Count of offline nodes
2501 n_drained = 0 # Count of nodes being drained
2502 node_vol_should = {}
2503
2504 # FIXME: verify OS list
2505
2506 # File verification
2507 filemap = ComputeAncillaryFiles(cluster, False)
2508
2509 # do local checksums
2510 master_node = self.master_node = self.cfg.GetMasterNode()
2511 master_ip = self.cfg.GetMasterIP()
2512
2513 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2514
2515 user_scripts = []
2516 if self.cfg.GetUseExternalMipScript():
2517 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2518
2519 node_verify_param = {
2520 constants.NV_FILELIST:
2521 map(vcluster.MakeVirtualPath,
2522 utils.UniqueSequence(filename
2523 for files in filemap
2524 for filename in files)),
2525 constants.NV_NODELIST:
2526 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2527 self.all_node_info.values()),
2528 constants.NV_HYPERVISOR: hypervisors,
2529 constants.NV_HVPARAMS:
2530 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2531 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2532 for node in node_data_list
2533 if not node.offline],
2534 constants.NV_INSTANCELIST: hypervisors,
2535 constants.NV_VERSION: None,
2536 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2537 constants.NV_NODESETUP: None,
2538 constants.NV_TIME: None,
2539 constants.NV_MASTERIP: (master_node, master_ip),
2540 constants.NV_OSLIST: None,
2541 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2542 constants.NV_USERSCRIPTS: user_scripts,
2543 }
2544
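# Optional checks: LVM, DRBD and file-storage-path verification data are
# only requested when the corresponding feature is configured.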
2545 if vg_name is not None:
2546 node_verify_param[constants.NV_VGLIST] = None
2547 node_verify_param[constants.NV_LVLIST] = vg_name
2548 node_verify_param[constants.NV_PVLIST] = [vg_name]
2549
2550 if drbd_helper:
2551 node_verify_param[constants.NV_DRBDLIST] = None
2552 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2553
2554 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2555 # Load file storage paths only from master node
2556 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2557
2558 # bridge checks
2559 # FIXME: this needs to be changed per node-group, not cluster-wide
2560 bridges = set()
2561 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2562 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2563 bridges.add(default_nicpp[constants.NIC_LINK])
2564 for instance in self.my_inst_info.values():
2565 for nic in instance.nics:
2566 full_nic = cluster.SimpleFillNIC(nic.nicparams)
2567 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2568 bridges.add(full_nic[constants.NIC_LINK])
2569
2570 if bridges:
2571 node_verify_param[constants.NV_BRIDGES] = list(bridges)
2572
2573 # Build our expected cluster state
2574 node_image = dict((node.name, self.NodeImage(offline=node.offline,
2575 name=node.name,
2576 vm_capable=node.vm_capable))
2577 for node in node_data_list)
2578
2579 # Gather OOB paths
2580 oob_paths = []
2581 for node in self.all_node_info.values():
2582 path = SupportsOob(self.cfg, node)
2583 if path and path not in oob_paths:
2584 oob_paths.append(path)
2585
2586 if oob_paths:
2587 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2588
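# Fill the per-node primary/secondary instance lists; nodes referenced by
# an instance but missing from node_image get placeholder images, marked
# as 'ghost' if they are not in the cluster configuration at all.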
2589 for instance in self.my_inst_names:
2590 inst_config = self.my_inst_info[instance]
2591 if inst_config.admin_state == constants.ADMINST_OFFLINE:
2592 i_offline += 1
2593
2594 for nname in inst_config.all_nodes:
2595 if nname not in node_image:
2596 gnode = self.NodeImage(name=nname)
2597 gnode.ghost = (nname not in self.all_node_info)
2598 node_image[nname] = gnode
2599
2600 inst_config.MapLVsByNode(node_vol_should)
2601
2602 pnode = inst_config.primary_node
2603 node_image[pnode].pinst.append(instance)
2604
2605 for snode in inst_config.secondary_nodes:
2606 nimg = node_image[snode]
2607 nimg.sinst.append(instance)
2608 if pnode not in nimg.sbp:
2609 nimg.sbp[pnode] = []
2610 nimg.sbp[pnode].append(instance)
2611
2612 es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2613 # The value of exclusive_storage should be the same across the group, so if
2614 # it's True for at least one node, we act as if it were set for all the nodes
2615 self._exclusive_storage = compat.any(es_flags.values())
2616 if self._exclusive_storage:
2617 node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2618
2619 # At this point, we have the in-memory data structures complete,
2620 # except for the runtime information, which we'll gather next
2621
2622 # Due to the way our RPC system works, exact response times cannot be
2623 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2624 # time before and after executing the request, we can at least have a time
2625 # window.
2626 nvinfo_starttime = time.time()
2627 all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2628 node_verify_param,
2629 self.cfg.GetClusterName())
2630 nvinfo_endtime = time.time()
2631
2632 if self.extra_lv_nodes and vg_name is not None:
2633 extra_lv_nvinfo = \
2634 self.rpc.call_node_verify(self.extra_lv_nodes,
2635 {constants.NV_LVLIST: vg_name},
2636 self.cfg.GetClusterName())
2637 else:
2638 extra_lv_nvinfo = {}
2639
2640 all_drbd_map = self.cfg.ComputeDRBDMap()
2641
2642 feedback_fn("* Gathering disk information (%s nodes)" %
2643 len(self.my_node_names))
2644 instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2645 self.my_inst_info)
2646
2647 feedback_fn("* Verifying configuration file consistency")
2648
2649 # If not all nodes are being checked, we need to make sure the master node
2650 # and a non-checked vm_capable node are in the list.
2651 absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2652 if absent_nodes:
2653 vf_nvinfo = all_nvinfo.copy()
2654 vf_node_info = list(self.my_node_info.values())
2655 additional_nodes = []
2656 if master_node not in self.my_node_info:
2657 additional_nodes.append(master_node)
2658 vf_node_info.append(self.all_node_info[master_node])
2659 # Add the first vm_capable node we find which is not included,
2660 # excluding the master node (which we already have)
2661 for node in absent_nodes:
2662 nodeinfo = self.all_node_info[node]
2663 if (nodeinfo.vm_capable and not nodeinfo.offline and
2664 node != master_node):
2665 additional_nodes.append(node)
2666 vf_node_info.append(self.all_node_info[node])
2667 break
2668 key = constants.NV_FILELIST
2669 vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2670 {key: node_verify_param[key]},
2671 self.cfg.GetClusterName()))
2672 else:
2673 vf_nvinfo = all_nvinfo
2674 vf_node_info = self.my_node_info.values()
2675
2676 self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2677
2678 feedback_fn("* Verifying node status")
2679
2680 refos_img = None
2681
2682 for node_i in node_data_list:
2683 node = node_i.name
2684 nimg = node_image[node]
2685
2686 if node_i.offline:
2687 if verbose:
2688 feedback_fn("* Skipping offline node %s" % (node,))
2689 n_offline += 1
2690 continue
2691
2692 if node == master_node:
2693 ntype = "master"
2694 elif node_i.master_candidate:
2695 ntype = "master candidate"
2696 elif node_i.drained:
2697 ntype = "drained"
2698 n_drained += 1
2699 else:
2700 ntype = "regular"
2701 if verbose:
2702 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2703
2704 msg = all_nvinfo[node].fail_msg
2705 _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2706 msg)
2707 if msg:
2708 nimg.rpc_fail = True
2709 continue
2710
2711 nresult = all_nvinfo[node].payload
2712
2713 nimg.call_ok = self._VerifyNode(node_i, nresult)
2714 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2715 self._VerifyNodeNetwork(node_i, nresult)
2716 self._VerifyNodeUserScripts(node_i, nresult)
2717 self._VerifyOob(node_i, nresult)
2718 self._VerifyFileStoragePaths(node_i, nresult,
2719 node == master_node)
2720
2721 if nimg.vm_capable:
2722 self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2723 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2724 all_drbd_map)
2725
2726 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2727 self._UpdateNodeInstances(node_i, nresult, nimg)
2728 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2729 self._UpdateNodeOS(node_i, nresult, nimg)
2730
2731 if not nimg.os_fail:
2732 if refos_img is None:
2733 refos_img = nimg
2734 self._VerifyNodeOS(node_i, nimg, refos_img)
2735 self._VerifyNodeBridges(node_i, nresult, bridges)
2736
2737 # Check whether all running instances are primary for the node. (This
2738 # can no longer be done from _VerifyInstance below, since some of the
2739 # wrong instances could be from other node groups.)
2740 non_primary_inst = set(nimg.instances).difference(nimg.pinst)
2741
2742 for inst in non_primary_inst:
2743 test = inst in self.all_inst_info
2744 _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
2745 "instance should not run on node %s", node_i.name)
2746 _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2747 "node is running unknown instance %s", inst)
2748
2749 self._VerifyGroupLVM(node_image, vg_name)
2750
2751 for node, result in extra_lv_nvinfo.items():
2752 self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
2753 node_image[node], vg_name)
2754
2755 feedback_fn("* Verifying instance status")
2756 for instance in self.my_inst_names:
2757 if verbose:
2758 feedback_fn("* Verifying instance %s" % instance)
2759 inst_config = self.my_inst_info[instance]
2760 self._VerifyInstance(instance, inst_config, node_image,
2761 instdisk[instance])
2762
2763 # If the instance is non-redundant we cannot survive losing its primary
2764 # node, so we are not N+1 compliant.
2765 if inst_config.disk_template not in constants.DTS_MIRRORED:
2766 i_non_redundant.append(instance)
2767
2768 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2769 i_non_a_balanced.append(instance)
2770
2771 feedback_fn("* Verifying orphan volumes")
2772 reserved = utils.FieldSet(*cluster.reserved_lvs)
2773
2774 # We will get spurious "unknown volume" warnings if any node of this group
2775 # is secondary for an instance whose primary is in another group. To avoid
2776 # them, we find these instances and add their volumes to node_vol_should.
2777 for inst in self.all_inst_info.values():
2778 for secondary in inst.secondary_nodes:
2779 if (secondary in self.my_node_info
2780 and inst.name not in self.my_inst_info):
2781 inst.MapLVsByNode(node_vol_should)
2782 break
2783
2784 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2785
2786 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2787 feedback_fn("* Verifying N+1 Memory redundancy")
2788 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2789
2790 feedback_fn("* Other Notes")
2791 if i_non_redundant:
2792 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
2793 % len(i_non_redundant))
2794
2795 if i_non_a_balanced:
2796 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
2797 % len(i_non_a_balanced))
2798
2799 if i_offline:
2800 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
2801
2802 if n_offline:
2803 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
2804
2805 if n_drained:
2806 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
2807
2808 return not self.bad
2809
2810 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2811 """Analyze the post-hooks' result
2812
2813 This method analyses the hook result, handles it, and sends some
2814 nicely-formatted feedback back to the user.
2815
2816 @param phase: one of L{constants.HOOKS_PHASE_POST} or
2817 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2818 @param hooks_results: the results of the multi-node hooks rpc call
2819 @param feedback_fn: function used to send feedback back to the caller
2820 @param lu_result: previous Exec result
2821 @return: the new Exec result, based on the previous result
2822 and hook results
2823
2824 """
2825 # We only really run POST phase hooks, only for non-empty groups,
2826 # and are only interested in their results
2827 if not self.my_node_names:
2828 # empty node group
2829 pass
2830 elif phase == constants.HOOKS_PHASE_POST:
2831 # Used to change hooks' output to proper indentation
2832 feedback_fn("* Hooks Results")
2833 assert hooks_results, "invalid result from hooks"
2834
2835 for node_name in hooks_results:
2836 res = hooks_results[node_name]
2837 msg = res.fail_msg
2838 test = msg and not res.offline
2839 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2840 "Communication failure in hooks execution: %s", msg)
2841 if res.offline or msg:
2842 # No need to investigate payload if node is offline or gave
2843 # an error.
2844 continue
2845 for script, hkr, output in res.payload:
2846 test = hkr == constants.HKR_FAIL
2847 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2848 "Script %s failed, output:", script)
2849 if test:
2850 output = self._HOOKS_INDENT_RE.sub(" ", output)
2851 feedback_fn("%s" % output)
2852 lu_result = False
2853
2854 return lu_result
2855
2856
2857 class LUClusterVerifyDisks(NoHooksLU):
2858 """Verifies the cluster disks status.
2859
2860 """
2861 REQ_BGL = False
2862
2863 def ExpandNames(self):
2864 self.share_locks = ShareAll()
2865 self.needed_locks = {
2866 locking.LEVEL_NODEGROUP: locking.ALL_SET,
2867 }
2868
2869 def Exec(self, feedback_fn):
2870 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2871
2872 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2873 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2874 for group in group_names])