Add tags in network objects
[ganeti-github.git] / lib / cmdlib.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable=W0201,C0302
25
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
27 # functions
28
29 # C0302: since we have waaaay too many lines in this module
30
31 import os
32 import os.path
33 import time
34 import re
35 import logging
36 import copy
37 import OpenSSL
38 import socket
39 import tempfile
40 import shutil
41 import itertools
42 import operator
43 import ipaddr
44
45 from ganeti import ssh
46 from ganeti import utils
47 from ganeti import errors
48 from ganeti import hypervisor
49 from ganeti import locking
50 from ganeti import constants
51 from ganeti import objects
52 from ganeti import ssconf
53 from ganeti import uidpool
54 from ganeti import compat
55 from ganeti import masterd
56 from ganeti import netutils
57 from ganeti import query
58 from ganeti import qlang
59 from ganeti import opcodes
60 from ganeti import ht
61 from ganeti import rpc
62 from ganeti import runtime
63 from ganeti import pathutils
64 from ganeti import vcluster
65 from ganeti import network
66 from ganeti.masterd import iallocator
67
68 import ganeti.masterd.instance # pylint: disable=W0611
69
70
71 # States of instance
72 INSTANCE_DOWN = [constants.ADMINST_DOWN]
73 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
74 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
75
76 #: Instance status in which an instance can be marked as offline/online
77 CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
78 constants.ADMINST_OFFLINE,
79 ]))
80
81
82 class ResultWithJobs:
83 """Data container for LU results with jobs.
84
85 Instances of this class returned from L{LogicalUnit.Exec} will be recognized
86 by L{mcpu._ProcessResult}. The latter will then submit the jobs
87 contained in the C{jobs} attribute and include the job IDs in the opcode
88 result.
89
90 """
91 def __init__(self, jobs, **kwargs):
92 """Initializes this class.
93
94 Additional return values can be specified as keyword arguments.
95
96 @type jobs: list of lists of L{opcode.OpCode}
97 @param jobs: A list of lists of opcode objects
98
99 """
100 self.jobs = jobs
101 self.other = kwargs
102
103
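# A minimal sketch of how an LU might use ResultWithJobs; the opcode and the
# extra keyword argument are illustrative only, not taken from any particular
# LU in this module:
#
#   def Exec(self, feedback_fn):
#     # ... do the LU's own work, then hand follow-up work to the job queue
#     jobs = [[opcodes.OpInstanceStartup(instance_name="inst1.example.com")]]
#     return ResultWithJobs(jobs, warnings=[])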
104 class LogicalUnit(object):
105 """Logical Unit base class.
106
107 Subclasses must follow these rules:
108 - implement ExpandNames
109 - implement CheckPrereq (except when tasklets are used)
110 - implement Exec (except when tasklets are used)
111 - implement BuildHooksEnv
112 - implement BuildHooksNodes
113 - redefine HPATH and HTYPE
114 - optionally redefine their run requirements:
115 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
116
117 Note that all commands require root permissions.
118
119 @ivar dry_run_result: the value (if any) that will be returned to the caller
120 in dry-run mode (signalled by opcode dry_run parameter)
121
122 """
123 HPATH = None
124 HTYPE = None
125 REQ_BGL = True
126
127 def __init__(self, processor, op, context, rpc_runner):
128 """Constructor for LogicalUnit.
129
130 This needs to be overridden in derived classes in order to check op
131 validity.
132
133 """
134 self.proc = processor
135 self.op = op
136 self.cfg = context.cfg
137 self.glm = context.glm
138 # readability alias
139 self.owned_locks = context.glm.list_owned
140 self.context = context
141 self.rpc = rpc_runner
142 # Dicts used to declare locking needs to mcpu
143 self.needed_locks = None
144 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
145 self.add_locks = {}
146 self.remove_locks = {}
147 # Used to force good behavior when calling helper functions
148 self.recalculate_locks = {}
149 # logging
150 self.Log = processor.Log # pylint: disable=C0103
151 self.LogWarning = processor.LogWarning # pylint: disable=C0103
152 self.LogInfo = processor.LogInfo # pylint: disable=C0103
153 self.LogStep = processor.LogStep # pylint: disable=C0103
154 # support for dry-run
155 self.dry_run_result = None
156 # support for generic debug attribute
157 if (not hasattr(self.op, "debug_level") or
158 not isinstance(self.op.debug_level, int)):
159 self.op.debug_level = 0
160
161 # Tasklets
162 self.tasklets = None
163
164 # Validate opcode parameters and set defaults
165 self.op.Validate(True)
166
167 self.CheckArguments()
168
169 def CheckArguments(self):
170 """Check syntactic validity for the opcode arguments.
171
172     This method is for doing a simple syntactic check and ensuring
173 validity of opcode parameters, without any cluster-related
174 checks. While the same can be accomplished in ExpandNames and/or
175     CheckPrereq, doing these separately is better because:
176
177     - ExpandNames is left as purely a lock-related function
178 - CheckPrereq is run after we have acquired locks (and possible
179 waited for them)
180
181 The function is allowed to change the self.op attribute so that
182     later methods no longer need to worry about missing parameters.
183
184 """
185 pass
186
187 def ExpandNames(self):
188 """Expand names for this LU.
189
190 This method is called before starting to execute the opcode, and it should
191 update all the parameters of the opcode to their canonical form (e.g. a
192 short node name must be fully expanded after this method has successfully
193 completed). This way locking, hooks, logging, etc. can work correctly.
194
195 LUs which implement this method must also populate the self.needed_locks
196 member, as a dict with lock levels as keys, and a list of needed lock names
197 as values. Rules:
198
199 - use an empty dict if you don't need any lock
200 - if you don't need any lock at a particular level omit that
201 level (note that in this case C{DeclareLocks} won't be called
202 at all for that level)
203 - if you need locks at a level, but you can't calculate it in
204 this function, initialise that level with an empty list and do
205 further processing in L{LogicalUnit.DeclareLocks} (see that
206 function's docstring)
207 - don't put anything for the BGL level
208 - if you want all locks at a level use L{locking.ALL_SET} as a value
209
210 If you need to share locks (rather than acquire them exclusively) at one
211 level you can modify self.share_locks, setting a true value (usually 1) for
212 that level. By default locks are not shared.
213
214 This function can also define a list of tasklets, which then will be
215 executed in order instead of the usual LU-level CheckPrereq and Exec
216 functions, if those are not defined by the LU.
217
218 Examples::
219
220 # Acquire all nodes and one instance
221 self.needed_locks = {
222 locking.LEVEL_NODE: locking.ALL_SET,
223 locking.LEVEL_INSTANCE: ['instance1.example.com'],
224 }
225 # Acquire just two nodes
226 self.needed_locks = {
227 locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
228 }
229 # Acquire no locks
230 self.needed_locks = {} # No, you can't leave it to the default value None
231
232 """
233 # The implementation of this method is mandatory only if the new LU is
234 # concurrent, so that old LUs don't need to be changed all at the same
235 # time.
236 if self.REQ_BGL:
237 self.needed_locks = {} # Exclusive LUs don't need locks.
238 else:
239 raise NotImplementedError
240
241 def DeclareLocks(self, level):
242 """Declare LU locking needs for a level
243
244 While most LUs can just declare their locking needs at ExpandNames time,
245 sometimes there's the need to calculate some locks after having acquired
246 the ones before. This function is called just before acquiring locks at a
247 particular level, but after acquiring the ones at lower levels, and permits
248 such calculations. It can be used to modify self.needed_locks, and by
249 default it does nothing.
250
251 This function is only called if you have something already set in
252 self.needed_locks for the level.
253
254 @param level: Locking level which is going to be locked
255 @type level: member of L{ganeti.locking.LEVELS}
256
257 """
258
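  # A hedged sketch of the usual ExpandNames/DeclareLocks pairing described
  # above, for an LU that works on a single instance and later needs that
  # instance's nodes; it relies on the helpers defined further down in this
  # module (_ExpandAndLockInstance, _LockInstancesNodes):
  #
  #   def ExpandNames(self):
  #     self._ExpandAndLockInstance()
  #     self.needed_locks[locking.LEVEL_NODE] = []
  #     self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_REPLACE
  #
  #   def DeclareLocks(self, level):
  #     if level == locking.LEVEL_NODE:
  #       self._LockInstancesNodes()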
259 def CheckPrereq(self):
260 """Check prerequisites for this LU.
261
262 This method should check that the prerequisites for the execution
263 of this LU are fulfilled. It can do internode communication, but
264 it should be idempotent - no cluster or system changes are
265 allowed.
266
267 The method should raise errors.OpPrereqError in case something is
268 not fulfilled. Its return value is ignored.
269
270 This method should also update all the parameters of the opcode to
271 their canonical form if it hasn't been done by ExpandNames before.
272
273 """
274 if self.tasklets is not None:
275 for (idx, tl) in enumerate(self.tasklets):
276 logging.debug("Checking prerequisites for tasklet %s/%s",
277 idx + 1, len(self.tasklets))
278 tl.CheckPrereq()
279 else:
280 pass
281
282 def Exec(self, feedback_fn):
283 """Execute the LU.
284
285 This method should implement the actual work. It should raise
286 errors.OpExecError for failures that are somewhat dealt with in
287 code, or expected.
288
289 """
290 if self.tasklets is not None:
291 for (idx, tl) in enumerate(self.tasklets):
292 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
293 tl.Exec(feedback_fn)
294 else:
295 raise NotImplementedError
296
297 def BuildHooksEnv(self):
298 """Build hooks environment for this LU.
299
300 @rtype: dict
301 @return: Dictionary containing the environment that will be used for
302 running the hooks for this LU. The keys of the dict must not be prefixed
303 with "GANETI_"--that'll be added by the hooks runner. The hooks runner
304 will extend the environment with additional variables. If no environment
305 should be defined, an empty dictionary should be returned (not C{None}).
306 @note: If the C{HPATH} attribute of the LU class is C{None}, this function
307 will not be called.
308
309 """
310 raise NotImplementedError
311
312 def BuildHooksNodes(self):
313 """Build list of nodes to run LU's hooks.
314
315 @rtype: tuple; (list, list)
316 @return: Tuple containing a list of node names on which the hook
317 should run before the execution and a list of node names on which the
318 hook should run after the execution. No nodes should be returned as an
319     hook should run after the execution. If there are no nodes, an empty
320     list should be returned (and not None).
321 will not be called.
322
323 """
324 raise NotImplementedError
325
326 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
327 """Notify the LU about the results of its hooks.
328
329 This method is called every time a hooks phase is executed, and notifies
330 the Logical Unit about the hooks' result. The LU can then use it to alter
331 its result based on the hooks. By default the method does nothing and the
332     previous result is passed back unchanged, but any LU can override it if it
333 wants to use the local cluster hook-scripts somehow.
334
335 @param phase: one of L{constants.HOOKS_PHASE_POST} or
336 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
337 @param hook_results: the results of the multi-node hooks rpc call
338     @param feedback_fn: function used to send feedback back to the caller
339 @param lu_result: the previous Exec result this LU had, or None
340 in the PRE phase
341 @return: the new Exec result, based on the previous result
342 and hook results
343
344 """
345     # API must be kept, thus we ignore the "unused argument" and "could
346     # be a function" warnings
347 # pylint: disable=W0613,R0201
348 return lu_result
349
350 def _ExpandAndLockInstance(self):
351 """Helper function to expand and lock an instance.
352
353 Many LUs that work on an instance take its name in self.op.instance_name
354 and need to expand it and then declare the expanded name for locking. This
355 function does it, and then updates self.op.instance_name to the expanded
356 name. It also initializes needed_locks as a dict, if this hasn't been done
357 before.
358
359 """
360 if self.needed_locks is None:
361 self.needed_locks = {}
362 else:
363 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
364 "_ExpandAndLockInstance called with instance-level locks set"
365 self.op.instance_name = _ExpandInstanceName(self.cfg,
366 self.op.instance_name)
367 self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
368
369 def _LockInstancesNodes(self, primary_only=False,
370 level=locking.LEVEL_NODE):
371 """Helper function to declare instances' nodes for locking.
372
373 This function should be called after locking one or more instances to lock
374 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
375 with all primary or secondary nodes for instances already locked and
376 present in self.needed_locks[locking.LEVEL_INSTANCE].
377
378 It should be called from DeclareLocks, and for safety only works if
379 self.recalculate_locks[locking.LEVEL_NODE] is set.
380
381 In the future it may grow parameters to just lock some instance's nodes, or
382 to just lock primaries or secondary nodes, if needed.
383
384     It should be called in DeclareLocks in a way similar to::
385
386 if level == locking.LEVEL_NODE:
387 self._LockInstancesNodes()
388
389 @type primary_only: boolean
390 @param primary_only: only lock primary nodes of locked instances
391 @param level: Which lock level to use for locking nodes
392
393 """
394 assert level in self.recalculate_locks, \
395 "_LockInstancesNodes helper function called with no nodes to recalculate"
396
397     # TODO: check if we've really been called with the instance locks held
398
399 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
400 # future we might want to have different behaviors depending on the value
401 # of self.recalculate_locks[locking.LEVEL_NODE]
402 wanted_nodes = []
403 locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
404 for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
405 wanted_nodes.append(instance.primary_node)
406 if not primary_only:
407 wanted_nodes.extend(instance.secondary_nodes)
408
409 if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
410 self.needed_locks[level] = wanted_nodes
411 elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
412 self.needed_locks[level].extend(wanted_nodes)
413 else:
414 raise errors.ProgrammerError("Unknown recalculation mode")
415
416 del self.recalculate_locks[level]
417
418
419 class NoHooksLU(LogicalUnit): # pylint: disable=W0223
420 """Simple LU which runs no hooks.
421
422 This LU is intended as a parent for other LogicalUnits which will
423 run no hooks, in order to reduce duplicate code.
424
425 """
426 HPATH = None
427 HTYPE = None
428
429 def BuildHooksEnv(self):
430 """Empty BuildHooksEnv for NoHooksLu.
431
432 This just raises an error.
433
434 """
435 raise AssertionError("BuildHooksEnv called for NoHooksLUs")
436
437 def BuildHooksNodes(self):
438 """Empty BuildHooksNodes for NoHooksLU.
439
440 """
441 raise AssertionError("BuildHooksNodes called for NoHooksLU")
442
443
444 class Tasklet:
445 """Tasklet base class.
446
447 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
448 they can mix legacy code with tasklets. Locking needs to be done in the LU,
449 tasklets know nothing about locks.
450
451 Subclasses must follow these rules:
452 - Implement CheckPrereq
453 - Implement Exec
454
455 """
456 def __init__(self, lu):
457 self.lu = lu
458
459 # Shortcuts
460 self.cfg = lu.cfg
461 self.rpc = lu.rpc
462
463 def CheckPrereq(self):
464     """Check prerequisites for this tasklet.
465
466 This method should check whether the prerequisites for the execution of
467 this tasklet are fulfilled. It can do internode communication, but it
468 should be idempotent - no cluster or system changes are allowed.
469
470 The method should raise errors.OpPrereqError in case something is not
471 fulfilled. Its return value is ignored.
472
473 This method should also update all parameters to their canonical form if it
474 hasn't been done before.
475
476 """
477 pass
478
479 def Exec(self, feedback_fn):
480 """Execute the tasklet.
481
482 This method should implement the actual work. It should raise
483 errors.OpExecError for failures that are somewhat dealt with in code, or
484 expected.
485
486 """
487 raise NotImplementedError
488
489
490 class _QueryBase:
491 """Base for query utility classes.
492
493 """
494 #: Attribute holding field definitions
495 FIELDS = None
496
497 #: Field to sort by
498 SORT_FIELD = "name"
499
500 def __init__(self, qfilter, fields, use_locking):
501 """Initializes this class.
502
503 """
504 self.use_locking = use_locking
505
506 self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
507 namefield=self.SORT_FIELD)
508 self.requested_data = self.query.RequestedData()
509 self.names = self.query.RequestedNames()
510
511 # Sort only if no names were requested
512 self.sort_by_name = not self.names
513
514 self.do_locking = None
515 self.wanted = None
516
517 def _GetNames(self, lu, all_names, lock_level):
518 """Helper function to determine names asked for in the query.
519
520 """
521 if self.do_locking:
522 names = lu.owned_locks(lock_level)
523 else:
524 names = all_names
525
526 if self.wanted == locking.ALL_SET:
527 assert not self.names
528 # caller didn't specify names, so ordering is not important
529 return utils.NiceSort(names)
530
531 # caller specified names and we must keep the same order
532 assert self.names
533 assert not self.do_locking or lu.glm.is_owned(lock_level)
534
535 missing = set(self.wanted).difference(names)
536 if missing:
537 raise errors.OpExecError("Some items were removed before retrieving"
538 " their data: %s" % missing)
539
540 # Return expanded names
541 return self.wanted
542
543 def ExpandNames(self, lu):
544 """Expand names for this query.
545
546 See L{LogicalUnit.ExpandNames}.
547
548 """
549 raise NotImplementedError()
550
551 def DeclareLocks(self, lu, level):
552 """Declare locks for this query.
553
554 See L{LogicalUnit.DeclareLocks}.
555
556 """
557 raise NotImplementedError()
558
559 def _GetQueryData(self, lu):
560 """Collects all data for this query.
561
562 @return: Query data object
563
564 """
565 raise NotImplementedError()
566
567 def NewStyleQuery(self, lu):
568 """Collect data and execute query.
569
570 """
571 return query.GetQueryResponse(self.query, self._GetQueryData(lu),
572 sort_by_name=self.sort_by_name)
573
574 def OldStyleQuery(self, lu):
575 """Collect data and execute query.
576
577 """
578 return self.query.OldStyleQuery(self._GetQueryData(lu),
579 sort_by_name=self.sort_by_name)
580
581
582 def _ShareAll():
583 """Returns a dict declaring all lock levels shared.
584
585 """
586 return dict.fromkeys(locking.LEVELS, 1)
587
588
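# Illustrative use of _ShareAll: an LU that only reads configuration state can
# declare every lock level as shared in its ExpandNames, for example:
#
#   self.share_locks = _ShareAll()
#   self.needed_locks = {
#     locking.LEVEL_NODE: locking.ALL_SET,
#   }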
589 def _AnnotateDiskParams(instance, devs, cfg):
590 """Little helper wrapper to the rpc annotation method.
591
592 @param instance: The instance object
593 @type devs: List of L{objects.Disk}
594   @param devs: The root devices (not any of their children!)
595 @param cfg: The config object
596   @return: The annotated disk copies
597 @see L{rpc.AnnotateDiskParams}
598
599 """
600 return rpc.AnnotateDiskParams(instance.disk_template, devs,
601 cfg.GetInstanceDiskParams(instance))
602
603
604 def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
605 cur_group_uuid):
606 """Checks if node groups for locked instances are still correct.
607
608 @type cfg: L{config.ConfigWriter}
609 @param cfg: Cluster configuration
610 @type instances: dict; string as key, L{objects.Instance} as value
611 @param instances: Dictionary, instance name as key, instance object as value
612 @type owned_groups: iterable of string
613 @param owned_groups: List of owned groups
614 @type owned_nodes: iterable of string
615 @param owned_nodes: List of owned nodes
616 @type cur_group_uuid: string or None
617 @param cur_group_uuid: Optional group UUID to check against instance's groups
618
619 """
620 for (name, inst) in instances.items():
621 assert owned_nodes.issuperset(inst.all_nodes), \
622 "Instance %s's nodes changed while we kept the lock" % name
623
624 inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
625
626 assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
627 "Instance %s has no node in group %s" % (name, cur_group_uuid)
628
629
630 def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
631 primary_only=False):
632 """Checks if the owned node groups are still correct for an instance.
633
634 @type cfg: L{config.ConfigWriter}
635 @param cfg: The cluster configuration
636 @type instance_name: string
637 @param instance_name: Instance name
638 @type owned_groups: set or frozenset
639 @param owned_groups: List of currently owned node groups
640 @type primary_only: boolean
641 @param primary_only: Whether to check node groups for only the primary node
642
643 """
644 inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
645
646 if not owned_groups.issuperset(inst_groups):
647 raise errors.OpPrereqError("Instance %s's node groups changed since"
648 " locks were acquired, current groups are"
649                                " '%s', owning groups '%s'; retry the"
650 " operation" %
651 (instance_name,
652 utils.CommaJoin(inst_groups),
653 utils.CommaJoin(owned_groups)),
654 errors.ECODE_STATE)
655
656 return inst_groups
657
658
659 def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
660 """Checks if the instances in a node group are still correct.
661
662 @type cfg: L{config.ConfigWriter}
663 @param cfg: The cluster configuration
664 @type group_uuid: string
665 @param group_uuid: Node group UUID
666 @type owned_instances: set or frozenset
667 @param owned_instances: List of currently owned instances
668
669 """
670 wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
671 if owned_instances != wanted_instances:
672 raise errors.OpPrereqError("Instances in node group '%s' changed since"
673 " locks were acquired, wanted '%s', have '%s';"
674 " retry the operation" %
675 (group_uuid,
676 utils.CommaJoin(wanted_instances),
677 utils.CommaJoin(owned_instances)),
678 errors.ECODE_STATE)
679
680 return wanted_instances
681
682
683 def _SupportsOob(cfg, node):
684 """Tells if node supports OOB.
685
686 @type cfg: L{config.ConfigWriter}
687 @param cfg: The cluster configuration
688 @type node: L{objects.Node}
689 @param node: The node
690 @return: The OOB script if supported or an empty string otherwise
691
692 """
693 return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
694
695
696 def _CopyLockList(names):
697 """Makes a copy of a list of lock names.
698
699 Handles L{locking.ALL_SET} correctly.
700
701 """
702 if names == locking.ALL_SET:
703 return locking.ALL_SET
704 else:
705 return names[:]
706
707
708 def _GetWantedNodes(lu, nodes):
709 """Returns list of checked and expanded node names.
710
711 @type lu: L{LogicalUnit}
712 @param lu: the logical unit on whose behalf we execute
713 @type nodes: list
714 @param nodes: list of node names or None for all nodes
715 @rtype: list
716 @return: the list of nodes, sorted
717 @raise errors.ProgrammerError: if the nodes parameter is wrong type
718
719 """
720 if nodes:
721 return [_ExpandNodeName(lu.cfg, name) for name in nodes]
722
723 return utils.NiceSort(lu.cfg.GetNodeList())
724
725
726 def _GetWantedInstances(lu, instances):
727 """Returns list of checked and expanded instance names.
728
729 @type lu: L{LogicalUnit}
730 @param lu: the logical unit on whose behalf we execute
731 @type instances: list
732 @param instances: list of instance names or None for all instances
733 @rtype: list
734 @return: the list of instances, sorted
735 @raise errors.OpPrereqError: if the instances parameter is wrong type
736 @raise errors.OpPrereqError: if any of the passed instances is not found
737
738 """
739 if instances:
740 wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
741 else:
742 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
743 return wanted
744
745
746 def _GetUpdatedParams(old_params, update_dict,
747 use_default=True, use_none=False):
748 """Return the new version of a parameter dictionary.
749
750 @type old_params: dict
751 @param old_params: old parameters
752 @type update_dict: dict
753 @param update_dict: dict containing new parameter values, or
754 constants.VALUE_DEFAULT to reset the parameter to its default
755 value
756   @type use_default: boolean
757   @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
758     values as 'to be deleted' values
759   @type use_none: boolean
760   @param use_none: whether to recognise C{None} values as 'to be
761     deleted' values
762 @rtype: dict
763 @return: the new parameter dictionary
764
765 """
766 params_copy = copy.deepcopy(old_params)
767 for key, val in update_dict.iteritems():
768 if ((use_default and val == constants.VALUE_DEFAULT) or
769 (use_none and val is None)):
770 try:
771 del params_copy[key]
772 except KeyError:
773 pass
774 else:
775 params_copy[key] = val
776 return params_copy
777
778
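# Example of the merge semantics of _GetUpdatedParams (made-up values):
#
#   old = {"kernel_path": "/vmlinuz", "root_path": "/dev/xvda1"}
#   new = {"root_path": constants.VALUE_DEFAULT, "serial_console": True}
#   _GetUpdatedParams(old, new)
#   # -> {"kernel_path": "/vmlinuz", "serial_console": True}
#   # "root_path" is dropped because VALUE_DEFAULT means "reset to default".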
779 def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
780   """Return the new version of an instance policy.
781
782 @param group_policy: whether this policy applies to a group and thus
783 we should support removal of policy entries
784
785 """
786 use_none = use_default = group_policy
787 ipolicy = copy.deepcopy(old_ipolicy)
788 for key, value in new_ipolicy.items():
789 if key not in constants.IPOLICY_ALL_KEYS:
790 raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
791 errors.ECODE_INVAL)
792 if key in constants.IPOLICY_ISPECS:
793 utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES)
794 ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
795 use_none=use_none,
796 use_default=use_default)
797 else:
798 if (not value or value == [constants.VALUE_DEFAULT] or
799 value == constants.VALUE_DEFAULT):
800 if group_policy:
801 del ipolicy[key]
802 else:
803 raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
804                                      " on the cluster" % key,
805 errors.ECODE_INVAL)
806 else:
807 if key in constants.IPOLICY_PARAMETERS:
808 # FIXME: we assume all such values are float
809 try:
810 ipolicy[key] = float(value)
811 except (TypeError, ValueError), err:
812 raise errors.OpPrereqError("Invalid value for attribute"
813 " '%s': '%s', error: %s" %
814 (key, value, err), errors.ECODE_INVAL)
815 else:
816 # FIXME: we assume all others are lists; this should be redone
817 # in a nicer way
818 ipolicy[key] = list(value)
819 try:
820 objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
821 except errors.ConfigurationError, err:
822 raise errors.OpPrereqError("Invalid instance policy: %s" % err,
823 errors.ECODE_INVAL)
824 return ipolicy
825
826
827 def _UpdateAndVerifySubDict(base, updates, type_check):
828 """Updates and verifies a dict with sub dicts of the same type.
829
830 @param base: The dict with the old data
831 @param updates: The dict with the new data
832 @param type_check: Dict suitable to ForceDictType to verify correct types
833 @returns: A new dict with updated and verified values
834
835 """
836 def fn(old, value):
837 new = _GetUpdatedParams(old, value)
838 utils.ForceDictType(new, type_check)
839 return new
840
841 ret = copy.deepcopy(base)
842 ret.update(dict((key, fn(base.get(key, {}), value))
843 for key, value in updates.items()))
844 return ret
845
846
847 def _MergeAndVerifyHvState(op_input, obj_input):
848   """Combines the hv state from an opcode with that of the object
849
850 @param op_input: The input dict from the opcode
851 @param obj_input: The input dict from the objects
852 @return: The verified and updated dict
853
854 """
855 if op_input:
856 invalid_hvs = set(op_input) - constants.HYPER_TYPES
857 if invalid_hvs:
858 raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
859 " %s" % utils.CommaJoin(invalid_hvs),
860 errors.ECODE_INVAL)
861 if obj_input is None:
862 obj_input = {}
863 type_check = constants.HVSTS_PARAMETER_TYPES
864 return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
865
866 return None
867
868
869 def _MergeAndVerifyDiskState(op_input, obj_input):
870   """Combines the disk state from an opcode with that of the object
871
872 @param op_input: The input dict from the opcode
873 @param obj_input: The input dict from the objects
874 @return: The verified and updated dict
875 """
876 if op_input:
877 invalid_dst = set(op_input) - constants.DS_VALID_TYPES
878 if invalid_dst:
879 raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
880 utils.CommaJoin(invalid_dst),
881 errors.ECODE_INVAL)
882 type_check = constants.DSS_PARAMETER_TYPES
883 if obj_input is None:
884 obj_input = {}
885 return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
886 type_check))
887 for key, value in op_input.items())
888
889 return None
890
891
892 def _ReleaseLocks(lu, level, names=None, keep=None):
893 """Releases locks owned by an LU.
894
895 @type lu: L{LogicalUnit}
896 @param level: Lock level
897 @type names: list or None
898 @param names: Names of locks to release
899 @type keep: list or None
900 @param keep: Names of locks to retain
901
902 """
903 assert not (keep is not None and names is not None), \
904 "Only one of the 'names' and the 'keep' parameters can be given"
905
906 if names is not None:
907 should_release = names.__contains__
908 elif keep:
909 should_release = lambda name: name not in keep
910 else:
911 should_release = None
912
913 owned = lu.owned_locks(level)
914 if not owned:
915 # Not owning any lock at this level, do nothing
916 pass
917
918 elif should_release:
919 retain = []
920 release = []
921
922 # Determine which locks to release
923 for name in owned:
924 if should_release(name):
925 release.append(name)
926 else:
927 retain.append(name)
928
929 assert len(lu.owned_locks(level)) == (len(retain) + len(release))
930
931 # Release just some locks
932 lu.glm.release(level, names=release)
933
934 assert frozenset(lu.owned_locks(level)) == frozenset(retain)
935 else:
936 # Release everything
937 lu.glm.release(level)
938
939 assert not lu.glm.is_owned(level), "No locks should be owned"
940
941
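# Typical (illustrative) call of _ReleaseLocks: once an LU has narrowed down
# which nodes it really needs, every other node lock can be given back:
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=[instance.primary_node] + list(instance.secondary_nodes))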
942 def _MapInstanceDisksToNodes(instances):
943 """Creates a map from (node, volume) to instance name.
944
945 @type instances: list of L{objects.Instance}
946 @rtype: dict; tuple of (node name, volume name) as key, instance name as value
947
948 """
949 return dict(((node, vol), inst.name)
950 for inst in instances
951 for (node, vols) in inst.MapLVsByNode().items()
952 for vol in vols)
953
954
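# The mapping built by _MapInstanceDisksToNodes has (node, volume) tuples as
# keys and instance names as values, for example (invented names):
#
#   {("node1.example.com", "xenvg/disk0"): "inst1.example.com",
#    ("node2.example.com", "xenvg/disk0"): "inst1.example.com"}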
955 def _RunPostHook(lu, node_name):
956 """Runs the post-hook for an opcode on a single node.
957
958 """
959 hm = lu.proc.BuildHooksManager(lu)
960 try:
961 hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
962 except Exception, err: # pylint: disable=W0703
963 lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err))
964
965
966 def _CheckOutputFields(static, dynamic, selected):
967 """Checks whether all selected fields are valid.
968
969 @type static: L{utils.FieldSet}
970 @param static: static fields set
971 @type dynamic: L{utils.FieldSet}
972 @param dynamic: dynamic fields set
973
974 """
975 f = utils.FieldSet()
976 f.Extend(static)
977 f.Extend(dynamic)
978
979 delta = f.NonMatching(selected)
980 if delta:
981 raise errors.OpPrereqError("Unknown output fields selected: %s"
982 % ",".join(delta), errors.ECODE_INVAL)
983
984
985 def _CheckGlobalHvParams(params):
986 """Validates that given hypervisor params are not global ones.
987
988 This will ensure that instances don't get customised versions of
989 global params.
990
991 """
992 used_globals = constants.HVC_GLOBALS.intersection(params)
993 if used_globals:
994 msg = ("The following hypervisor parameters are global and cannot"
995 " be customized at instance level, please modify them at"
996 " cluster level: %s" % utils.CommaJoin(used_globals))
997 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
998
999
1000 def _CheckNodeOnline(lu, node, msg=None):
1001 """Ensure that a given node is online.
1002
1003 @param lu: the LU on behalf of which we make the check
1004 @param node: the node to check
1005 @param msg: if passed, should be a message to replace the default one
1006 @raise errors.OpPrereqError: if the node is offline
1007
1008 """
1009 if msg is None:
1010 msg = "Can't use offline node"
1011 if lu.cfg.GetNodeInfo(node).offline:
1012 raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
1013
1014
1015 def _CheckNodeNotDrained(lu, node):
1016 """Ensure that a given node is not drained.
1017
1018 @param lu: the LU on behalf of which we make the check
1019 @param node: the node to check
1020 @raise errors.OpPrereqError: if the node is drained
1021
1022 """
1023 if lu.cfg.GetNodeInfo(node).drained:
1024 raise errors.OpPrereqError("Can't use drained node %s" % node,
1025 errors.ECODE_STATE)
1026
1027
1028 def _CheckNodeVmCapable(lu, node):
1029 """Ensure that a given node is vm capable.
1030
1031 @param lu: the LU on behalf of which we make the check
1032 @param node: the node to check
1033 @raise errors.OpPrereqError: if the node is not vm capable
1034
1035 """
1036 if not lu.cfg.GetNodeInfo(node).vm_capable:
1037 raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
1038 errors.ECODE_STATE)
1039
1040
1041 def _CheckNodeHasOS(lu, node, os_name, force_variant):
1042 """Ensure that a node supports a given OS.
1043
1044 @param lu: the LU on behalf of which we make the check
1045 @param node: the node to check
1046 @param os_name: the OS to query about
1047 @param force_variant: whether to ignore variant errors
1048 @raise errors.OpPrereqError: if the node is not supporting the OS
1049
1050 """
1051 result = lu.rpc.call_os_get(node, os_name)
1052 result.Raise("OS '%s' not in supported OS list for node %s" %
1053 (os_name, node),
1054 prereq=True, ecode=errors.ECODE_INVAL)
1055 if not force_variant:
1056 _CheckOSVariant(result.payload, os_name)
1057
1058
1059 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
1060 """Ensure that a node has the given secondary ip.
1061
1062 @type lu: L{LogicalUnit}
1063 @param lu: the LU on behalf of which we make the check
1064 @type node: string
1065 @param node: the node to check
1066 @type secondary_ip: string
1067 @param secondary_ip: the ip to check
1068 @type prereq: boolean
1069 @param prereq: whether to throw a prerequisite or an execute error
1070 @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
1071 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
1072
1073 """
1074 result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
1075 result.Raise("Failure checking secondary ip on node %s" % node,
1076 prereq=prereq, ecode=errors.ECODE_ENVIRON)
1077 if not result.payload:
1078 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
1079 " please fix and re-run this command" % secondary_ip)
1080 if prereq:
1081 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
1082 else:
1083 raise errors.OpExecError(msg)
1084
1085
1086 def _GetClusterDomainSecret():
1087 """Reads the cluster domain secret.
1088
1089 """
1090 return utils.ReadOneLineFile(pathutils.CLUSTER_DOMAIN_SECRET_FILE,
1091 strict=True)
1092
1093
1094 def _CheckInstanceState(lu, instance, req_states, msg=None):
1095 """Ensure that an instance is in one of the required states.
1096
1097 @param lu: the LU on behalf of which we make the check
1098 @param instance: the instance to check
1099 @param msg: if passed, should be a message to replace the default one
1100 @raise errors.OpPrereqError: if the instance is not in the required state
1101
1102 """
1103 if msg is None:
1104 msg = "can't use instance from outside %s states" % ", ".join(req_states)
1105 if instance.admin_state not in req_states:
1106 raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
1107 (instance.name, instance.admin_state, msg),
1108 errors.ECODE_STATE)
1109
1110 if constants.ADMINST_UP not in req_states:
1111 pnode = instance.primary_node
1112 if not lu.cfg.GetNodeInfo(pnode).offline:
1113 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
1114 ins_l.Raise("Can't contact node %s for instance information" % pnode,
1115 prereq=True, ecode=errors.ECODE_ENVIRON)
1116 if instance.name in ins_l.payload:
1117 raise errors.OpPrereqError("Instance %s is running, %s" %
1118 (instance.name, msg), errors.ECODE_STATE)
1119 else:
1120 lu.LogWarning("Primary node offline, ignoring check that instance"
1121 " is down")
1122
1123
1124 def _ComputeMinMaxSpec(name, qualifier, ipolicy, value):
1125 """Computes if value is in the desired range.
1126
1127 @param name: name of the parameter for which we perform the check
1128 @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
1129 not just 'disk')
1130 @param ipolicy: dictionary containing min, max and std values
1131 @param value: actual value that we want to use
1132 @return: None or element not meeting the criteria
1133
1134
1135 """
1136 if value in [None, constants.VALUE_AUTO]:
1137 return None
1138 max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
1139 min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
1140 if value > max_v or min_v > value:
1141 if qualifier:
1142 fqn = "%s/%s" % (name, qualifier)
1143 else:
1144 fqn = name
1145 return ("%s value %s is not in range [%s, %s]" %
1146 (fqn, value, min_v, max_v))
1147 return None
1148
1149
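# Sketch of a single range check with _ComputeMinMaxSpec (values invented); a
# value outside [min, max] yields an error string, an acceptable one or
# VALUE_AUTO yields None:
#
#   ipolicy = {
#     constants.ISPECS_MIN: {constants.ISPEC_MEM_SIZE: 128},
#     constants.ISPECS_MAX: {constants.ISPEC_MEM_SIZE: 32768},
#   }
#   _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, None, ipolicy, 64)
#   # -> e.g. "memory-size value 64 is not in range [128, 32768]"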
1150 def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
1151 nic_count, disk_sizes, spindle_use,
1152 _compute_fn=_ComputeMinMaxSpec):
1153 """Verifies ipolicy against provided specs.
1154
1155 @type ipolicy: dict
1156 @param ipolicy: The ipolicy
1157 @type mem_size: int
1158 @param mem_size: The memory size
1159 @type cpu_count: int
1160 @param cpu_count: Used cpu cores
1161 @type disk_count: int
1162 @param disk_count: Number of disks used
1163 @type nic_count: int
1164 @param nic_count: Number of nics used
1165 @type disk_sizes: list of ints
1166 @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
1167 @type spindle_use: int
1168 @param spindle_use: The number of spindles this instance uses
1169 @param _compute_fn: The compute function (unittest only)
1170   @return: A list of violations, or an empty list if no violations are found
1171
1172 """
1173 assert disk_count == len(disk_sizes)
1174
1175 test_settings = [
1176 (constants.ISPEC_MEM_SIZE, "", mem_size),
1177 (constants.ISPEC_CPU_COUNT, "", cpu_count),
1178 (constants.ISPEC_DISK_COUNT, "", disk_count),
1179 (constants.ISPEC_NIC_COUNT, "", nic_count),
1180 (constants.ISPEC_SPINDLE_USE, "", spindle_use),
1181 ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
1182 for idx, d in enumerate(disk_sizes)]
1183
1184 return filter(None,
1185 (_compute_fn(name, qualifier, ipolicy, value)
1186 for (name, qualifier, value) in test_settings))
1187
1188
1189 def _ComputeIPolicyInstanceViolation(ipolicy, instance,
1190 _compute_fn=_ComputeIPolicySpecViolation):
1191 """Compute if instance meets the specs of ipolicy.
1192
1193 @type ipolicy: dict
1194 @param ipolicy: The ipolicy to verify against
1195 @type instance: L{objects.Instance}
1196 @param instance: The instance to verify
1197 @param _compute_fn: The function to verify ipolicy (unittest only)
1198 @see: L{_ComputeIPolicySpecViolation}
1199
1200 """
1201 mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
1202 cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
1203 spindle_use = instance.beparams.get(constants.BE_SPINDLE_USE, None)
1204 disk_count = len(instance.disks)
1205 disk_sizes = [disk.size for disk in instance.disks]
1206 nic_count = len(instance.nics)
1207
1208 return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
1209 disk_sizes, spindle_use)
1210
1211
1212 def _ComputeIPolicyInstanceSpecViolation(
1213 ipolicy, instance_spec, _compute_fn=_ComputeIPolicySpecViolation):
1214   """Compute if instance specs meet the specs of the ipolicy.
1215
1216 @type ipolicy: dict
1217 @param ipolicy: The ipolicy to verify against
1218   @type instance_spec: dict
1219 @param instance_spec: The instance spec to verify
1220 @param _compute_fn: The function to verify ipolicy (unittest only)
1221 @see: L{_ComputeIPolicySpecViolation}
1222
1223 """
1224 mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
1225 cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
1226 disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
1227 disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
1228 nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
1229 spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
1230
1231 return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
1232 disk_sizes, spindle_use)
1233
1234
1235 def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
1236 target_group,
1237 _compute_fn=_ComputeIPolicyInstanceViolation):
1238 """Compute if instance meets the specs of the new target group.
1239
1240 @param ipolicy: The ipolicy to verify
1241 @param instance: The instance object to verify
1242 @param current_group: The current group of the instance
1243 @param target_group: The new group of the instance
1244 @param _compute_fn: The function to verify ipolicy (unittest only)
1245 @see: L{_ComputeIPolicySpecViolation}
1246
1247 """
1248 if current_group == target_group:
1249 return []
1250 else:
1251 return _compute_fn(ipolicy, instance)
1252
1253
1254 def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, ignore=False,
1255 _compute_fn=_ComputeIPolicyNodeViolation):
1256 """Checks that the target node is correct in terms of instance policy.
1257
1258 @param ipolicy: The ipolicy to verify
1259 @param instance: The instance object to verify
1260 @param node: The new node to relocate
1261 @param ignore: Ignore violations of the ipolicy
1262 @param _compute_fn: The function to verify ipolicy (unittest only)
1263 @see: L{_ComputeIPolicySpecViolation}
1264
1265 """
1266 primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
1267 res = _compute_fn(ipolicy, instance, primary_node.group, node.group)
1268
1269 if res:
1270 msg = ("Instance does not meet target node group's (%s) instance"
1271 " policy: %s") % (node.group, utils.CommaJoin(res))
1272 if ignore:
1273 lu.LogWarning(msg)
1274 else:
1275 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1276
1277
1278 def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances):
1279   """Computes the set of instances that would violate the new ipolicy.
1280
1281 @param old_ipolicy: The current (still in-place) ipolicy
1282 @param new_ipolicy: The new (to become) ipolicy
1283 @param instances: List of instances to verify
1284   @return: A set of instances which violate the new ipolicy but
1285     did not before
1286
1287 """
1288 return (_ComputeViolatingInstances(new_ipolicy, instances) -
1289 _ComputeViolatingInstances(old_ipolicy, instances))
1290
1291
1292 def _ExpandItemName(fn, name, kind):
1293 """Expand an item name.
1294
1295 @param fn: the function to use for expansion
1296 @param name: requested item name
1297 @param kind: text description ('Node' or 'Instance')
1298 @return: the resolved (full) name
1299 @raise errors.OpPrereqError: if the item is not found
1300
1301 """
1302 full_name = fn(name)
1303 if full_name is None:
1304 raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
1305 errors.ECODE_NOENT)
1306 return full_name
1307
1308
1309 def _ExpandNodeName(cfg, name):
1310 """Wrapper over L{_ExpandItemName} for nodes."""
1311 return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
1312
1313
1314 def _ExpandInstanceName(cfg, name):
1315 """Wrapper over L{_ExpandItemName} for instance."""
1316 return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
1317
1318 def _BuildNetworkHookEnv(name, network, gateway, network6, gateway6,
1319 network_type, mac_prefix, tags):
1320 env = dict()
1321 if name:
1322 env["NETWORK_NAME"] = name
1323 if network:
1324 env["NETWORK_SUBNET"] = network
1325 if gateway:
1326 env["NETWORK_GATEWAY"] = gateway
1327 if network6:
1328 env["NETWORK_SUBNET6"] = network6
1329 if gateway6:
1330 env["NETWORK_GATEWAY6"] = gateway6
1331 if mac_prefix:
1332 env["NETWORK_MAC_PREFIX"] = mac_prefix
1333 if network_type:
1334 env["NETWORK_TYPE"] = network_type
1335 if tags:
1336 env["NETWORK_TAGS"] = " ".join(tags)
1337
1338 return env
1339
1340
1341 def _BuildNetworkHookEnvByObject(lu, network):
1342 args = {
1343 "name": network.name,
1344 "network": network.network,
1345 "gateway": network.gateway,
1346 "network6": network.network6,
1347 "gateway6": network.gateway6,
1348 "network_type": network.network_type,
1349 "mac_prefix": network.mac_prefix,
1350 "tags" : network.tags,
1351 }
1352 return _BuildNetworkHookEnv(**args)
1353
1354
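# For a network with e.g. name "net1", subnet "192.0.2.0/24", gateway
# "192.0.2.1" and tags ["blue", "prod"], the environment produced by
# _BuildNetworkHookEnv above would contain (illustrative values):
#
#   NETWORK_NAME=net1
#   NETWORK_SUBNET=192.0.2.0/24
#   NETWORK_GATEWAY=192.0.2.1
#   NETWORK_TAGS=blue prod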
1355 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
1356 minmem, maxmem, vcpus, nics, disk_template, disks,
1357 bep, hvp, hypervisor_name, tags):
1358 """Builds instance related env variables for hooks
1359
1360 This builds the hook environment from individual variables.
1361
1362 @type name: string
1363 @param name: the name of the instance
1364 @type primary_node: string
1365 @param primary_node: the name of the instance's primary node
1366 @type secondary_nodes: list
1367 @param secondary_nodes: list of secondary nodes as strings
1368 @type os_type: string
1369 @param os_type: the name of the instance's OS
1370 @type status: string
1371 @param status: the desired status of the instance
1372 @type minmem: string
1373 @param minmem: the minimum memory size of the instance
1374 @type maxmem: string
1375 @param maxmem: the maximum memory size of the instance
1376 @type vcpus: string
1377 @param vcpus: the count of VCPUs the instance has
1378 @type nics: list
1379   @param nics: list of tuples (ip, mac, mode, link, network, netinfo)
1380       representing the NICs the instance has
1381 @type disk_template: string
1382 @param disk_template: the disk template of the instance
1383 @type disks: list
1384 @param disks: the list of (size, mode) pairs
1385 @type bep: dict
1386 @param bep: the backend parameters for the instance
1387 @type hvp: dict
1388 @param hvp: the hypervisor parameters for the instance
1389 @type hypervisor_name: string
1390 @param hypervisor_name: the hypervisor for the instance
1391 @type tags: list
1392 @param tags: list of instance tags as strings
1393 @rtype: dict
1394 @return: the hook environment for this instance
1395
1396 """
1397 env = {
1398 "OP_TARGET": name,
1399 "INSTANCE_NAME": name,
1400 "INSTANCE_PRIMARY": primary_node,
1401 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
1402 "INSTANCE_OS_TYPE": os_type,
1403 "INSTANCE_STATUS": status,
1404 "INSTANCE_MINMEM": minmem,
1405 "INSTANCE_MAXMEM": maxmem,
1406 # TODO(2.7) remove deprecated "memory" value
1407 "INSTANCE_MEMORY": maxmem,
1408 "INSTANCE_VCPUS": vcpus,
1409 "INSTANCE_DISK_TEMPLATE": disk_template,
1410 "INSTANCE_HYPERVISOR": hypervisor_name,
1411 }
1412 if nics:
1413 nic_count = len(nics)
1414 for idx, (ip, mac, mode, link, network, netinfo) in enumerate(nics):
1415 if ip is None:
1416 ip = ""
1417 env["INSTANCE_NIC%d_IP" % idx] = ip
1418 env["INSTANCE_NIC%d_MAC" % idx] = mac
1419 env["INSTANCE_NIC%d_MODE" % idx] = mode
1420 env["INSTANCE_NIC%d_LINK" % idx] = link
1421 if network:
1422 env["INSTANCE_NIC%d_NETWORK" % idx] = network
1423 if netinfo:
1424 nobj = objects.Network.FromDict(netinfo)
1425 if nobj.network:
1426 env["INSTANCE_NIC%d_NETWORK_SUBNET" % idx] = nobj.network
1427 if nobj.gateway:
1428 env["INSTANCE_NIC%d_NETWORK_GATEWAY" % idx] = nobj.gateway
1429 if nobj.network6:
1430 env["INSTANCE_NIC%d_NETWORK_SUBNET6" % idx] = nobj.network6
1431 if nobj.gateway6:
1432 env["INSTANCE_NIC%d_NETWORK_GATEWAY6" % idx] = nobj.gateway6
1433 if nobj.mac_prefix:
1434 env["INSTANCE_NIC%d_NETWORK_MAC_PREFIX" % idx] = nobj.mac_prefix
1435 if nobj.network_type:
1436 env["INSTANCE_NIC%d_NETWORK_TYPE" % idx] = nobj.network_type
1437 if nobj.tags:
1438 env["INSTANCE_NIC%d_NETWORK_TAGS" % idx] = " ".join(nobj.tags)
1439 if mode == constants.NIC_MODE_BRIDGED:
1440 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1441 else:
1442 nic_count = 0
1443
1444 env["INSTANCE_NIC_COUNT"] = nic_count
1445
1446 if disks:
1447 disk_count = len(disks)
1448 for idx, (size, mode) in enumerate(disks):
1449 env["INSTANCE_DISK%d_SIZE" % idx] = size
1450 env["INSTANCE_DISK%d_MODE" % idx] = mode
1451 else:
1452 disk_count = 0
1453
1454 env["INSTANCE_DISK_COUNT"] = disk_count
1455
1456 if not tags:
1457 tags = []
1458
1459 env["INSTANCE_TAGS"] = " ".join(tags)
1460
1461 for source, kind in [(bep, "BE"), (hvp, "HV")]:
1462 for key, value in source.items():
1463 env["INSTANCE_%s_%s" % (kind, key)] = value
1464
1465 return env
1466
1467 def _NICToTuple(lu, nic):
1468   """Build a tuple of nic information.
1469
1470 @type lu: L{LogicalUnit}
1471 @param lu: the logical unit on whose behalf we execute
1472 @type nic: L{objects.NIC}
1473 @param nic: nic to convert to hooks tuple
1474
1475 """
1476 cluster = lu.cfg.GetClusterInfo()
1477 ip = nic.ip
1478 mac = nic.mac
1479 filled_params = cluster.SimpleFillNIC(nic.nicparams)
1480 mode = filled_params[constants.NIC_MODE]
1481 link = filled_params[constants.NIC_LINK]
1482 network = nic.network
1483 netinfo = None
1484 if network:
1485 net_uuid = lu.cfg.LookupNetwork(network)
1486 if net_uuid:
1487 nobj = lu.cfg.GetNetwork(net_uuid)
1488 netinfo = objects.Network.ToDict(nobj)
1489 return (ip, mac, mode, link, network, netinfo)
1490
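# The tuple returned by _NICToTuple has this shape (illustrative values):
#
#   ("198.51.100.10", "aa:00:00:4c:66:12", constants.NIC_MODE_BRIDGED,
#    "br0", "net1", {... the objects.Network serialized via ToDict() ...})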
1491 def _NICListToTuple(lu, nics):
1492 """Build a list of nic information tuples.
1493
1494 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1495 value in LUInstanceQueryData.
1496
1497 @type lu: L{LogicalUnit}
1498 @param lu: the logical unit on whose behalf we execute
1499 @type nics: list of L{objects.NIC}
1500 @param nics: list of nics to convert to hooks tuples
1501
1502 """
1503 hooks_nics = []
1504 cluster = lu.cfg.GetClusterInfo()
1505 for nic in nics:
1506 hooks_nics.append(_NICToTuple(lu, nic))
1507 return hooks_nics
1508
1509 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1510 """Builds instance related env variables for hooks from an object.
1511
1512 @type lu: L{LogicalUnit}
1513 @param lu: the logical unit on whose behalf we execute
1514 @type instance: L{objects.Instance}
1515 @param instance: the instance for which we should build the
1516 environment
1517 @type override: dict
1518 @param override: dictionary with key/values that will override
1519 our values
1520 @rtype: dict
1521 @return: the hook environment dictionary
1522
1523 """
1524 cluster = lu.cfg.GetClusterInfo()
1525 bep = cluster.FillBE(instance)
1526 hvp = cluster.FillHV(instance)
1527 args = {
1528 "name": instance.name,
1529 "primary_node": instance.primary_node,
1530 "secondary_nodes": instance.secondary_nodes,
1531 "os_type": instance.os,
1532 "status": instance.admin_state,
1533 "maxmem": bep[constants.BE_MAXMEM],
1534 "minmem": bep[constants.BE_MINMEM],
1535 "vcpus": bep[constants.BE_VCPUS],
1536 "nics": _NICListToTuple(lu, instance.nics),
1537 "disk_template": instance.disk_template,
1538 "disks": [(disk.size, disk.mode) for disk in instance.disks],
1539 "bep": bep,
1540 "hvp": hvp,
1541 "hypervisor_name": instance.hypervisor,
1542 "tags": instance.tags,
1543 }
1544 if override:
1545 args.update(override)
1546 return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1547
1548
1549 def _AdjustCandidatePool(lu, exceptions):
1550 """Adjust the candidate pool after node operations.
1551
1552 """
1553 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1554 if mod_list:
1555 lu.LogInfo("Promoted nodes to master candidate role: %s",
1556 utils.CommaJoin(node.name for node in mod_list))
1557 for name in mod_list:
1558 lu.context.ReaddNode(name)
1559 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1560 if mc_now > mc_max:
1561 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1562 (mc_now, mc_max))
1563
1564
1565 def _DecideSelfPromotion(lu, exceptions=None):
1566 """Decide whether I should promote myself as a master candidate.
1567
1568 """
1569 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1570 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1571 # the new node will increase mc_max with one, so:
1572 mc_should = min(mc_should + 1, cp_size)
1573 return mc_now < mc_should
1574
1575
1576 def _ComputeViolatingInstances(ipolicy, instances):
1577   """Computes the set of instances that violate the given ipolicy.
1578
1579 @param ipolicy: The ipolicy to verify
1580   @type instances: list of L{objects.Instance}
1581 @param instances: List of instances to verify
1582 @return: A frozenset of instance names violating the ipolicy
1583
1584 """
1585 return frozenset([inst.name for inst in instances
1586 if _ComputeIPolicyInstanceViolation(ipolicy, inst)])
1587
1588
1589 def _CheckNicsBridgesExist(lu, target_nics, target_node):
1590   """Check that the bridges needed by a list of nics exist.
1591
1592 """
1593 cluster = lu.cfg.GetClusterInfo()
1594 paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1595 brlist = [params[constants.NIC_LINK] for params in paramslist
1596 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1597 if brlist:
1598 result = lu.rpc.call_bridges_exist(target_node, brlist)
1599 result.Raise("Error checking bridges on destination node '%s'" %
1600 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1601
1602
1603 def _CheckInstanceBridgesExist(lu, instance, node=None):
1604   """Check that the bridges needed by an instance exist.
1605
1606 """
1607 if node is None:
1608 node = instance.primary_node
1609 _CheckNicsBridgesExist(lu, instance.nics, node)
1610
1611
1612 def _CheckOSVariant(os_obj, name):
1613 """Check whether an OS name conforms to the os variants specification.
1614
1615 @type os_obj: L{objects.OS}
1616 @param os_obj: OS object to check
1617 @type name: string
1618 @param name: OS name passed by the user, to check for validity
1619
1620 """
1621 variant = objects.OS.GetVariant(name)
1622 if not os_obj.supported_variants:
1623 if variant:
1624 raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1625 " passed)" % (os_obj.name, variant),
1626 errors.ECODE_INVAL)
1627 return
1628 if not variant:
1629 raise errors.OpPrereqError("OS name must include a variant",
1630 errors.ECODE_INVAL)
1631
1632 if variant not in os_obj.supported_variants:
1633 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1634
1635
1636 def _GetNodeInstancesInner(cfg, fn):
1637 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1638
1639
1640 def _GetNodeInstances(cfg, node_name):
1641 """Returns a list of all primary and secondary instances on a node.
1642
1643 """
1644
1645 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1646
1647
1648 def _GetNodePrimaryInstances(cfg, node_name):
1649 """Returns primary instances on a node.
1650
1651 """
1652 return _GetNodeInstancesInner(cfg,
1653 lambda inst: node_name == inst.primary_node)
1654
1655
1656 def _GetNodeSecondaryInstances(cfg, node_name):
1657 """Returns secondary instances on a node.
1658
1659 """
1660 return _GetNodeInstancesInner(cfg,
1661 lambda inst: node_name in inst.secondary_nodes)
1662
1663
1664 def _GetStorageTypeArgs(cfg, storage_type):
1665 """Returns the arguments for a storage type.
1666
1667 """
1668 # Special case for file storage
1669 if storage_type == constants.ST_FILE:
1670 # storage.FileStorage wants a list of storage directories
1671 return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1672
1673 return []
1674
1675
1676 def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1677 faulty = []
1678
1679 for dev in instance.disks:
1680 cfg.SetDiskID(dev, node_name)
1681
1682 result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
1683 instance))
1684 result.Raise("Failed to get disk status from node %s" % node_name,
1685 prereq=prereq, ecode=errors.ECODE_ENVIRON)
1686
1687 for idx, bdev_status in enumerate(result.payload):
1688 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1689 faulty.append(idx)
1690
1691 return faulty
1692
1693
1694 def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1695 """Check the sanity of iallocator and node arguments and use the
1696 cluster-wide iallocator if appropriate.
1697
1698 Check that at most one of (iallocator, node) is specified. If none is
1699 specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
1700 then the LU's opcode's iallocator slot is filled with the cluster-wide
1701 default iallocator.
1702
1703 @type iallocator_slot: string
1704 @param iallocator_slot: the name of the opcode iallocator slot
1705 @type node_slot: string
1706 @param node_slot: the name of the opcode target node slot
1707
1708 """
1709 node = getattr(lu.op, node_slot, None)
1710 ialloc = getattr(lu.op, iallocator_slot, None)
1711 if node == []:
1712 node = None
1713
1714 if node is not None and ialloc is not None:
1715 raise errors.OpPrereqError("Do not specify both, iallocator and node",
1716 errors.ECODE_INVAL)
1717 elif ((node is None and ialloc is None) or
1718 ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
1719 default_iallocator = lu.cfg.GetDefaultIAllocator()
1720 if default_iallocator:
1721 setattr(lu.op, iallocator_slot, default_iallocator)
1722 else:
1723 raise errors.OpPrereqError("No iallocator or node given and no"
1724 " cluster-wide default iallocator found;"
1725 " please specify either an iallocator or a"
1726 " node, or set a cluster-wide default"
1727 " iallocator", errors.ECODE_INVAL)
1728
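# Illustrative summary, not part of the module, of the cases handled by
# _CheckIAllocatorOrNode (assuming a cluster-wide default iallocator is set):
#
#   iallocator and node both given             -> OpPrereqError
#   neither given                              -> slot filled with the default
#   iallocator == DEFAULT_IALLOCATOR_SHORTCUT  -> slot filled with the default
#   only node given                            -> left as-is
#
# Without a cluster-wide default, the "neither given" case raises instead.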
1729
1730 def _GetDefaultIAllocator(cfg, ialloc):
1731 """Decides on which iallocator to use.
1732
1733 @type cfg: L{config.ConfigWriter}
1734 @param cfg: Cluster configuration object
1735 @type ialloc: string or None
1736 @param ialloc: Iallocator specified in opcode
1737 @rtype: string
1738 @return: Iallocator name
1739
1740 """
1741 if not ialloc:
1742 # Use default iallocator
1743 ialloc = cfg.GetDefaultIAllocator()
1744
1745 if not ialloc:
1746 raise errors.OpPrereqError("No iallocator was specified, neither in the"
1747 " opcode nor as a cluster-wide default",
1748 errors.ECODE_INVAL)
1749
1750 return ialloc
1751
1752
1753 def _CheckHostnameSane(lu, name):
1754 """Ensures that a given hostname resolves to a 'sane' name.
1755
1756 The given name is required to be a prefix of the resolved hostname,
1757 to prevent accidental mismatches.
1758
1759 @param lu: the logical unit on behalf of which we're checking
1760 @param name: the name we should resolve and check
1761 @return: the resolved hostname object
1762
1763 """
1764 hostname = netutils.GetHostname(name=name)
1765 if hostname.name != name:
1766 lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
1767 if not utils.MatchNameComponent(name, [hostname.name]):
1768 raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
1769 " same as given hostname '%s'") %
1770 (hostname.name, name), errors.ECODE_INVAL)
1771 return hostname
1772
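# Illustrative sketch, not part of the module: the check is a prefix
# (name-component) match against the resolved name, so assuming the resolver
# appends the local domain "example.com":
#
#   _CheckHostnameSane(lu, "inst1")              # ok, resolves to
#                                                # "inst1.example.com"
#   _CheckHostnameSane(lu, "inst1.example.com")  # ok, exact match
#   _CheckHostnameSane(lu, "webmail")            # OpPrereqError if the name
#                                                # resolves to e.g.
#                                                # "mailhost.example.com"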
1773
1774 class LUClusterPostInit(LogicalUnit):
1775 """Logical unit for running hooks after cluster initialization.
1776
1777 """
1778 HPATH = "cluster-init"
1779 HTYPE = constants.HTYPE_CLUSTER
1780
1781 def BuildHooksEnv(self):
1782 """Build hooks env.
1783
1784 """
1785 return {
1786 "OP_TARGET": self.cfg.GetClusterName(),
1787 }
1788
1789 def BuildHooksNodes(self):
1790 """Build hooks nodes.
1791
1792 """
1793 return ([], [self.cfg.GetMasterNode()])
1794
1795 def Exec(self, feedback_fn):
1796 """Nothing to do.
1797
1798 """
1799 return True
1800
1801
1802 class LUClusterDestroy(LogicalUnit):
1803 """Logical unit for destroying the cluster.
1804
1805 """
1806 HPATH = "cluster-destroy"
1807 HTYPE = constants.HTYPE_CLUSTER
1808
1809 def BuildHooksEnv(self):
1810 """Build hooks env.
1811
1812 """
1813 return {
1814 "OP_TARGET": self.cfg.GetClusterName(),
1815 }
1816
1817 def BuildHooksNodes(self):
1818 """Build hooks nodes.
1819
1820 """
1821 return ([], [])
1822
1823 def CheckPrereq(self):
1824 """Check prerequisites.
1825
1826 This checks whether the cluster is empty.
1827
1828 Any errors are signaled by raising errors.OpPrereqError.
1829
1830 """
1831 master = self.cfg.GetMasterNode()
1832
1833 nodelist = self.cfg.GetNodeList()
1834 if len(nodelist) != 1 or nodelist[0] != master:
1835 raise errors.OpPrereqError("There are still %d node(s) in"
1836 " this cluster." % (len(nodelist) - 1),
1837 errors.ECODE_INVAL)
1838 instancelist = self.cfg.GetInstanceList()
1839 if instancelist:
1840 raise errors.OpPrereqError("There are still %d instance(s) in"
1841 " this cluster." % len(instancelist),
1842 errors.ECODE_INVAL)
1843
1844 def Exec(self, feedback_fn):
1845 """Destroys the cluster.
1846
1847 """
1848 master_params = self.cfg.GetMasterNetworkParameters()
1849
1850 # Run post hooks on master node before it's removed
1851 _RunPostHook(self, master_params.name)
1852
1853 ems = self.cfg.GetUseExternalMipScript()
1854 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1855 master_params, ems)
1856 if result.fail_msg:
1857 self.LogWarning("Error disabling the master IP address: %s",
1858 result.fail_msg)
1859
1860 return master_params.name
1861
1862
1863 def _VerifyCertificate(filename):
1864 """Verifies a certificate for L{LUClusterVerifyConfig}.
1865
1866 @type filename: string
1867 @param filename: Path to PEM file
1868
1869 """
1870 try:
1871 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1872 utils.ReadFile(filename))
1873 except Exception, err: # pylint: disable=W0703
1874 return (LUClusterVerifyConfig.ETYPE_ERROR,
1875 "Failed to load X509 certificate %s: %s" % (filename, err))
1876
1877 (errcode, msg) = \
1878 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1879 constants.SSL_CERT_EXPIRATION_ERROR)
1880
1881 if msg:
1882 fnamemsg = "While verifying %s: %s" % (filename, msg)
1883 else:
1884 fnamemsg = None
1885
1886 if errcode is None:
1887 return (None, fnamemsg)
1888 elif errcode == utils.CERT_WARNING:
1889 return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1890 elif errcode == utils.CERT_ERROR:
1891 return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1892
1893 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1894
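# Illustrative sketch, not part of the module, of the return values produced
# above:
#
#   (None, None)                                  -> certificate is healthy
#   (LUClusterVerifyConfig.ETYPE_WARNING, "...")  -> close to expiration
#   (LUClusterVerifyConfig.ETYPE_ERROR, "...")    -> expired or unloadable
#
# LUClusterVerifyConfig.Exec() feeds the pair straight into _ErrorIf(), using
# the first element both as the condition and as the severity override.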
1895
1896 def _GetAllHypervisorParameters(cluster, instances):
1897 """Compute the set of all hypervisor parameters.
1898
1899 @type cluster: L{objects.Cluster}
1900 @param cluster: the cluster object
1901 @type instances: list of L{objects.Instance}
1902 @param instances: additional instances from which to obtain parameters
1903 @rtype: list of (origin, hypervisor, parameters)
1904 @return: a list with all parameters found, indicating the hypervisor they
1905 apply to, and the origin (can be "cluster", "os X", or "instance Y")
1906
1907 """
1908 hvp_data = []
1909
1910 for hv_name in cluster.enabled_hypervisors:
1911 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1912
1913 for os_name, os_hvp in cluster.os_hvp.items():
1914 for hv_name, hv_params in os_hvp.items():
1915 if hv_params:
1916 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1917 hvp_data.append(("os %s" % os_name, hv_name, full_params))
1918
1919 # TODO: collapse identical parameter values in a single one
1920 for instance in instances:
1921 if instance.hvparams:
1922 hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1923 cluster.FillHV(instance)))
1924
1925 return hvp_data
1926
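# Illustrative sketch, not part of the module: for a cluster with only the
# "xen-pvm" hypervisor enabled, one OS-level override and one instance-level
# override, the list built above would look roughly like
#
#   [("cluster", "xen-pvm", {... cluster defaults ...}),
#    ("os debootstrap", "xen-pvm", {... defaults + OS override ...}),
#    ("instance inst1.example.com", "xen-pvm", {... fully filled ...})]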
1927
1928 class _VerifyErrors(object):
1929 """Mix-in for cluster/group verify LUs.
1930
1931 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1932 self.op and self._feedback_fn to be available.)
1933
1934 """
1935
1936 ETYPE_FIELD = "code"
1937 ETYPE_ERROR = "ERROR"
1938 ETYPE_WARNING = "WARNING"
1939
1940 def _Error(self, ecode, item, msg, *args, **kwargs):
1941 """Format an error message.
1942
1943 Based on the opcode's error_codes parameter, either format a
1944 parseable error code, or a simpler error string.
1945
1946 This must be called only from Exec and functions called from Exec.
1947
1948 """
1949 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1950 itype, etxt, _ = ecode
1951 # first complete the msg
1952 if args:
1953 msg = msg % args
1954 # then format the whole message
1955 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1956 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1957 else:
1958 if item:
1959 item = " " + item
1960 else:
1961 item = ""
1962 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1963 # and finally report it via the feedback_fn
1964 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
1965
1966 def _ErrorIf(self, cond, ecode, *args, **kwargs):
1967 """Log an error message if the passed condition is True.
1968
1969 """
1970 cond = (bool(cond)
1971 or self.op.debug_simulate_errors) # pylint: disable=E1101
1972
1973 # If the error code is in the list of ignored errors, demote the error to a
1974 # warning
1975 (_, etxt, _) = ecode
1976 if etxt in self.op.ignore_errors: # pylint: disable=E1101
1977 kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
1978
1979 if cond:
1980 self._Error(ecode, *args, **kwargs)
1981
1982 # do not mark the operation as failed for WARN cases only
1983 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1984 self.bad = self.bad or cond
1985
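# Illustrative sketch, not part of the module: with the opcode's error_codes
# flag set, _Error() above would render a CV_EINSTANCEDOWN report for item
# "inst1" roughly as
#
#   - ERROR:EINSTANCEDOWN:instance:inst1:instance not running on its primary
#
# whereas the default, human-oriented form reads
#
#   - ERROR: instance inst1: instance not running on its primary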
1986
1987 class LUClusterVerify(NoHooksLU):
1988 """Submits all jobs necessary to verify the cluster.
1989
1990 """
1991 REQ_BGL = False
1992
1993 def ExpandNames(self):
1994 self.needed_locks = {}
1995
1996 def Exec(self, feedback_fn):
1997 jobs = []
1998
1999 if self.op.group_name:
2000 groups = [self.op.group_name]
2001 depends_fn = lambda: None
2002 else:
2003 groups = self.cfg.GetNodeGroupList()
2004
2005 # Verify global configuration
2006 jobs.append([
2007 opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
2008 ])
2009
2010 # Always depend on global verification
2011 depends_fn = lambda: [(-len(jobs), [])]
2012
2013 jobs.extend(
2014 [opcodes.OpClusterVerifyGroup(group_name=group,
2015 ignore_errors=self.op.ignore_errors,
2016 depends=depends_fn())]
2017 for group in groups)
2018
2019 # Fix up all parameters
2020 for op in itertools.chain(*jobs): # pylint: disable=W0142
2021 op.debug_simulate_errors = self.op.debug_simulate_errors
2022 op.verbose = self.op.verbose
2023 op.error_codes = self.op.error_codes
2024 try:
2025 op.skip_checks = self.op.skip_checks
2026 except AttributeError:
2027 assert not isinstance(op, opcodes.OpClusterVerifyGroup)
2028
2029 return ResultWithJobs(jobs)
2030
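# Note on the dependency list built above (illustrative, not part of the
# module): "-len(jobs)" is a relative job dependency, i.e. "the job submitted
# that many positions before this one in the same submission", which at that
# point is always the OpClusterVerifyConfig job; the empty list places no
# particular restriction on that job's final status.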
2031
2032 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
2033 """Verifies the cluster config.
2034
2035 """
2036 REQ_BGL = False
2037
2038 def _VerifyHVP(self, hvp_data):
2039 """Verifies locally the syntax of the hypervisor parameters.
2040
2041 """
2042 for item, hv_name, hv_params in hvp_data:
2043 msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
2044 (hv_name, item))
2045 try:
2046 hv_class = hypervisor.GetHypervisor(hv_name)
2047 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2048 hv_class.CheckParameterSyntax(hv_params)
2049 except errors.GenericError, err:
2050 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
2051
2052 def ExpandNames(self):
2053 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
2054 self.share_locks = _ShareAll()
2055
2056 def CheckPrereq(self):
2057 """Check prerequisites.
2058
2059 """
2060 # Retrieve all information
2061 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
2062 self.all_node_info = self.cfg.GetAllNodesInfo()
2063 self.all_inst_info = self.cfg.GetAllInstancesInfo()
2064
2065 def Exec(self, feedback_fn):
2066 """Verify integrity of cluster, performing various test on nodes.
2067
2068 """
2069 self.bad = False
2070 self._feedback_fn = feedback_fn
2071
2072 feedback_fn("* Verifying cluster config")
2073
2074 for msg in self.cfg.VerifyConfig():
2075 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
2076
2077 feedback_fn("* Verifying cluster certificate files")
2078
2079 for cert_filename in pathutils.ALL_CERT_FILES:
2080 (errcode, msg) = _VerifyCertificate(cert_filename)
2081 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
2082
2083 feedback_fn("* Verifying hypervisor parameters")
2084
2085 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
2086 self.all_inst_info.values()))
2087
2088 feedback_fn("* Verifying all nodes belong to an existing group")
2089
2090 # We do this verification here because, should this bogus circumstance
2091 # occur, it would never be caught by VerifyGroup, which only acts on
2092 # nodes/instances reachable from existing node groups.
2093
2094 dangling_nodes = set(node.name for node in self.all_node_info.values()
2095 if node.group not in self.all_group_info)
2096
2097 dangling_instances = {}
2098 no_node_instances = []
2099
2100 for inst in self.all_inst_info.values():
2101 if inst.primary_node in dangling_nodes:
2102 dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
2103 elif inst.primary_node not in self.all_node_info:
2104 no_node_instances.append(inst.name)
2105
2106 pretty_dangling = [
2107 "%s (%s)" %
2108 (node.name,
2109 utils.CommaJoin(dangling_instances.get(node.name,
2110 ["no instances"])))
2111 for node in dangling_nodes]
2112
2113 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
2114 None,
2115 "the following nodes (and their instances) belong to a non"
2116 " existing group: %s", utils.CommaJoin(pretty_dangling))
2117
2118 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
2119 None,
2120 "the following instances have a non-existing primary-node:"
2121 " %s", utils.CommaJoin(no_node_instances))
2122
2123 return not self.bad
2124
2125
2126 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
2127 """Verifies the status of a node group.
2128
2129 """
2130 HPATH = "cluster-verify"
2131 HTYPE = constants.HTYPE_CLUSTER
2132 REQ_BGL = False
2133
2134 _HOOKS_INDENT_RE = re.compile("^", re.M)
2135
2136 class NodeImage(object):
2137 """A class representing the logical and physical status of a node.
2138
2139 @type name: string
2140 @ivar name: the node name to which this object refers
2141 @ivar volumes: a structure as returned from
2142 L{ganeti.backend.GetVolumeList} (runtime)
2143 @ivar instances: a list of running instances (runtime)
2144 @ivar pinst: list of configured primary instances (config)
2145 @ivar sinst: list of configured secondary instances (config)
2146 @ivar sbp: dictionary of {primary-node: list of instances} for all
2147 instances for which this node is secondary (config)
2148 @ivar mfree: free memory, as reported by hypervisor (runtime)
2149 @ivar dfree: free disk, as reported by the node (runtime)
2150 @ivar offline: the offline status (config)
2151 @type rpc_fail: boolean
2152 @ivar rpc_fail: whether the RPC verify call was successful (overall,
2153 not whether the individual keys were correct) (runtime)
2154 @type lvm_fail: boolean
2155 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
2156 @type hyp_fail: boolean
2157 @ivar hyp_fail: whether the RPC call didn't return the instance list
2158 @type ghost: boolean
2159 @ivar ghost: whether this is a known node or not (config)
2160 @type os_fail: boolean
2161 @ivar os_fail: whether the RPC call didn't return valid OS data
2162 @type oslist: list
2163 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
2164 @type vm_capable: boolean
2165 @ivar vm_capable: whether the node can host instances
2166
2167 """
2168 def __init__(self, offline=False, name=None, vm_capable=True):
2169 self.name = name
2170 self.volumes = {}
2171 self.instances = []
2172 self.pinst = []
2173 self.sinst = []
2174 self.sbp = {}
2175 self.mfree = 0
2176 self.dfree = 0
2177 self.offline = offline
2178 self.vm_capable = vm_capable
2179 self.rpc_fail = False
2180 self.lvm_fail = False
2181 self.hyp_fail = False
2182 self.ghost = False
2183 self.os_fail = False
2184 self.oslist = {}
2185
2186 def ExpandNames(self):
2187 # This raises errors.OpPrereqError on its own:
2188 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
2189
2190 # Get instances in node group; this is unsafe and needs verification later
2191 inst_names = \
2192 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
2193
2194 self.needed_locks = {
2195 locking.LEVEL_INSTANCE: inst_names,
2196 locking.LEVEL_NODEGROUP: [self.group_uuid],
2197 locking.LEVEL_NODE: [],
2198 }
2199
2200 self.share_locks = _ShareAll()
2201
2202 def DeclareLocks(self, level):
2203 if level == locking.LEVEL_NODE:
2204 # Get members of node group; this is unsafe and needs verification later
2205 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
2206
2207 all_inst_info = self.cfg.GetAllInstancesInfo()
2208
2209 # In Exec(), we warn about mirrored instances that have primary and
2210 # secondary living in separate node groups. To fully verify that
2211 # volumes for these instances are healthy, we will need to do an
2212 # extra call to their secondaries. We ensure here those nodes will
2213 # be locked.
2214 for inst in self.owned_locks(locking.LEVEL_INSTANCE):
2215 # Important: access only the instances whose lock is owned
2216 if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
2217 nodes.update(all_inst_info[inst].secondary_nodes)
2218
2219 self.needed_locks[locking.LEVEL_NODE] = nodes
2220
2221 def CheckPrereq(self):
2222 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
2223 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
2224
2225 group_nodes = set(self.group_info.members)
2226 group_instances = \
2227 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
2228
2229 unlocked_nodes = \
2230 group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
2231
2232 unlocked_instances = \
2233 group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
2234
2235 if unlocked_nodes:
2236 raise errors.OpPrereqError("Missing lock for nodes: %s" %
2237 utils.CommaJoin(unlocked_nodes),
2238 errors.ECODE_STATE)
2239
2240 if unlocked_instances:
2241 raise errors.OpPrereqError("Missing lock for instances: %s" %
2242 utils.CommaJoin(unlocked_instances),
2243 errors.ECODE_STATE)
2244
2245 self.all_node_info = self.cfg.GetAllNodesInfo()
2246 self.all_inst_info = self.cfg.GetAllInstancesInfo()
2247
2248 self.my_node_names = utils.NiceSort(group_nodes)
2249 self.my_inst_names = utils.NiceSort(group_instances)
2250
2251 self.my_node_info = dict((name, self.all_node_info[name])
2252 for name in self.my_node_names)
2253
2254 self.my_inst_info = dict((name, self.all_inst_info[name])
2255 for name in self.my_inst_names)
2256
2257 # We detect here the nodes that will need the extra RPC calls for verifying
2258 # split LV volumes; they should be locked.
2259 extra_lv_nodes = set()
2260
2261 for inst in self.my_inst_info.values():
2262 if inst.disk_template in constants.DTS_INT_MIRROR:
2263 for nname in inst.all_nodes:
2264 if self.all_node_info[nname].group != self.group_uuid:
2265 extra_lv_nodes.add(nname)
2266
2267 unlocked_lv_nodes = \
2268 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
2269
2270 if unlocked_lv_nodes:
2271 raise errors.OpPrereqError("Missing node locks for LV check: %s" %
2272 utils.CommaJoin(unlocked_lv_nodes),
2273 errors.ECODE_STATE)
2274 self.extra_lv_nodes = list(extra_lv_nodes)
2275
2276 def _VerifyNode(self, ninfo, nresult):
2277 """Perform some basic validation on data returned from a node.
2278
2279 - check the result data structure is well formed and has all the
2280 mandatory fields
2281 - check ganeti version
2282
2283 @type ninfo: L{objects.Node}
2284 @param ninfo: the node to check
2285 @param nresult: the results from the node
2286 @rtype: boolean
2287 @return: whether overall this call was successful (and we can expect
2288 reasonable values in the response)
2289
2290 """
2291 node = ninfo.name
2292 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2293
2294 # main result, nresult should be a non-empty dict
2295 test = not nresult or not isinstance(nresult, dict)
2296 _ErrorIf(test, constants.CV_ENODERPC, node,
2297 "unable to verify node: no data returned")
2298 if test:
2299 return False
2300
2301 # compares ganeti version
2302 local_version = constants.PROTOCOL_VERSION
2303 remote_version = nresult.get("version", None)
2304 test = not (remote_version and
2305 isinstance(remote_version, (list, tuple)) and
2306 len(remote_version) == 2)
2307 _ErrorIf(test, constants.CV_ENODERPC, node,
2308 "connection to node returned invalid data")
2309 if test:
2310 return False
2311
2312 test = local_version != remote_version[0]
2313 _ErrorIf(test, constants.CV_ENODEVERSION, node,
2314 "incompatible protocol versions: master %s,"
2315 " node %s", local_version, remote_version[0])
2316 if test:
2317 return False
2318
2319 # node seems compatible, we can actually try to look into its results
2320
2321 # full package version
2322 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
2323 constants.CV_ENODEVERSION, node,
2324 "software version mismatch: master %s, node %s",
2325 constants.RELEASE_VERSION, remote_version[1],
2326 code=self.ETYPE_WARNING)
2327
2328 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
2329 if ninfo.vm_capable and isinstance(hyp_result, dict):
2330 for hv_name, hv_result in hyp_result.iteritems():
2331 test = hv_result is not None
2332 _ErrorIf(test, constants.CV_ENODEHV, node,
2333 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
2334
2335 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
2336 if ninfo.vm_capable and isinstance(hvp_result, list):
2337 for item, hv_name, hv_result in hvp_result:
2338 _ErrorIf(True, constants.CV_ENODEHV, node,
2339 "hypervisor %s parameter verify failure (source %s): %s",
2340 hv_name, item, hv_result)
2341
2342 test = nresult.get(constants.NV_NODESETUP,
2343 ["Missing NODESETUP results"])
2344 _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
2345 "; ".join(test))
2346
2347 return True
2348
2349 def _VerifyNodeTime(self, ninfo, nresult,
2350 nvinfo_starttime, nvinfo_endtime):
2351 """Check the node time.
2352
2353 @type ninfo: L{objects.Node}
2354 @param ninfo: the node to check
2355 @param nresult: the remote results for the node
2356 @param nvinfo_starttime: the start time of the RPC call
2357 @param nvinfo_endtime: the end time of the RPC call
2358
2359 """
2360 node = ninfo.name
2361 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2362
2363 ntime = nresult.get(constants.NV_TIME, None)
2364 try:
2365 ntime_merged = utils.MergeTime(ntime)
2366 except (ValueError, TypeError):
2367 _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
2368 return
2369
2370 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
2371 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
2372 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
2373 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
2374 else:
2375 ntime_diff = None
2376
2377 _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
2378 "Node time diverges by at least %s from master node time",
2379 ntime_diff)
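
# Illustrative sketch, not part of the class: with a maximum allowed skew of,
# say, 150 seconds and an RPC window of [1000.0, 1002.0] (master-side start
# and end times), a node reporting 1100.0 passes, while one reporting 800.0
# is flagged with a divergence of "200.0s".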
2380
2381 def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
2382 """Check the node LVM results.
2383
2384 @type ninfo: L{objects.Node}
2385 @param ninfo: the node to check
2386 @param nresult: the remote results for the node
2387 @param vg_name: the configured VG name
2388
2389 """
2390 if vg_name is None:
2391 return
2392
2393 node = ninfo.name
2394 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2395
2396 # checks vg existence and size > 20G
2397 vglist = nresult.get(constants.NV_VGLIST, None)
2398 test = not vglist
2399 _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
2400 if not test:
2401 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
2402 constants.MIN_VG_SIZE)
2403 _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
2404
2405 # check pv names
2406 pvlist = nresult.get(constants.NV_PVLIST, None)
2407 test = pvlist is None
2408 _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
2409 if not test:
2410 # check that ':' is not present in PV names, since it's a
2411 # special character for lvcreate (denotes the range of PEs to
2412 # use on the PV)
2413 for _, pvname, owner_vg in pvlist:
2414 test = ":" in pvname
2415 _ErrorIf(test, constants.CV_ENODELVM, node,
2416 "Invalid character ':' in PV '%s' of VG '%s'",
2417 pvname, owner_vg)
2418
2419 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2420 """Check the node bridges.
2421
2422 @type ninfo: L{objects.Node}
2423 @param ninfo: the node to check
2424 @param nresult: the remote results for the node
2425 @param bridges: the expected list of bridges
2426
2427 """
2428 if not bridges:
2429 return
2430
2431 node = ninfo.name
2432 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2433
2434 missing = nresult.get(constants.NV_BRIDGES, None)
2435 test = not isinstance(missing, list)
2436 _ErrorIf(test, constants.CV_ENODENET, node,
2437 "did not return valid bridge information")
2438 if not test:
2439 _ErrorIf(bool(missing), constants.CV_ENODENET, node,
2440 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2441
2442 def _VerifyNodeUserScripts(self, ninfo, nresult):
2443 """Check the results of user scripts presence and executability on the node
2444
2445 @type ninfo: L{objects.Node}
2446 @param ninfo: the node to check
2447 @param nresult: the remote results for the node
2448
2449 """
2450 node = ninfo.name
2451
2452 test = not constants.NV_USERSCRIPTS in nresult
2453 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
2454 "did not return user scripts information")
2455
2456 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2457 if not test:
2458 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
2459 "user scripts not present or not executable: %s" %
2460 utils.CommaJoin(sorted(broken_scripts)))
2461
2462 def _VerifyNodeNetwork(self, ninfo, nresult):
2463 """Check the node network connectivity results.
2464
2465 @type ninfo: L{objects.Node}
2466 @param ninfo: the node to check
2467 @param nresult: the remote results for the node
2468
2469 """
2470 node = ninfo.name
2471 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2472
2473 test = constants.NV_NODELIST not in nresult
2474 _ErrorIf(test, constants.CV_ENODESSH, node,
2475 "node hasn't returned node ssh connectivity data")
2476 if not test:
2477 if nresult[constants.NV_NODELIST]:
2478 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2479 _ErrorIf(True, constants.CV_ENODESSH, node,
2480 "ssh communication with node '%s': %s", a_node, a_msg)
2481
2482 test = constants.NV_NODENETTEST not in nresult
2483 _ErrorIf(test, constants.CV_ENODENET, node,
2484 "node hasn't returned node tcp connectivity data")
2485 if not test:
2486 if nresult[constants.NV_NODENETTEST]:
2487 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2488 for anode in nlist:
2489 _ErrorIf(True, constants.CV_ENODENET, node,
2490 "tcp communication with node '%s': %s",
2491 anode, nresult[constants.NV_NODENETTEST][anode])
2492
2493 test = constants.NV_MASTERIP not in nresult
2494 _ErrorIf(test, constants.CV_ENODENET, node,
2495 "node hasn't returned node master IP reachability data")
2496 if not test:
2497 if not nresult[constants.NV_MASTERIP]:
2498 if node == self.master_node:
2499 msg = "the master node cannot reach the master IP (not configured?)"
2500 else:
2501 msg = "cannot reach the master IP"
2502 _ErrorIf(True, constants.CV_ENODENET, node, msg)
2503
2504 def _VerifyInstance(self, instance, instanceconfig, node_image,
2505 diskstatus):
2506 """Verify an instance.
2507
2508 This function checks to see if the required block devices are
2509 available on the instance's node.
2510
2511 """
2512 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2513 node_current = instanceconfig.primary_node
2514
2515 node_vol_should = {}
2516 instanceconfig.MapLVsByNode(node_vol_should)
2517
2518 cluster = self.cfg.GetClusterInfo()
2519 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2520 self.group_info)
2521 err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
2522 _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
2523
2524 for node in node_vol_should:
2525 n_img = node_image[node]
2526 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2527 # ignore missing volumes on offline or broken nodes
2528 continue
2529 for volume in node_vol_should[node]:
2530 test = volume not in n_img.volumes
2531 _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
2532 "volume %s missing on node %s", volume, node)
2533
2534 if instanceconfig.admin_state == constants.ADMINST_UP:
2535 pri_img = node_image[node_current]
2536 test = instance not in pri_img.instances and not pri_img.offline
2537 _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
2538 "instance not running on its primary node %s",
2539 node_current)
2540
2541 diskdata = [(nname, success, status, idx)
2542 for (nname, disks) in diskstatus.items()
2543 for idx, (success, status) in enumerate(disks)]
2544
2545 for nname, success, bdev_status, idx in diskdata:
2546 # the 'ghost node' construction in Exec() ensures that we have a
2547 # node here
2548 snode = node_image[nname]
2549 bad_snode = snode.ghost or snode.offline
2550 _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and
2551 not success and not bad_snode,
2552 constants.CV_EINSTANCEFAULTYDISK, instance,
2553 "couldn't retrieve status for disk/%s on %s: %s",
2554 idx, nname, bdev_status)
2555 _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and
2556 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
2557 constants.CV_EINSTANCEFAULTYDISK, instance,
2558 "disk/%s on %s is faulty", idx, nname)
2559
2560 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2561 """Verify if there are any unknown volumes in the cluster.
2562
2563 The .os, .swap and backup volumes are ignored. All other volumes are
2564 reported as unknown.
2565
2566 @type reserved: L{ganeti.utils.FieldSet}
2567 @param reserved: a FieldSet of reserved volume names
2568
2569 """
2570 for node, n_img in node_image.items():
2571 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2572 self.all_node_info[node].group != self.group_uuid):
2573 # skip non-healthy nodes
2574 continue
2575 for volume in n_img.volumes:
2576 test = ((node not in node_vol_should or
2577 volume not in node_vol_should[node]) and
2578 not reserved.Matches(volume))
2579 self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
2580 "volume %s is unknown", volume)
2581
2582 def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2583 """Verify N+1 Memory Resilience.
2584
2585 Check that if one single node dies we can still start all the
2586 instances it was primary for.
2587
2588 """
2589 cluster_info = self.cfg.GetClusterInfo()
2590 for node, n_img in node_image.items():
2591 # This code checks that every node which is now listed as
2592 # secondary has enough memory to host all instances it is
2593 # supposed to, should a single other node in the cluster fail.
2594 # FIXME: not ready for failover to an arbitrary node
2595 # FIXME: does not support file-backed instances
2596 # WARNING: we currently take into account down instances as well
2597 # as up ones, considering that even if they're down someone
2598 # might want to start them even in the event of a node failure.
2599 if n_img.offline or self.all_node_info[node].group != self.group_uuid:
2600 # we're skipping nodes marked offline and nodes in other groups from
2601 # the N+1 warning, since most likely we don't have good memory
2602 # information from them; we already list instances living on such
2603 # nodes, and that's enough warning
2604 continue
2605 #TODO(dynmem): also consider ballooning out other instances
2606 for prinode, instances in n_img.sbp.items():
2607 needed_mem = 0
2608 for instance in instances:
2609 bep = cluster_info.FillBE(instance_cfg[instance])
2610 if bep[constants.BE_AUTO_BALANCE]:
2611 needed_mem += bep[constants.BE_MINMEM]
2612 test = n_img.mfree < needed_mem
2613 self._ErrorIf(test, constants.CV_ENODEN1, node,
2614 "not enough memory to accomodate instance failovers"
2615 " should node %s fail (%dMiB needed, %dMiB available)",
2616 prinode, needed_mem, n_img.mfree)
2617
2618 @classmethod
2619 def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2620 (files_all, files_opt, files_mc, files_vm)):
2621 """Verifies file checksums collected from all nodes.
2622
2623 @param errorif: Callback for reporting errors
2624 @param nodeinfo: List of L{objects.Node} objects
2625 @param master_node: Name of master node
2626 @param all_nvinfo: RPC results
2627
2628 """
2629 # Define functions determining which nodes to consider for a file
2630 files2nodefn = [
2631 (files_all, None),
2632 (files_mc, lambda node: (node.master_candidate or
2633 node.name == master_node)),
2634 (files_vm, lambda node: node.vm_capable),
2635 ]
2636
2637 # Build mapping from filename to list of nodes which should have the file
2638 nodefiles = {}
2639 for (files, fn) in files2nodefn:
2640 if fn is None:
2641 filenodes = nodeinfo
2642 else:
2643 filenodes = filter(fn, nodeinfo)
2644 nodefiles.update((filename,
2645 frozenset(map(operator.attrgetter("name"), filenodes)))
2646 for filename in files)
2647
2648 assert set(nodefiles) == (files_all | files_mc | files_vm)
2649
2650 fileinfo = dict((filename, {}) for filename in nodefiles)
2651 ignore_nodes = set()
2652
2653 for node in nodeinfo:
2654 if node.offline:
2655 ignore_nodes.add(node.name)
2656 continue
2657
2658 nresult = all_nvinfo[node.name]
2659
2660 if nresult.fail_msg or not nresult.payload:
2661 node_files = None
2662 else:
2663 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2664 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2665 for (key, value) in fingerprints.items())
2666 del fingerprints
2667
2668 test = not (node_files and isinstance(node_files, dict))
2669 errorif(test, constants.CV_ENODEFILECHECK, node.name,
2670 "Node did not return file checksum data")
2671 if test:
2672 ignore_nodes.add(node.name)
2673 continue
2674
2675 # Build per-checksum mapping from filename to nodes having it
2676 for (filename, checksum) in node_files.items():
2677 assert filename in nodefiles
2678 fileinfo[filename].setdefault(checksum, set()).add(node.name)
2679
2680 for (filename, checksums) in fileinfo.items():
2681 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2682
2683 # Nodes having the file
2684 with_file = frozenset(node_name
2685 for nodes in fileinfo[filename].values()
2686 for node_name in nodes) - ignore_nodes
2687
2688 expected_nodes = nodefiles[filename] - ignore_nodes
2689
2690 # Nodes missing file
2691 missing_file = expected_nodes - with_file
2692
2693 if filename in files_opt:
2694 # All or no nodes
2695 errorif(missing_file and missing_file != expected_nodes,
2696 constants.CV_ECLUSTERFILECHECK, None,
2697 "File %s is optional, but it must exist on all or no"
2698 " nodes (not found on %s)",
2699 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2700 else:
2701 errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2702 "File %s is missing from node(s) %s", filename,
2703 utils.CommaJoin(utils.NiceSort(missing_file)))
2704
2705 # Warn if a node has a file it shouldn't
2706 unexpected = with_file - expected_nodes
2707 errorif(unexpected,
2708 constants.CV_ECLUSTERFILECHECK, None,
2709 "File %s should not exist on node(s) %s",
2710 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2711
2712 # See if there are multiple versions of the file
2713 test = len(checksums) > 1
2714 if test:
2715 variants = ["variant %s on %s" %
2716 (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2717 for (idx, (checksum, nodes)) in
2718 enumerate(sorted(checksums.items()))]
2719 else:
2720 variants = []
2721
2722 errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2723 "File %s found with %s different checksums (%s)",
2724 filename, len(checksums), "; ".join(variants))
2725
2726 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2727 drbd_map):
2728 """Verifies and the node DRBD status.
2729
2730 @type ninfo: L{objects.Node}
2731 @param ninfo: the node to check
2732 @param nresult: the remote results for the node
2733 @param instanceinfo: the dict of instances
2734 @param drbd_helper: the configured DRBD usermode helper
2735 @param drbd_map: the DRBD map as returned by
2736 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2737
2738 """
2739 node = ninfo.name
2740 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2741
2742 if drbd_helper:
2743 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2744 test = (helper_result is None)
2745 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2746 "no drbd usermode helper returned")
2747 if helper_result:
2748 status, payload = helper_result
2749 test = not status
2750 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2751 "drbd usermode helper check unsuccessful: %s", payload)
2752 test = status and (payload != drbd_helper)
2753 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2754 "wrong drbd usermode helper: %s", payload)
2755
2756 # compute the DRBD minors
2757 node_drbd = {}
2758 for minor, instance in drbd_map[node].items():
2759 test = instance not in instanceinfo
2760 _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2761 "ghost instance '%s' in temporary DRBD map", instance)
2762 # ghost instance should not be running, but otherwise we
2763 # don't give double warnings (both ghost instance and
2764 # unallocated minor in use)
2765 if test:
2766 node_drbd[minor] = (instance, False)
2767 else:
2768 instance = instanceinfo[instance]
2769 node_drbd[minor] = (instance.name,
2770 instance.admin_state == constants.ADMINST_UP)
2771
2772 # and now check them
2773 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2774 test = not isinstance(used_minors, (tuple, list))
2775 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2776 "cannot parse drbd status file: %s", str(used_minors))
2777 if test:
2778 # we cannot check drbd status
2779 return
2780
2781 for minor, (iname, must_exist) in node_drbd.items():
2782 test = minor not in used_minors and must_exist
2783 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2784 "drbd minor %d of instance %s is not active", minor, iname)
2785 for minor in used_minors:
2786 test = minor not in node_drbd
2787 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2788 "unallocated drbd minor %d is in use", minor)
2789
2790 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2791 """Builds the node OS structures.
2792
2793 @type ninfo: L{objects.Node}
2794 @param ninfo: the node to check
2795 @param nresult: the remote results for the node
2796 @param nimg: the node image object
2797
2798 """
2799 node = ninfo.name
2800 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2801
2802 remote_os = nresult.get(constants.NV_OSLIST, None)
2803 test = (not isinstance(remote_os, list) or
2804 not compat.all(isinstance(v, list) and len(v) == 7
2805 for v in remote_os))
2806
2807 _ErrorIf(test, constants.CV_ENODEOS, node,
2808 "node hasn't returned valid OS data")
2809
2810 nimg.os_fail = test
2811
2812 if test:
2813 return
2814
2815 os_dict = {}
2816
2817 for (name, os_path, status, diagnose,
2818 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2819
2820 if name not in os_dict:
2821 os_dict[name] = []
2822
2823 # parameters is a list of lists instead of list of tuples due to
2824 # JSON lacking a real tuple type, fix it:
2825 parameters = [tuple(v) for v in parameters]
2826 os_dict[name].append((os_path, status, diagnose,
2827 set(variants), set(parameters), set(api_ver)))
2828
2829 nimg.oslist = os_dict
2830
2831 def _VerifyNodeOS(self, ninfo, nimg, base):
2832 """Verifies the node OS list.
2833
2834 @type ninfo: L{objects.Node}
2835 @param ninfo: the node to check
2836 @param nimg: the node image object
2837 @param base: the 'template' node we match against (e.g. from the master)
2838
2839 """
2840 node = ninfo.name
2841 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2842
2843 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2844
2845 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2846 for os_name, os_data in nimg.oslist.items():
2847 assert os_data, "Empty OS status for OS %s?!" % os_name
2848 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2849 _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2850 "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2851 _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2852 "OS '%s' has multiple entries (first one shadows the rest): %s",
2853 os_name, utils.CommaJoin([v[0] for v in os_data]))
2854 # comparisons with the 'base' image
2855 test = os_name not in base.oslist
2856 _ErrorIf(test, constants.CV_ENODEOS, node,
2857 "Extra OS %s not present on reference node (%s)",
2858 os_name, base.name)
2859 if test:
2860 continue
2861 assert base.oslist[os_name], "Base node has empty OS status?"
2862 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2863 if not b_status:
2864 # base OS is invalid, skipping
2865 continue
2866 for kind, a, b in [("API version", f_api, b_api),
2867 ("variants list", f_var, b_var),
2868 ("parameters", beautify_params(f_param),
2869 beautify_params(b_param))]:
2870 _ErrorIf(a != b, constants.CV_ENODEOS, node,
2871 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2872 kind, os_name, base.name,
2873 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2874
2875 # check any missing OSes
2876 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2877 _ErrorIf(missing, constants.CV_ENODEOS, node,
2878 "OSes present on reference node %s but missing on this node: %s",
2879 base.name, utils.CommaJoin(missing))
2880
2881 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2882 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2883
2884 @type ninfo: L{objects.Node}
2885 @param ninfo: the node to check
2886 @param nresult: the remote results for the node
2887 @type is_master: bool
2888 @param is_master: Whether node is the master node
2889
2890 """
2891 node = ninfo.name
2892
2893 if (is_master and
2894 (constants.ENABLE_FILE_STORAGE or
2895 constants.ENABLE_SHARED_FILE_STORAGE)):
2896 try:
2897 fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2898 except KeyError:
2899 # This should never happen
2900 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2901 "Node did not return forbidden file storage paths")
2902 else:
2903 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2904 "Found forbidden file storage paths: %s",
2905 utils.CommaJoin(fspaths))
2906 else:
2907 self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2908 constants.CV_ENODEFILESTORAGEPATHS, node,
2909 "Node should not have returned forbidden file storage"
2910 " paths")
2911
2912 def _VerifyOob(self, ninfo, nresult):
2913 """Verifies out of band functionality of a node.
2914
2915 @type ninfo: L{objects.Node}
2916 @param ninfo: the node to check
2917 @param nresult: the remote results for the node
2918
2919 """
2920 node = ninfo.name
2921 # We just have to verify the paths on master and/or master candidates
2922 # as the oob helper is invoked on the master
2923 if ((ninfo.master_candidate or ninfo.master_capable) and
2924 constants.NV_OOB_PATHS in nresult):
2925 for path_result in nresult[constants.NV_OOB_PATHS]:
2926 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2927
2928 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2929 """Verifies and updates the node volume data.
2930
2931 This function will update a L{NodeImage}'s internal structures
2932 with data from the remote call.
2933
2934 @type ninfo: L{objects.Node}
2935 @param ninfo: the node to check
2936 @param nresult: the remote results for the node
2937 @param nimg: the node image object
2938 @param vg_name: the configured VG name
2939
2940 """
2941 node = ninfo.name
2942 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2943
2944 nimg.lvm_fail = True
2945 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2946 if vg_name is None:
2947 pass
2948 elif isinstance(lvdata, basestring):
2949 _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2950 utils.SafeEncode(lvdata))
2951 elif not isinstance(lvdata, dict):
2952 _ErrorIf(True, constants.CV_ENODELVM, node,
2953 "rpc call to node failed (lvlist)")
2954 else:
2955 nimg.volumes = lvdata
2956 nimg.lvm_fail = False
2957
2958 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2959 """Verifies and updates the node instance list.
2960
2961 If the listing was successful, then updates this node's instance
2962 list. Otherwise, it marks the RPC call as failed for the instance
2963 list key.
2964
2965 @type ninfo: L{objects.Node}
2966 @param ninfo: the node to check
2967 @param nresult: the remote results for the node
2968 @param nimg: the node image object
2969
2970 """
2971 idata = nresult.get(constants.NV_INSTANCELIST, None)
2972 test = not isinstance(idata, list)
2973 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2974 "rpc call to node failed (instancelist): %s",
2975 utils.SafeEncode(str(idata)))
2976 if test:
2977 nimg.hyp_fail = True
2978 else:
2979 nimg.instances = idata
2980
2981 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2982 """Verifies and computes a node information map
2983
2984 @type ninfo: L{objects.Node}
2985 @param ninfo: the node to check
2986 @param nresult: the remote results for the node
2987 @param nimg: the node image object
2988 @param vg_name: the configured VG name
2989
2990 """
2991 node = ninfo.name
2992 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2993
2994 # try to read free memory (from the hypervisor)
2995 hv_info = nresult.get(constants.NV_HVINFO, None)
2996 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2997 _ErrorIf(test, constants.CV_ENODEHV, node,
2998 "rpc call to node failed (hvinfo)")
2999 if not test:
3000 try:
3001 nimg.mfree = int(hv_info["memory_free"])
3002 except (ValueError, TypeError):
3003 _ErrorIf(True, constants.CV_ENODERPC, node,
3004 "node returned invalid nodeinfo, check hypervisor")
3005
3006 # FIXME: devise a free space model for file based instances as well
3007 if vg_name is not None:
3008 test = (constants.NV_VGLIST not in nresult or
3009 vg_name not in nresult[constants.NV_VGLIST])
3010 _ErrorIf(test, constants.CV_ENODELVM, node,
3011 "node didn't return data for the volume group '%s'"
3012 " - it is either missing or broken", vg_name)
3013 if not test:
3014 try:
3015 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
3016 except (ValueError, TypeError):
3017 _ErrorIf(True, constants.CV_ENODERPC, node,
3018 "node returned invalid LVM info, check LVM status")
3019
3020 def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
3021 """Gets per-disk status information for all instances.
3022
3023 @type nodelist: list of strings
3024 @param nodelist: Node names
3025 @type node_image: dict of (name, L{objects.Node})
3026 @param node_image: Node objects
3027 @type instanceinfo: dict of (name, L{objects.Instance})
3028 @param instanceinfo: Instance objects
3029 @rtype: {instance: {node: [(success, payload)]}}
3030 @return: a dictionary of per-instance dictionaries with nodes as
3031 keys and disk information as values; the disk information is a
3032 list of tuples (success, payload)
3033
3034 """
3035 _ErrorIf = self._ErrorIf # pylint: disable=C0103
3036
3037 node_disks = {}
3038 node_disks_devonly = {}
3039 diskless_instances = set()
3040 diskless = constants.DT_DISKLESS
3041
3042 for nname in nodelist:
3043 node_instances = list(itertools.chain(node_image[nname].pinst,
3044 node_image[nname].sinst))
3045 diskless_instances.update(inst for inst in node_instances
3046 if instanceinfo[inst].disk_template == diskless)
3047 disks = [(inst, disk)
3048 for inst in node_instances
3049 for disk in instanceinfo[inst].disks]
3050
3051 if not disks:
3052 # No need to collect data
3053 continue
3054
3055 node_disks[nname] = disks
3056
3057 # _AnnotateDiskParams already makes copies of the disks
3058 devonly = []
3059 for (inst, dev) in disks:
3060 (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
3061 self.cfg.SetDiskID(anno_disk, nname)
3062 devonly.append(anno_disk)
3063
3064 node_disks_devonly[nname] = devonly
3065
3066 assert len(node_disks) == len(node_disks_devonly)
3067
3068 # Collect data from all nodes with disks
3069 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
3070 node_disks_devonly)
3071
3072 assert len(result) == len(node_disks)
3073
3074 instdisk = {}
3075
3076 for (nname, nres) in result.items():
3077 disks = node_disks[nname]
3078
3079 if nres.offline:
3080 # No data from this node
3081 data = len(disks) * [(False, "node offline")]
3082 else:
3083 msg = nres.fail_msg
3084 _ErrorIf(msg, constants.CV_ENODERPC, nname,
3085 "while getting disk information: %s", msg)
3086 if msg:
3087 # No data from this node
3088 data = len(disks) * [(False, msg)]
3089 else:
3090 data = []
3091 for idx, i in enumerate(nres.payload):
3092 if isinstance(i, (tuple, list)) and len(i) == 2:
3093 data.append(i)
3094 else:
3095 logging.warning("Invalid result from node %s, entry %d: %s",
3096 nname, idx, i)
3097 data.append((False, "Invalid result from the remote node"))
3098
3099 for ((inst, _), status) in zip(disks, data):
3100 instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
3101
3102 # Add empty entries for diskless instances.
3103 for inst in diskless_instances:
3104 assert inst not in instdisk
3105 instdisk[inst] = {}
3106
3107 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
3108 len(nnames) <= len(instanceinfo[inst].all_nodes) and
3109 compat.all(isinstance(s, (tuple, list)) and
3110 len(s) == 2 for s in statuses)
3111 for inst, nnames in instdisk.items()
3112 for nname, statuses in nnames.items())
3113 assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
3114
3115 return instdisk
3116
3117 @staticmethod
3118 def _SshNodeSelector(group_uuid, all_nodes):
3119 """Create endless iterators for all potential SSH check hosts.
3120
3121 """
3122 nodes = [node for node in all_nodes
3123 if (node.group != group_uuid and
3124 not node.offline)]
3125 keyfunc = operator.attrgetter("group")
3126
3127 return map(itertools.cycle,
3128 [sorted(map(operator.attrgetter("name"), names))
3129 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
3130 keyfunc)])
3131
3132 @classmethod
3133 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
3134 """Choose which nodes should talk to which other nodes.
3135
3136 We will make nodes contact all nodes in their group, and one node from
3137 every other group.
3138
3139 @warning: This algorithm has a known issue if one node group is much
3140 smaller than others (e.g. just one node). In such a case all other
3141 nodes will talk to the single node.
3142
3143 """
3144 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
3145 sel = cls._SshNodeSelector(group_uuid, all_nodes)
3146
3147 return (online_nodes,
3148 dict((name, sorted([i.next() for i in sel]))
3149 for name in online_nodes))
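
# Illustrative sketch, not part of the class: for a cluster with three node
# groups, where the group being verified contains nodeA and nodeB, the tuple
# returned above has roughly the shape
#
#   (["nodeA", "nodeB"],
#    {"nodeA": ["g2-node1", "g3-node1"],
#     "nodeB": ["g2-node2", "g3-node1"]})
#
# i.e. the online nodes of the group plus, per node, one candidate from every
# other group, cycled so the checks are spread across those groups.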
3150
3151 def BuildHooksEnv(self):
3152 """Build hooks env.
3153
3154 Cluster-Verify hooks just ran in the post phase and their failure makes
3155 the output be logged in the verify output and the verification to fail.
3156
3157 """
3158 env = {
3159 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
3160 }
3161
3162 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
3163 for node in self.my_node_info.values())
3164
3165 return env
3166
3167 def BuildHooksNodes(self):
3168 """Build hooks nodes.
3169
3170 """
3171 return ([], self.my_node_names)
3172
3173 def Exec(self, feedback_fn):
3174 """Verify integrity of the node group, performing various test on nodes.
3175
3176 """
3177 # This method has too many local variables. pylint: disable=R0914
3178 feedback_fn("* Verifying group '%s'" % self.group_info.name)
3179
3180 if not self.my_node_names:
3181 # empty node group
3182 feedback_fn("* Empty node group, skipping verification")
3183 return True
3184
3185 self.bad = False
3186 _ErrorIf = self._ErrorIf # pylint: disable=C0103
3187 verbose = self.op.verbose
3188 self._feedback_fn = feedback_fn
3189
3190 vg_name = self.cfg.GetVGName()
3191 drbd_helper = self.cfg.GetDRBDHelper()
3192 cluster = self.cfg.GetClusterInfo()
3193 groupinfo = self.cfg.GetAllNodeGroupsInfo()
3194 hypervisors = cluster.enabled_hypervisors
3195 node_data_list = [self.my_node_info[name] for name in self.my_node_names]
3196
3197 i_non_redundant = [] # Non redundant instances
3198 i_non_a_balanced = [] # Non auto-balanced instances
3199 i_offline = 0 # Count of offline instances
3200 n_offline = 0 # Count of offline nodes
3201 n_drained = 0 # Count of nodes being drained
3202 node_vol_should = {}
3203
3204 # FIXME: verify OS list
3205
3206 # File verification
3207 filemap = _ComputeAncillaryFiles(cluster, False)
3208
3209 # do local checksums
3210 master_node = self.master_node = self.cfg.GetMasterNode()
3211 master_ip = self.cfg.GetMasterIP()
3212
3213 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
3214
3215 user_scripts = []
3216 if self.cfg.GetUseExternalMipScript():
3217 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
3218
3219 node_verify_param = {
3220 constants.NV_FILELIST:
3221 map(vcluster.MakeVirtualPath,
3222 utils.UniqueSequence(filename
3223 for files in filemap
3224 for filename in files)),
3225 constants.NV_NODELIST:
3226 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
3227 self.all_node_info.values()),
3228 constants.NV_HYPERVISOR: hypervisors,
3229 constants.NV_HVPARAMS:
3230 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
3231 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
3232 for node in node_data_list
3233 if not node.offline],
3234 constants.NV_INSTANCELIST: hypervisors,
3235 constants.NV_VERSION: None,
3236 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
3237 constants.NV_NODESETUP: None,
3238 constants.NV_TIME: None,
3239 constants.NV_MASTERIP: (master_node, master_ip),
3240 constants.NV_OSLIST: None,
3241 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
3242 constants.NV_USERSCRIPTS: user_scripts,
3243 }
3244
3245 if vg_name is not None:
3246 node_verify_param[constants.NV_VGLIST] = None
3247 node_verify_param[constants.NV_LVLIST] = vg_name
3248 node_verify_param[constants.NV_PVLIST] = [vg_name]
3249
3250 if drbd_helper:
3251 node_verify_param[constants.NV_DRBDLIST] = None
3252 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
3253
3254 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
3255 # Load file storage paths only from master node
3256 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
3257
3258 # bridge checks
3259 # FIXME: this needs to be changed per node-group, not cluster-wide
3260 bridges = set()
3261 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
3262 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3263 bridges.add(default_nicpp[constants.NIC_LINK])
3264 for instance in self.my_inst_info.values():
3265 for nic in instance.nics:
3266 full_nic = cluster.SimpleFillNIC(nic.nicparams)
3267 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3268 bridges.add(full_nic[constants.NIC_LINK])
3269
3270 if bridges:
3271 node_verify_param[constants.NV_BRIDGES] = list(bridges)
3272
3273 # Build our expected cluster state
3274 node_image = dict((node.name, self.NodeImage(offline=node.offline,
3275 name=node.name,
3276 vm_capable=node.vm_capable))
3277 for node in node_data_list)
3278
3279 # Gather OOB paths
3280 oob_paths = []
3281 for node in self.all_node_info.values():
3282 path = _SupportsOob(self.cfg, node)
3283 if path and path not in oob_paths:
3284 oob_paths.append(path)
3285
3286 if oob_paths:
3287 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
3288
3289 for instance in self.my_inst_names:
3290 inst_config = self.my_inst_info[instance]
3291 if inst_config.admin_state == constants.ADMINST_OFFLINE:
3292 i_offline += 1
3293
3294 for nname in inst_config.all_nodes:
3295 if nname not in node_image:
3296 gnode = self.NodeImage(name=nname)
3297 gnode.ghost = (nname not in self.all_node_info)
3298 node_image[nname] = gnode
3299
3300 inst_config.MapLVsByNode(node_vol_should)
3301
3302 pnode = inst_config.primary_node
3303 node_image[pnode].pinst.append(instance)
3304
3305 for snode in inst_config.secondary_nodes:
3306 nimg = node_image[snode]
3307 nimg.sinst.append(instance)
3308 if pnode not in nimg.sbp:
3309 nimg.sbp[pnode] = []
3310 nimg.sbp[pnode].append(instance)
3311
3312 # At this point, we have the in-memory data structures complete,
3313 # except for the runtime information, which we'll gather next
3314
3315 # Due to the way our RPC system works, exact response times cannot be
3316 # guaranteed (e.g. a broken node could run into a timeout). By keeping the
3317 # time before and after executing the request, we can at least have a time
3318 # window.
3319 nvinfo_starttime = time.time()
3320 all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
3321 node_verify_param,
3322 self.cfg.GetClusterName())
3323 nvinfo_endtime = time.time()
3324
3325 if self.extra_lv_nodes and vg_name is not None:
3326 extra_lv_nvinfo = \
3327 self.rpc.call_node_verify(self.extra_lv_nodes,
3328 {constants.NV_LVLIST: vg_name},
3329 self.cfg.GetClusterName())
3330 else:
333