1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable=W0201,C0302
25
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
27 # functions
28
29 # C0302: since we have waaaay too many lines in this module
30
31 import os
32 import os.path
33 import time
34 import re
35 import logging
36 import copy
37 import OpenSSL
38 import socket
39 import tempfile
40 import shutil
41 import itertools
42 import operator
43 import ipaddr
44
45 from ganeti import ssh
46 from ganeti import utils
47 from ganeti import errors
48 from ganeti import hypervisor
49 from ganeti import locking
50 from ganeti import constants
51 from ganeti import objects
52 from ganeti import ssconf
53 from ganeti import uidpool
54 from ganeti import compat
55 from ganeti import masterd
56 from ganeti import netutils
57 from ganeti import query
58 from ganeti import qlang
59 from ganeti import opcodes
60 from ganeti import ht
61 from ganeti import rpc
62 from ganeti import runtime
63 from ganeti import pathutils
64 from ganeti import vcluster
65 from ganeti import network
66 from ganeti.masterd import iallocator
67
68 import ganeti.masterd.instance # pylint: disable=W0611
69
70
71 # States of instance
72 INSTANCE_DOWN = [constants.ADMINST_DOWN]
73 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
74 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
75
76 #: Instance status in which an instance can be marked as offline/online
77 CAN_CHANGE_INSTANCE_OFFLINE = (frozenset(INSTANCE_DOWN) | frozenset([
78 constants.ADMINST_OFFLINE,
79 ]))
80
81
82 class ResultWithJobs:
83 """Data container for LU results with jobs.
84
85 Instances of this class returned from L{LogicalUnit.Exec} will be recognized
86 by L{mcpu._ProcessResult}. The latter will then submit the jobs
87 contained in the C{jobs} attribute and include the job IDs in the opcode
88 result.
89
90 """
91 def __init__(self, jobs, **kwargs):
92 """Initializes this class.
93
94 Additional return values can be specified as keyword arguments.
95
 96 @type jobs: list of lists of L{opcodes.OpCode}
97 @param jobs: A list of lists of opcode objects
98
99 """
100 self.jobs = jobs
101 self.other = kwargs
102
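# Illustrative sketch (not part of the original module): an LU's Exec() can
# hand follow-up work back to the job queue by returning ResultWithJobs. The
# OpInstanceStartup opcode and the "wanted_names" attribute below are just
# placeholders for whatever the LU actually computed.
#
#   def Exec(self, feedback_fn):
#     jobs = [[opcodes.OpInstanceStartup(instance_name=name)]
#             for name in self.wanted_names]
#     return ResultWithJobs(jobs)
#
# mcpu._ProcessResult then submits each inner list as one job and merges the
# resulting job IDs into the opcode result.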
103
104 class LogicalUnit(object):
105 """Logical Unit base class.
106
107 Subclasses must follow these rules:
108 - implement ExpandNames
109 - implement CheckPrereq (except when tasklets are used)
110 - implement Exec (except when tasklets are used)
111 - implement BuildHooksEnv
112 - implement BuildHooksNodes
113 - redefine HPATH and HTYPE
114 - optionally redefine their run requirements:
115 REQ_BGL: the LU needs to hold the Big Ganeti Lock exclusively
116
117 Note that all commands require root permissions.
118
119 @ivar dry_run_result: the value (if any) that will be returned to the caller
120 in dry-run mode (signalled by opcode dry_run parameter)
121
122 """
123 HPATH = None
124 HTYPE = None
125 REQ_BGL = True
126
127 def __init__(self, processor, op, context, rpc_runner):
128 """Constructor for LogicalUnit.
129
130 This needs to be overridden in derived classes in order to check op
131 validity.
132
133 """
134 self.proc = processor
135 self.op = op
136 self.cfg = context.cfg
137 self.glm = context.glm
138 # readability alias
139 self.owned_locks = context.glm.list_owned
140 self.context = context
141 self.rpc = rpc_runner
142 # Dicts used to declare locking needs to mcpu
143 self.needed_locks = None
144 self.share_locks = dict.fromkeys(locking.LEVELS, 0)
145 self.add_locks = {}
146 self.remove_locks = {}
147 # Used to force good behavior when calling helper functions
148 self.recalculate_locks = {}
149 # logging
150 self.Log = processor.Log # pylint: disable=C0103
151 self.LogWarning = processor.LogWarning # pylint: disable=C0103
152 self.LogInfo = processor.LogInfo # pylint: disable=C0103
153 self.LogStep = processor.LogStep # pylint: disable=C0103
154 # support for dry-run
155 self.dry_run_result = None
156 # support for generic debug attribute
157 if (not hasattr(self.op, "debug_level") or
158 not isinstance(self.op.debug_level, int)):
159 self.op.debug_level = 0
160
161 # Tasklets
162 self.tasklets = None
163
164 # Validate opcode parameters and set defaults
165 self.op.Validate(True)
166
167 self.CheckArguments()
168
169 def CheckArguments(self):
170 """Check syntactic validity for the opcode arguments.
171
172 This method is for doing a simple syntactic check and ensure
173 validity of opcode parameters, without any cluster-related
174 checks. While the same can be accomplished in ExpandNames and/or
175 CheckPrereq, doing these separate is better because:
176
 177 - ExpandNames is left as purely a lock-related function
 178 - CheckPrereq is run after we have acquired locks (and possibly
179 waited for them)
180
181 The function is allowed to change the self.op attribute so that
 182 later methods need not worry about missing parameters.
183
184 """
185 pass
186
187 def ExpandNames(self):
188 """Expand names for this LU.
189
190 This method is called before starting to execute the opcode, and it should
191 update all the parameters of the opcode to their canonical form (e.g. a
192 short node name must be fully expanded after this method has successfully
193 completed). This way locking, hooks, logging, etc. can work correctly.
194
195 LUs which implement this method must also populate the self.needed_locks
196 member, as a dict with lock levels as keys, and a list of needed lock names
197 as values. Rules:
198
199 - use an empty dict if you don't need any lock
200 - if you don't need any lock at a particular level omit that
201 level (note that in this case C{DeclareLocks} won't be called
202 at all for that level)
203 - if you need locks at a level, but you can't calculate it in
204 this function, initialise that level with an empty list and do
205 further processing in L{LogicalUnit.DeclareLocks} (see that
206 function's docstring)
207 - don't put anything for the BGL level
208 - if you want all locks at a level use L{locking.ALL_SET} as a value
209
210 If you need to share locks (rather than acquire them exclusively) at one
211 level you can modify self.share_locks, setting a true value (usually 1) for
212 that level. By default locks are not shared.
213
214 This function can also define a list of tasklets, which then will be
215 executed in order instead of the usual LU-level CheckPrereq and Exec
216 functions, if those are not defined by the LU.
217
218 Examples::
219
220 # Acquire all nodes and one instance
221 self.needed_locks = {
222 locking.LEVEL_NODE: locking.ALL_SET,
223 locking.LEVEL_INSTANCE: ['instance1.example.com'],
224 }
225 # Acquire just two nodes
226 self.needed_locks = {
227 locking.LEVEL_NODE: ['node1.example.com', 'node2.example.com'],
228 }
229 # Acquire no locks
230 self.needed_locks = {} # No, you can't leave it to the default value None
231
232 """
233 # The implementation of this method is mandatory only if the new LU is
234 # concurrent, so that old LUs don't need to be changed all at the same
235 # time.
236 if self.REQ_BGL:
237 self.needed_locks = {} # Exclusive LUs don't need locks.
238 else:
239 raise NotImplementedError
240
241 def DeclareLocks(self, level):
242 """Declare LU locking needs for a level
243
244 While most LUs can just declare their locking needs at ExpandNames time,
245 sometimes there's the need to calculate some locks after having acquired
246 the ones before. This function is called just before acquiring locks at a
247 particular level, but after acquiring the ones at lower levels, and permits
248 such calculations. It can be used to modify self.needed_locks, and by
249 default it does nothing.
250
251 This function is only called if you have something already set in
252 self.needed_locks for the level.
253
254 @param level: Locking level which is going to be locked
255 @type level: member of L{ganeti.locking.LEVELS}
256
257 """
258
259 def CheckPrereq(self):
260 """Check prerequisites for this LU.
261
262 This method should check that the prerequisites for the execution
263 of this LU are fulfilled. It can do internode communication, but
264 it should be idempotent - no cluster or system changes are
265 allowed.
266
267 The method should raise errors.OpPrereqError in case something is
268 not fulfilled. Its return value is ignored.
269
270 This method should also update all the parameters of the opcode to
271 their canonical form if it hasn't been done by ExpandNames before.
272
273 """
274 if self.tasklets is not None:
275 for (idx, tl) in enumerate(self.tasklets):
276 logging.debug("Checking prerequisites for tasklet %s/%s",
277 idx + 1, len(self.tasklets))
278 tl.CheckPrereq()
279 else:
280 pass
281
282 def Exec(self, feedback_fn):
283 """Execute the LU.
284
285 This method should implement the actual work. It should raise
286 errors.OpExecError for failures that are somewhat dealt with in
287 code, or expected.
288
289 """
290 if self.tasklets is not None:
291 for (idx, tl) in enumerate(self.tasklets):
292 logging.debug("Executing tasklet %s/%s", idx + 1, len(self.tasklets))
293 tl.Exec(feedback_fn)
294 else:
295 raise NotImplementedError
296
297 def BuildHooksEnv(self):
298 """Build hooks environment for this LU.
299
300 @rtype: dict
301 @return: Dictionary containing the environment that will be used for
302 running the hooks for this LU. The keys of the dict must not be prefixed
303 with "GANETI_"--that'll be added by the hooks runner. The hooks runner
304 will extend the environment with additional variables. If no environment
305 should be defined, an empty dictionary should be returned (not C{None}).
306 @note: If the C{HPATH} attribute of the LU class is C{None}, this function
307 will not be called.
308
309 """
310 raise NotImplementedError
311
312 def BuildHooksNodes(self):
313 """Build list of nodes to run LU's hooks.
314
315 @rtype: tuple; (list, list)
316 @return: Tuple containing a list of node names on which the hook
317 should run before the execution and a list of node names on which the
 318 hook should run after the execution. If no nodes are to be returned,
 319 an empty list should be used (not None).
320 @note: If the C{HPATH} attribute of the LU class is C{None}, this function
321 will not be called.
322
323 """
324 raise NotImplementedError
325
326 def HooksCallBack(self, phase, hook_results, feedback_fn, lu_result):
327 """Notify the LU about the results of its hooks.
328
329 This method is called every time a hooks phase is executed, and notifies
330 the Logical Unit about the hooks' result. The LU can then use it to alter
 331 its result based on the hooks. By default the method does nothing and the
 332 previous result is passed back unchanged, but any LU can override it if it
 333 wants to use the local cluster hook-scripts somehow.
334
335 @param phase: one of L{constants.HOOKS_PHASE_POST} or
336 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
337 @param hook_results: the results of the multi-node hooks rpc call
 338 @param feedback_fn: function used to send feedback back to the caller
339 @param lu_result: the previous Exec result this LU had, or None
340 in the PRE phase
341 @return: the new Exec result, based on the previous result
342 and hook results
343
344 """
 345 # API must be kept, thus we ignore the unused-argument and
 346 # could-be-a-function warnings
347 # pylint: disable=W0613,R0201
348 return lu_result
349
350 def _ExpandAndLockInstance(self):
351 """Helper function to expand and lock an instance.
352
353 Many LUs that work on an instance take its name in self.op.instance_name
354 and need to expand it and then declare the expanded name for locking. This
355 function does it, and then updates self.op.instance_name to the expanded
356 name. It also initializes needed_locks as a dict, if this hasn't been done
357 before.
358
359 """
360 if self.needed_locks is None:
361 self.needed_locks = {}
362 else:
363 assert locking.LEVEL_INSTANCE not in self.needed_locks, \
364 "_ExpandAndLockInstance called with instance-level locks set"
365 self.op.instance_name = _ExpandInstanceName(self.cfg,
366 self.op.instance_name)
367 self.needed_locks[locking.LEVEL_INSTANCE] = self.op.instance_name
368
369 def _LockInstancesNodes(self, primary_only=False,
370 level=locking.LEVEL_NODE):
371 """Helper function to declare instances' nodes for locking.
372
373 This function should be called after locking one or more instances to lock
374 their nodes. Its effect is populating self.needed_locks[locking.LEVEL_NODE]
375 with all primary or secondary nodes for instances already locked and
376 present in self.needed_locks[locking.LEVEL_INSTANCE].
377
378 It should be called from DeclareLocks, and for safety only works if
379 self.recalculate_locks[locking.LEVEL_NODE] is set.
380
381 In the future it may grow parameters to just lock some instance's nodes, or
382 to just lock primaries or secondary nodes, if needed.
383
 384 It should be called in DeclareLocks in a way similar to::
385
386 if level == locking.LEVEL_NODE:
387 self._LockInstancesNodes()
388
389 @type primary_only: boolean
390 @param primary_only: only lock primary nodes of locked instances
391 @param level: Which lock level to use for locking nodes
392
393 """
394 assert level in self.recalculate_locks, \
395 "_LockInstancesNodes helper function called with no nodes to recalculate"
396
 397 # TODO: check if we've really been called with the instance locks held
398
399 # For now we'll replace self.needed_locks[locking.LEVEL_NODE], but in the
400 # future we might want to have different behaviors depending on the value
401 # of self.recalculate_locks[locking.LEVEL_NODE]
402 wanted_nodes = []
403 locked_i = self.owned_locks(locking.LEVEL_INSTANCE)
404 for _, instance in self.cfg.GetMultiInstanceInfo(locked_i):
405 wanted_nodes.append(instance.primary_node)
406 if not primary_only:
407 wanted_nodes.extend(instance.secondary_nodes)
408
409 if self.recalculate_locks[level] == constants.LOCKS_REPLACE:
410 self.needed_locks[level] = wanted_nodes
411 elif self.recalculate_locks[level] == constants.LOCKS_APPEND:
412 self.needed_locks[level].extend(wanted_nodes)
413 else:
414 raise errors.ProgrammerError("Unknown recalculation mode")
415
416 del self.recalculate_locks[level]
417
418
419 class NoHooksLU(LogicalUnit): # pylint: disable=W0223
420 """Simple LU which runs no hooks.
421
422 This LU is intended as a parent for other LogicalUnits which will
423 run no hooks, in order to reduce duplicate code.
424
425 """
426 HPATH = None
427 HTYPE = None
428
429 def BuildHooksEnv(self):
430 """Empty BuildHooksEnv for NoHooksLu.
431
432 This just raises an error.
433
434 """
435 raise AssertionError("BuildHooksEnv called for NoHooksLUs")
436
437 def BuildHooksNodes(self):
438 """Empty BuildHooksNodes for NoHooksLU.
439
440 """
441 raise AssertionError("BuildHooksNodes called for NoHooksLU")
442
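# Illustrative sketch (not part of the original module): a minimal LU built on
# the base classes above; the opcode it would be attached to is hypothetical.
#
#   class LUHypotheticalNoop(NoHooksLU):
#     REQ_BGL = False
#
#     def ExpandNames(self):
#       self.needed_locks = {}          # explicitly: no locks needed
#
#     def Exec(self, feedback_fn):
#       feedback_fn("Nothing to do")
#       return True
#
# CheckPrereq is inherited (the default implementation passes) and no hooks
# run, since HPATH is None in NoHooksLU.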
443
444 class Tasklet:
445 """Tasklet base class.
446
447 Tasklets are subcomponents for LUs. LUs can consist entirely of tasklets or
448 they can mix legacy code with tasklets. Locking needs to be done in the LU,
449 tasklets know nothing about locks.
450
451 Subclasses must follow these rules:
452 - Implement CheckPrereq
453 - Implement Exec
454
455 """
456 def __init__(self, lu):
457 self.lu = lu
458
459 # Shortcuts
460 self.cfg = lu.cfg
461 self.rpc = lu.rpc
462
463 def CheckPrereq(self):
464 """Check prerequisites for this tasklets.
465
466 This method should check whether the prerequisites for the execution of
467 this tasklet are fulfilled. It can do internode communication, but it
468 should be idempotent - no cluster or system changes are allowed.
469
470 The method should raise errors.OpPrereqError in case something is not
471 fulfilled. Its return value is ignored.
472
473 This method should also update all parameters to their canonical form if it
474 hasn't been done before.
475
476 """
477 pass
478
479 def Exec(self, feedback_fn):
480 """Execute the tasklet.
481
482 This method should implement the actual work. It should raise
483 errors.OpExecError for failures that are somewhat dealt with in code, or
484 expected.
485
486 """
487 raise NotImplementedError
488
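# Illustrative sketch (not part of the original module): an LU can delegate
# its work to tasklets by filling self.tasklets in ExpandNames; the base class
# then runs their CheckPrereq/Exec in order instead of the LU's own. The
# tasklet name and feedback text are made up.
#
#   class _HypotheticalDiskCheck(Tasklet):
#     def CheckPrereq(self):
#       pass                            # nothing cluster-related to verify
#
#     def Exec(self, feedback_fn):
#       feedback_fn("checking disks")
#
#   # ... and inside some LU's ExpandNames():
#   #   self.tasklets = [_HypotheticalDiskCheck(self)]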
489
490 class _QueryBase:
491 """Base for query utility classes.
492
493 """
494 #: Attribute holding field definitions
495 FIELDS = None
496
497 #: Field to sort by
498 SORT_FIELD = "name"
499
500 def __init__(self, qfilter, fields, use_locking):
501 """Initializes this class.
502
503 """
504 self.use_locking = use_locking
505
506 self.query = query.Query(self.FIELDS, fields, qfilter=qfilter,
507 namefield=self.SORT_FIELD)
508 self.requested_data = self.query.RequestedData()
509 self.names = self.query.RequestedNames()
510
511 # Sort only if no names were requested
512 self.sort_by_name = not self.names
513
514 self.do_locking = None
515 self.wanted = None
516
517 def _GetNames(self, lu, all_names, lock_level):
518 """Helper function to determine names asked for in the query.
519
520 """
521 if self.do_locking:
522 names = lu.owned_locks(lock_level)
523 else:
524 names = all_names
525
526 if self.wanted == locking.ALL_SET:
527 assert not self.names
528 # caller didn't specify names, so ordering is not important
529 return utils.NiceSort(names)
530
531 # caller specified names and we must keep the same order
532 assert self.names
533 assert not self.do_locking or lu.glm.is_owned(lock_level)
534
535 missing = set(self.wanted).difference(names)
536 if missing:
537 raise errors.OpExecError("Some items were removed before retrieving"
538 " their data: %s" % missing)
539
540 # Return expanded names
541 return self.wanted
542
543 def ExpandNames(self, lu):
544 """Expand names for this query.
545
546 See L{LogicalUnit.ExpandNames}.
547
548 """
549 raise NotImplementedError()
550
551 def DeclareLocks(self, lu, level):
552 """Declare locks for this query.
553
554 See L{LogicalUnit.DeclareLocks}.
555
556 """
557 raise NotImplementedError()
558
559 def _GetQueryData(self, lu):
560 """Collects all data for this query.
561
562 @return: Query data object
563
564 """
565 raise NotImplementedError()
566
567 def NewStyleQuery(self, lu):
568 """Collect data and execute query.
569
570 """
571 return query.GetQueryResponse(self.query, self._GetQueryData(lu),
572 sort_by_name=self.sort_by_name)
573
574 def OldStyleQuery(self, lu):
575 """Collect data and execute query.
576
577 """
578 return self.query.OldStyleQuery(self._GetQueryData(lu),
579 sort_by_name=self.sort_by_name)
580
581
582 def _ShareAll():
583 """Returns a dict declaring all lock levels shared.
584
585 """
586 return dict.fromkeys(locking.LEVELS, 1)
587
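# Illustrative sketch (not part of the original module): a read-only LU can
# declare every lock level shared in one go, typically from ExpandNames:
#
#   self.share_locks = _ShareAll()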
588
589 def _AnnotateDiskParams(instance, devs, cfg):
590 """Little helper wrapper to the rpc annotation method.
591
592 @param instance: The instance object
593 @type devs: List of L{objects.Disk}
594 @param devs: The root devices (not any of its children!)
595 @param cfg: The config object
 596 @return: The annotated disk copies
 597 @see: L{rpc.AnnotateDiskParams}
598
599 """
600 return rpc.AnnotateDiskParams(instance.disk_template, devs,
601 cfg.GetInstanceDiskParams(instance))
602
603
604 def _CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_nodes,
605 cur_group_uuid):
606 """Checks if node groups for locked instances are still correct.
607
608 @type cfg: L{config.ConfigWriter}
609 @param cfg: Cluster configuration
610 @type instances: dict; string as key, L{objects.Instance} as value
611 @param instances: Dictionary, instance name as key, instance object as value
612 @type owned_groups: iterable of string
613 @param owned_groups: List of owned groups
614 @type owned_nodes: iterable of string
615 @param owned_nodes: List of owned nodes
616 @type cur_group_uuid: string or None
617 @param cur_group_uuid: Optional group UUID to check against instance's groups
618
619 """
620 for (name, inst) in instances.items():
621 assert owned_nodes.issuperset(inst.all_nodes), \
622 "Instance %s's nodes changed while we kept the lock" % name
623
624 inst_groups = _CheckInstanceNodeGroups(cfg, name, owned_groups)
625
626 assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
627 "Instance %s has no node in group %s" % (name, cur_group_uuid)
628
629
630 def _CheckInstanceNodeGroups(cfg, instance_name, owned_groups,
631 primary_only=False):
632 """Checks if the owned node groups are still correct for an instance.
633
634 @type cfg: L{config.ConfigWriter}
635 @param cfg: The cluster configuration
636 @type instance_name: string
637 @param instance_name: Instance name
638 @type owned_groups: set or frozenset
639 @param owned_groups: List of currently owned node groups
640 @type primary_only: boolean
641 @param primary_only: Whether to check node groups for only the primary node
642
643 """
644 inst_groups = cfg.GetInstanceNodeGroups(instance_name, primary_only)
645
646 if not owned_groups.issuperset(inst_groups):
647 raise errors.OpPrereqError("Instance %s's node groups changed since"
648 " locks were acquired, current groups are"
649 " are '%s', owning groups '%s'; retry the"
650 " operation" %
651 (instance_name,
652 utils.CommaJoin(inst_groups),
653 utils.CommaJoin(owned_groups)),
654 errors.ECODE_STATE)
655
656 return inst_groups
657
658
659 def _CheckNodeGroupInstances(cfg, group_uuid, owned_instances):
660 """Checks if the instances in a node group are still correct.
661
662 @type cfg: L{config.ConfigWriter}
663 @param cfg: The cluster configuration
664 @type group_uuid: string
665 @param group_uuid: Node group UUID
666 @type owned_instances: set or frozenset
667 @param owned_instances: List of currently owned instances
668
669 """
670 wanted_instances = cfg.GetNodeGroupInstances(group_uuid)
671 if owned_instances != wanted_instances:
672 raise errors.OpPrereqError("Instances in node group '%s' changed since"
673 " locks were acquired, wanted '%s', have '%s';"
674 " retry the operation" %
675 (group_uuid,
676 utils.CommaJoin(wanted_instances),
677 utils.CommaJoin(owned_instances)),
678 errors.ECODE_STATE)
679
680 return wanted_instances
681
682
683 def _SupportsOob(cfg, node):
684 """Tells if node supports OOB.
685
686 @type cfg: L{config.ConfigWriter}
687 @param cfg: The cluster configuration
688 @type node: L{objects.Node}
689 @param node: The node
690 @return: The OOB script if supported or an empty string otherwise
691
692 """
693 return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
694
695
696 def _CopyLockList(names):
697 """Makes a copy of a list of lock names.
698
699 Handles L{locking.ALL_SET} correctly.
700
701 """
702 if names == locking.ALL_SET:
703 return locking.ALL_SET
704 else:
705 return names[:]
706
707
708 def _GetWantedNodes(lu, nodes):
709 """Returns list of checked and expanded node names.
710
711 @type lu: L{LogicalUnit}
712 @param lu: the logical unit on whose behalf we execute
713 @type nodes: list
714 @param nodes: list of node names or None for all nodes
715 @rtype: list
716 @return: the list of nodes, sorted
717 @raise errors.ProgrammerError: if the nodes parameter is wrong type
718
719 """
720 if nodes:
721 return [_ExpandNodeName(lu.cfg, name) for name in nodes]
722
723 return utils.NiceSort(lu.cfg.GetNodeList())
724
725
726 def _GetWantedInstances(lu, instances):
727 """Returns list of checked and expanded instance names.
728
729 @type lu: L{LogicalUnit}
730 @param lu: the logical unit on whose behalf we execute
731 @type instances: list
732 @param instances: list of instance names or None for all instances
733 @rtype: list
734 @return: the list of instances, sorted
735 @raise errors.OpPrereqError: if the instances parameter is wrong type
736 @raise errors.OpPrereqError: if any of the passed instances is not found
737
738 """
739 if instances:
740 wanted = [_ExpandInstanceName(lu.cfg, name) for name in instances]
741 else:
742 wanted = utils.NiceSort(lu.cfg.GetInstanceList())
743 return wanted
744
745
746 def _GetUpdatedParams(old_params, update_dict,
747 use_default=True, use_none=False):
748 """Return the new version of a parameter dictionary.
749
750 @type old_params: dict
751 @param old_params: old parameters
752 @type update_dict: dict
753 @param update_dict: dict containing new parameter values, or
754 constants.VALUE_DEFAULT to reset the parameter to its default
755 value
 756 @type use_default: boolean
 757 @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
 758 values as 'to be deleted' values
 759 @type use_none: boolean
 760 @param use_none: whether to recognise C{None} values as 'to be
 761 deleted' values
762 @rtype: dict
763 @return: the new parameter dictionary
764
765 """
766 params_copy = copy.deepcopy(old_params)
767 for key, val in update_dict.iteritems():
768 if ((use_default and val == constants.VALUE_DEFAULT) or
769 (use_none and val is None)):
770 try:
771 del params_copy[key]
772 except KeyError:
773 pass
774 else:
775 params_copy[key] = val
776 return params_copy
777
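# Illustrative sketch (not part of the original module) of how the helper
# above behaves; the parameter names are made up:
#
#   old = {"kernel_path": "/boot/vmlinuz", "root_path": "/dev/vda1"}
#   upd = {"kernel_path": constants.VALUE_DEFAULT, "root_path": "/dev/vda2"}
#   _GetUpdatedParams(old, upd)
#   --> {"root_path": "/dev/vda2"}   # "kernel_path" falls back to its default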
778
779 def _GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
780 """Return the new version of a instance policy.
781
782 @param group_policy: whether this policy applies to a group and thus
783 we should support removal of policy entries
784
785 """
786 use_none = use_default = group_policy
787 ipolicy = copy.deepcopy(old_ipolicy)
788 for key, value in new_ipolicy.items():
789 if key not in constants.IPOLICY_ALL_KEYS:
790 raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
791 errors.ECODE_INVAL)
792 if key in constants.IPOLICY_ISPECS:
793 utils.ForceDictType(value, constants.ISPECS_PARAMETER_TYPES)
794 ipolicy[key] = _GetUpdatedParams(old_ipolicy.get(key, {}), value,
795 use_none=use_none,
796 use_default=use_default)
797 else:
798 if (not value or value == [constants.VALUE_DEFAULT] or
799 value == constants.VALUE_DEFAULT):
800 if group_policy:
801 del ipolicy[key]
802 else:
803 raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
804 " on the cluster'" % key,
805 errors.ECODE_INVAL)
806 else:
807 if key in constants.IPOLICY_PARAMETERS:
808 # FIXME: we assume all such values are float
809 try:
810 ipolicy[key] = float(value)
811 except (TypeError, ValueError), err:
812 raise errors.OpPrereqError("Invalid value for attribute"
813 " '%s': '%s', error: %s" %
814 (key, value, err), errors.ECODE_INVAL)
815 else:
816 # FIXME: we assume all others are lists; this should be redone
817 # in a nicer way
818 ipolicy[key] = list(value)
819 try:
820 objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
821 except errors.ConfigurationError, err:
822 raise errors.OpPrereqError("Invalid instance policy: %s" % err,
823 errors.ECODE_INVAL)
824 return ipolicy
825
826
827 def _UpdateAndVerifySubDict(base, updates, type_check):
828 """Updates and verifies a dict with sub dicts of the same type.
829
830 @param base: The dict with the old data
831 @param updates: The dict with the new data
832 @param type_check: Dict suitable to ForceDictType to verify correct types
833 @returns: A new dict with updated and verified values
834
835 """
836 def fn(old, value):
837 new = _GetUpdatedParams(old, value)
838 utils.ForceDictType(new, type_check)
839 return new
840
841 ret = copy.deepcopy(base)
842 ret.update(dict((key, fn(base.get(key, {}), value))
843 for key, value in updates.items()))
844 return ret
845
846
847 def _MergeAndVerifyHvState(op_input, obj_input):
848 """Combines the hv state from an opcode with the one of the object
849
850 @param op_input: The input dict from the opcode
851 @param obj_input: The input dict from the objects
852 @return: The verified and updated dict
853
854 """
855 if op_input:
856 invalid_hvs = set(op_input) - constants.HYPER_TYPES
857 if invalid_hvs:
858 raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
859 " %s" % utils.CommaJoin(invalid_hvs),
860 errors.ECODE_INVAL)
861 if obj_input is None:
862 obj_input = {}
863 type_check = constants.HVSTS_PARAMETER_TYPES
864 return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
865
866 return None
867
868
869 def _MergeAndVerifyDiskState(op_input, obj_input):
870 """Combines the disk state from an opcode with the one of the object
871
872 @param op_input: The input dict from the opcode
873 @param obj_input: The input dict from the objects
874 @return: The verified and updated dict
875 """
876 if op_input:
877 invalid_dst = set(op_input) - constants.DS_VALID_TYPES
878 if invalid_dst:
879 raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
880 utils.CommaJoin(invalid_dst),
881 errors.ECODE_INVAL)
882 type_check = constants.DSS_PARAMETER_TYPES
883 if obj_input is None:
884 obj_input = {}
885 return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
886 type_check))
887 for key, value in op_input.items())
888
889 return None
890
891
892 def _ReleaseLocks(lu, level, names=None, keep=None):
893 """Releases locks owned by an LU.
894
895 @type lu: L{LogicalUnit}
896 @param level: Lock level
897 @type names: list or None
898 @param names: Names of locks to release
899 @type keep: list or None
900 @param keep: Names of locks to retain
901
902 """
903 assert not (keep is not None and names is not None), \
904 "Only one of the 'names' and the 'keep' parameters can be given"
905
906 if names is not None:
907 should_release = names.__contains__
908 elif keep:
909 should_release = lambda name: name not in keep
910 else:
911 should_release = None
912
913 owned = lu.owned_locks(level)
914 if not owned:
915 # Not owning any lock at this level, do nothing
916 pass
917
918 elif should_release:
919 retain = []
920 release = []
921
922 # Determine which locks to release
923 for name in owned:
924 if should_release(name):
925 release.append(name)
926 else:
927 retain.append(name)
928
929 assert len(lu.owned_locks(level)) == (len(retain) + len(release))
930
931 # Release just some locks
932 lu.glm.release(level, names=release)
933
934 assert frozenset(lu.owned_locks(level)) == frozenset(retain)
935 else:
936 # Release everything
937 lu.glm.release(level)
938
939 assert not lu.glm.is_owned(level), "No locks should be owned"
940
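# Illustrative sketch (not part of the original module): once an LU has
# narrowed down which nodes it really needs, it could drop the remaining node
# locks like this (keeping only the instance's primary node is just an
# example scenario):
#
#   _ReleaseLocks(self, locking.LEVEL_NODE,
#                 keep=[self.instance.primary_node])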
941
942 def _MapInstanceDisksToNodes(instances):
943 """Creates a map from (node, volume) to instance name.
944
945 @type instances: list of L{objects.Instance}
946 @rtype: dict; tuple of (node name, volume name) as key, instance name as value
947
948 """
949 return dict(((node, vol), inst.name)
950 for inst in instances
951 for (node, vols) in inst.MapLVsByNode().items()
952 for vol in vols)
953
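# Illustrative sketch (not part of the original module): for a single instance
# "inst1" whose volume "xyz" lives on "node1" and "node2", the helper above
# would return something like
#   {("node1", "xyz"): "inst1", ("node2", "xyz"): "inst1"}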
954
955 def _RunPostHook(lu, node_name):
956 """Runs the post-hook for an opcode on a single node.
957
958 """
959 hm = lu.proc.BuildHooksManager(lu)
960 try:
961 hm.RunPhase(constants.HOOKS_PHASE_POST, nodes=[node_name])
962 except Exception, err: # pylint: disable=W0703
963 lu.LogWarning("Errors occurred running hooks on %s: %s" % (node_name, err))
964
965
966 def _CheckOutputFields(static, dynamic, selected):
967 """Checks whether all selected fields are valid.
968
969 @type static: L{utils.FieldSet}
970 @param static: static fields set
971 @type dynamic: L{utils.FieldSet}
972 @param dynamic: dynamic fields set
973
974 """
975 f = utils.FieldSet()
976 f.Extend(static)
977 f.Extend(dynamic)
978
979 delta = f.NonMatching(selected)
980 if delta:
981 raise errors.OpPrereqError("Unknown output fields selected: %s"
982 % ",".join(delta), errors.ECODE_INVAL)
983
984
985 def _CheckGlobalHvParams(params):
986 """Validates that given hypervisor params are not global ones.
987
988 This will ensure that instances don't get customised versions of
989 global params.
990
991 """
992 used_globals = constants.HVC_GLOBALS.intersection(params)
993 if used_globals:
994 msg = ("The following hypervisor parameters are global and cannot"
995 " be customized at instance level, please modify them at"
996 " cluster level: %s" % utils.CommaJoin(used_globals))
997 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
998
999
1000 def _CheckNodeOnline(lu, node, msg=None):
1001 """Ensure that a given node is online.
1002
1003 @param lu: the LU on behalf of which we make the check
1004 @param node: the node to check
1005 @param msg: if passed, should be a message to replace the default one
1006 @raise errors.OpPrereqError: if the node is offline
1007
1008 """
1009 if msg is None:
1010 msg = "Can't use offline node"
1011 if lu.cfg.GetNodeInfo(node).offline:
1012 raise errors.OpPrereqError("%s: %s" % (msg, node), errors.ECODE_STATE)
1013
1014
1015 def _CheckNodeNotDrained(lu, node):
1016 """Ensure that a given node is not drained.
1017
1018 @param lu: the LU on behalf of which we make the check
1019 @param node: the node to check
1020 @raise errors.OpPrereqError: if the node is drained
1021
1022 """
1023 if lu.cfg.GetNodeInfo(node).drained:
1024 raise errors.OpPrereqError("Can't use drained node %s" % node,
1025 errors.ECODE_STATE)
1026
1027
1028 def _CheckNodeVmCapable(lu, node):
1029 """Ensure that a given node is vm capable.
1030
1031 @param lu: the LU on behalf of which we make the check
1032 @param node: the node to check
1033 @raise errors.OpPrereqError: if the node is not vm capable
1034
1035 """
1036 if not lu.cfg.GetNodeInfo(node).vm_capable:
1037 raise errors.OpPrereqError("Can't use non-vm_capable node %s" % node,
1038 errors.ECODE_STATE)
1039
1040
1041 def _CheckNodeHasOS(lu, node, os_name, force_variant):
1042 """Ensure that a node supports a given OS.
1043
1044 @param lu: the LU on behalf of which we make the check
1045 @param node: the node to check
1046 @param os_name: the OS to query about
1047 @param force_variant: whether to ignore variant errors
1048 @raise errors.OpPrereqError: if the node is not supporting the OS
1049
1050 """
1051 result = lu.rpc.call_os_get(node, os_name)
1052 result.Raise("OS '%s' not in supported OS list for node %s" %
1053 (os_name, node),
1054 prereq=True, ecode=errors.ECODE_INVAL)
1055 if not force_variant:
1056 _CheckOSVariant(result.payload, os_name)
1057
1058
1059 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
1060 """Ensure that a node has the given secondary ip.
1061
1062 @type lu: L{LogicalUnit}
1063 @param lu: the LU on behalf of which we make the check
1064 @type node: string
1065 @param node: the node to check
1066 @type secondary_ip: string
1067 @param secondary_ip: the ip to check
1068 @type prereq: boolean
1069 @param prereq: whether to throw a prerequisite or an execute error
1070 @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
1071 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
1072
1073 """
1074 result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
1075 result.Raise("Failure checking secondary ip on node %s" % node,
1076 prereq=prereq, ecode=errors.ECODE_ENVIRON)
1077 if not result.payload:
1078 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
1079 " please fix and re-run this command" % secondary_ip)
1080 if prereq:
1081 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
1082 else:
1083 raise errors.OpExecError(msg)
1084
1085
1086 def _GetClusterDomainSecret():
1087 """Reads the cluster domain secret.
1088
1089 """
1090 return utils.ReadOneLineFile(pathutils.CLUSTER_DOMAIN_SECRET_FILE,
1091 strict=True)
1092
1093
1094 def _CheckInstanceState(lu, instance, req_states, msg=None):
1095 """Ensure that an instance is in one of the required states.
1096
1097 @param lu: the LU on behalf of which we make the check
1098 @param instance: the instance to check
1099 @param msg: if passed, should be a message to replace the default one
1100 @raise errors.OpPrereqError: if the instance is not in the required state
1101
1102 """
1103 if msg is None:
1104 msg = "can't use instance from outside %s states" % ", ".join(req_states)
1105 if instance.admin_state not in req_states:
1106 raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
1107 (instance.name, instance.admin_state, msg),
1108 errors.ECODE_STATE)
1109
1110 if constants.ADMINST_UP not in req_states:
1111 pnode = instance.primary_node
1112 if not lu.cfg.GetNodeInfo(pnode).offline:
1113 ins_l = lu.rpc.call_instance_list([pnode], [instance.hypervisor])[pnode]
1114 ins_l.Raise("Can't contact node %s for instance information" % pnode,
1115 prereq=True, ecode=errors.ECODE_ENVIRON)
1116 if instance.name in ins_l.payload:
1117 raise errors.OpPrereqError("Instance %s is running, %s" %
1118 (instance.name, msg), errors.ECODE_STATE)
1119 else:
1120 lu.LogWarning("Primary node offline, ignoring check that instance"
1121 " is down")
1122
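# Illustrative sketch (not part of the original module): an LU that must only
# act on stopped instances would typically guard itself in CheckPrereq with
# something like
#
#   _CheckInstanceState(self, instance, INSTANCE_DOWN,
#                       msg="cannot reinstall")
#
# using one of the state lists defined near the top of this module.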
1123
1124 def _ComputeMinMaxSpec(name, qualifier, ipolicy, value):
1125 """Computes if value is in the desired range.
1126
1127 @param name: name of the parameter for which we perform the check
1128 @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
1129 not just 'disk')
1130 @param ipolicy: dictionary containing min, max and std values
1131 @param value: actual value that we want to use
1132 @return: None or element not meeting the criteria
1133
1134
1135 """
1136 if value in [None, constants.VALUE_AUTO]:
1137 return None
1138 max_v = ipolicy[constants.ISPECS_MAX].get(name, value)
1139 min_v = ipolicy[constants.ISPECS_MIN].get(name, value)
1140 if value > max_v or min_v > value:
1141 if qualifier:
1142 fqn = "%s/%s" % (name, qualifier)
1143 else:
1144 fqn = name
1145 return ("%s value %s is not in range [%s, %s]" %
1146 (fqn, value, min_v, max_v))
1147 return None
1148
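# Illustrative sketch (not part of the original module): assuming an ipolicy
# whose "disk-size" bounds are min=1024 and max=4096 (MiB), the check above
# behaves roughly like this:
#
#   _ComputeMinMaxSpec(constants.ISPEC_DISK_SIZE, "1", ipolicy, 2048)
#   --> None                        # within bounds, no violation reported
#   _ComputeMinMaxSpec(constants.ISPEC_DISK_SIZE, "1", ipolicy, 8192)
#   --> "disk-size/1 value 8192 is not in range [1024, 4096]"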
1149
1150 def _ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
1151 nic_count, disk_sizes, spindle_use,
1152 _compute_fn=_ComputeMinMaxSpec):
1153 """Verifies ipolicy against provided specs.
1154
1155 @type ipolicy: dict
1156 @param ipolicy: The ipolicy
1157 @type mem_size: int
1158 @param mem_size: The memory size
1159 @type cpu_count: int
1160 @param cpu_count: Used cpu cores
1161 @type disk_count: int
1162 @param disk_count: Number of disks used
1163 @type nic_count: int
1164 @param nic_count: Number of nics used
1165 @type disk_sizes: list of ints
1166 @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
1167 @type spindle_use: int
1168 @param spindle_use: The number of spindles this instance uses
1169 @param _compute_fn: The compute function (unittest only)
 1170 @return: A list of violations, or an empty list if no violations are found
1171
1172 """
1173 assert disk_count == len(disk_sizes)
1174
1175 test_settings = [
1176 (constants.ISPEC_MEM_SIZE, "", mem_size),
1177 (constants.ISPEC_CPU_COUNT, "", cpu_count),
1178 (constants.ISPEC_DISK_COUNT, "", disk_count),
1179 (constants.ISPEC_NIC_COUNT, "", nic_count),
1180 (constants.ISPEC_SPINDLE_USE, "", spindle_use),
1181 ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
1182 for idx, d in enumerate(disk_sizes)]
1183
1184 return filter(None,
1185 (_compute_fn(name, qualifier, ipolicy, value)
1186 for (name, qualifier, value) in test_settings))
1187
1188
1189 def _ComputeIPolicyInstanceViolation(ipolicy, instance,
1190 _compute_fn=_ComputeIPolicySpecViolation):
1191 """Compute if instance meets the specs of ipolicy.
1192
1193 @type ipolicy: dict
1194 @param ipolicy: The ipolicy to verify against
1195 @type instance: L{objects.Instance}
1196 @param instance: The instance to verify
1197 @param _compute_fn: The function to verify ipolicy (unittest only)
1198 @see: L{_ComputeIPolicySpecViolation}
1199
1200 """
1201 mem_size = instance.beparams.get(constants.BE_MAXMEM, None)
1202 cpu_count = instance.beparams.get(constants.BE_VCPUS, None)
1203 spindle_use = instance.beparams.get(constants.BE_SPINDLE_USE, None)
1204 disk_count = len(instance.disks)
1205 disk_sizes = [disk.size for disk in instance.disks]
1206 nic_count = len(instance.nics)
1207
1208 return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
1209 disk_sizes, spindle_use)
1210
1211
1212 def _ComputeIPolicyInstanceSpecViolation(
1213 ipolicy, instance_spec, _compute_fn=_ComputeIPolicySpecViolation):
1214 """Compute if instance specs meets the specs of ipolicy.
1215
1216 @type ipolicy: dict
1217 @param ipolicy: The ipolicy to verify against
 1218 @type instance_spec: dict
1219 @param instance_spec: The instance spec to verify
1220 @param _compute_fn: The function to verify ipolicy (unittest only)
1221 @see: L{_ComputeIPolicySpecViolation}
1222
1223 """
1224 mem_size = instance_spec.get(constants.ISPEC_MEM_SIZE, None)
1225 cpu_count = instance_spec.get(constants.ISPEC_CPU_COUNT, None)
1226 disk_count = instance_spec.get(constants.ISPEC_DISK_COUNT, 0)
1227 disk_sizes = instance_spec.get(constants.ISPEC_DISK_SIZE, [])
1228 nic_count = instance_spec.get(constants.ISPEC_NIC_COUNT, 0)
1229 spindle_use = instance_spec.get(constants.ISPEC_SPINDLE_USE, None)
1230
1231 return _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
1232 disk_sizes, spindle_use)
1233
1234
1235 def _ComputeIPolicyNodeViolation(ipolicy, instance, current_group,
1236 target_group,
1237 _compute_fn=_ComputeIPolicyInstanceViolation):
1238 """Compute if instance meets the specs of the new target group.
1239
1240 @param ipolicy: The ipolicy to verify
1241 @param instance: The instance object to verify
1242 @param current_group: The current group of the instance
1243 @param target_group: The new group of the instance
1244 @param _compute_fn: The function to verify ipolicy (unittest only)
1245 @see: L{_ComputeIPolicySpecViolation}
1246
1247 """
1248 if current_group == target_group:
1249 return []
1250 else:
1251 return _compute_fn(ipolicy, instance)
1252
1253
1254 def _CheckTargetNodeIPolicy(lu, ipolicy, instance, node, ignore=False,
1255 _compute_fn=_ComputeIPolicyNodeViolation):
1256 """Checks that the target node is correct in terms of instance policy.
1257
1258 @param ipolicy: The ipolicy to verify
1259 @param instance: The instance object to verify
1260 @param node: The new node to relocate
1261 @param ignore: Ignore violations of the ipolicy
1262 @param _compute_fn: The function to verify ipolicy (unittest only)
1263 @see: L{_ComputeIPolicySpecViolation}
1264
1265 """
1266 primary_node = lu.cfg.GetNodeInfo(instance.primary_node)
1267 res = _compute_fn(ipolicy, instance, primary_node.group, node.group)
1268
1269 if res:
1270 msg = ("Instance does not meet target node group's (%s) instance"
1271 " policy: %s") % (node.group, utils.CommaJoin(res))
1272 if ignore:
1273 lu.LogWarning(msg)
1274 else:
1275 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1276
1277
1278 def _ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances):
1279 """Computes a set of any instances that would violate the new ipolicy.
1280
1281 @param old_ipolicy: The current (still in-place) ipolicy
1282 @param new_ipolicy: The new (to become) ipolicy
1283 @param instances: List of instances to verify
 1284 @return: A set of instances which violate the new ipolicy but
 1285 did not before
1286
1287 """
1288 return (_ComputeViolatingInstances(new_ipolicy, instances) -
1289 _ComputeViolatingInstances(old_ipolicy, instances))
1290
1291
1292 def _ExpandItemName(fn, name, kind):
1293 """Expand an item name.
1294
1295 @param fn: the function to use for expansion
1296 @param name: requested item name
1297 @param kind: text description ('Node' or 'Instance')
1298 @return: the resolved (full) name
1299 @raise errors.OpPrereqError: if the item is not found
1300
1301 """
1302 full_name = fn(name)
1303 if full_name is None:
1304 raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
1305 errors.ECODE_NOENT)
1306 return full_name
1307
1308
1309 def _ExpandNodeName(cfg, name):
1310 """Wrapper over L{_ExpandItemName} for nodes."""
1311 return _ExpandItemName(cfg.ExpandNodeName, name, "Node")
1312
1313
1314 def _ExpandInstanceName(cfg, name):
1315 """Wrapper over L{_ExpandItemName} for instance."""
1316 return _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
1317
1318 def _BuildNetworkHookEnv(name, network, gateway, network6, gateway6,
1319 network_type, mac_prefix):
1320 env = dict()
1321 if name:
1322 env["NETWORK_NAME"] = name
1323 if network:
1324 env["NETWORK_SUBNET"] = network
1325 if gateway:
1326 env["NETWORK_GATEWAY"] = gateway
1327 if network6:
1328 env["NETWORK_SUBNET6"] = network6
1329 if gateway6:
1330 env["NETWORK_GATEWAY6"] = gateway6
1331 if mac_prefix:
1332 env["NETWORK_MAC_PREFIX"] = mac_prefix
1333 if network_type:
1334 env["NETWORK_TYPE"] = network_type
1335
1336 return env
1337
1338
1339 def _BuildNetworkHookEnvByObject(lu, network):
1340 args = {
1341 "name": network.name,
1342 "network": network.network,
1343 "gateway": network.gateway,
1344 "network6": network.network6,
1345 "gateway6": network.gateway6,
1346 "network_type": network.network_type,
1347 "mac_prefix": network.mac_prefix,
1348 }
1349 return _BuildNetworkHookEnv(**args)
1350
1351
1352 def _BuildInstanceHookEnv(name, primary_node, secondary_nodes, os_type, status,
1353 minmem, maxmem, vcpus, nics, disk_template, disks,
1354 bep, hvp, hypervisor_name, tags):
1355 """Builds instance related env variables for hooks
1356
1357 This builds the hook environment from individual variables.
1358
1359 @type name: string
1360 @param name: the name of the instance
1361 @type primary_node: string
1362 @param primary_node: the name of the instance's primary node
1363 @type secondary_nodes: list
1364 @param secondary_nodes: list of secondary nodes as strings
1365 @type os_type: string
1366 @param os_type: the name of the instance's OS
1367 @type status: string
1368 @param status: the desired status of the instance
1369 @type minmem: string
1370 @param minmem: the minimum memory size of the instance
1371 @type maxmem: string
1372 @param maxmem: the maximum memory size of the instance
1373 @type vcpus: string
1374 @param vcpus: the count of VCPUs the instance has
1375 @type nics: list
1376 @param nics: list of tuples (ip, mac, mode, link, network) representing
1377 the NICs the instance has
1378 @type disk_template: string
1379 @param disk_template: the disk template of the instance
1380 @type disks: list
1381 @param disks: the list of (size, mode) pairs
1382 @type bep: dict
1383 @param bep: the backend parameters for the instance
1384 @type hvp: dict
1385 @param hvp: the hypervisor parameters for the instance
1386 @type hypervisor_name: string
1387 @param hypervisor_name: the hypervisor for the instance
1388 @type tags: list
1389 @param tags: list of instance tags as strings
1390 @rtype: dict
1391 @return: the hook environment for this instance
1392
1393 """
1394 env = {
1395 "OP_TARGET": name,
1396 "INSTANCE_NAME": name,
1397 "INSTANCE_PRIMARY": primary_node,
1398 "INSTANCE_SECONDARIES": " ".join(secondary_nodes),
1399 "INSTANCE_OS_TYPE": os_type,
1400 "INSTANCE_STATUS": status,
1401 "INSTANCE_MINMEM": minmem,
1402 "INSTANCE_MAXMEM": maxmem,
1403 # TODO(2.7) remove deprecated "memory" value
1404 "INSTANCE_MEMORY": maxmem,
1405 "INSTANCE_VCPUS": vcpus,
1406 "INSTANCE_DISK_TEMPLATE": disk_template,
1407 "INSTANCE_HYPERVISOR": hypervisor_name,
1408 }
1409 if nics:
1410 nic_count = len(nics)
1411 for idx, (ip, mac, mode, link, network, netinfo) in enumerate(nics):
1412 if ip is None:
1413 ip = ""
1414 env["INSTANCE_NIC%d_IP" % idx] = ip
1415 env["INSTANCE_NIC%d_MAC" % idx] = mac
1416 env["INSTANCE_NIC%d_MODE" % idx] = mode
1417 env["INSTANCE_NIC%d_LINK" % idx] = link
1418 if network:
1419 env["INSTANCE_NIC%d_NETWORK" % idx] = network
1420 if netinfo:
1421 nobj = objects.Network.FromDict(netinfo)
1422 if nobj.network:
1423 env["INSTANCE_NIC%d_NETWORK_SUBNET" % idx] = nobj.network
1424 if nobj.gateway:
1425 env["INSTANCE_NIC%d_NETWORK_GATEWAY" % idx] = nobj.gateway
1426 if nobj.network6:
1427 env["INSTANCE_NIC%d_NETWORK_SUBNET6" % idx] = nobj.network6
1428 if nobj.gateway6:
1429 env["INSTANCE_NIC%d_NETWORK_GATEWAY6" % idx] = nobj.gateway6
1430 if nobj.mac_prefix:
1431 env["INSTANCE_NIC%d_NETWORK_MAC_PREFIX" % idx] = nobj.mac_prefix
1432 if nobj.network_type:
1433 env["INSTANCE_NIC%d_NETWORK_TYPE" % idx] = nobj.network_type
1434 if mode == constants.NIC_MODE_BRIDGED:
1435 env["INSTANCE_NIC%d_BRIDGE" % idx] = link
1436 else:
1437 nic_count = 0
1438
1439 env["INSTANCE_NIC_COUNT"] = nic_count
1440
1441 if disks:
1442 disk_count = len(disks)
1443 for idx, (size, mode) in enumerate(disks):
1444 env["INSTANCE_DISK%d_SIZE" % idx] = size
1445 env["INSTANCE_DISK%d_MODE" % idx] = mode
1446 else:
1447 disk_count = 0
1448
1449 env["INSTANCE_DISK_COUNT"] = disk_count
1450
1451 if not tags:
1452 tags = []
1453
1454 env["INSTANCE_TAGS"] = " ".join(tags)
1455
1456 for source, kind in [(bep, "BE"), (hvp, "HV")]:
1457 for key, value in source.items():
1458 env["INSTANCE_%s_%s" % (kind, key)] = value
1459
1460 return env
1461
1462 def _NICToTuple(lu, nic):
1463 """Build a tupple of nic information.
1464
1465 @type lu: L{LogicalUnit}
1466 @param lu: the logical unit on whose behalf we execute
1467 @type nic: L{objects.NIC}
1468 @param nic: nic to convert to hooks tuple
1469
1470 """
1471 cluster = lu.cfg.GetClusterInfo()
1472 ip = nic.ip
1473 mac = nic.mac
1474 filled_params = cluster.SimpleFillNIC(nic.nicparams)
1475 mode = filled_params[constants.NIC_MODE]
1476 link = filled_params[constants.NIC_LINK]
1477 network = nic.network
1478 netinfo = None
1479 if network:
1480 net_uuid = lu.cfg.LookupNetwork(network)
1481 if net_uuid:
1482 nobj = lu.cfg.GetNetwork(net_uuid)
1483 netinfo = objects.Network.ToDict(nobj)
1484 return (ip, mac, mode, link, network, netinfo)
1485
1486 def _NICListToTuple(lu, nics):
1487 """Build a list of nic information tuples.
1488
1489 This list is suitable to be passed to _BuildInstanceHookEnv or as a return
1490 value in LUInstanceQueryData.
1491
1492 @type lu: L{LogicalUnit}
1493 @param lu: the logical unit on whose behalf we execute
1494 @type nics: list of L{objects.NIC}
1495 @param nics: list of nics to convert to hooks tuples
1496
1497 """
1498 hooks_nics = []
1499 cluster = lu.cfg.GetClusterInfo()
1500 for nic in nics:
1501 hooks_nics.append(_NICToTuple(lu, nic))
1502 return hooks_nics
1503
1504 def _BuildInstanceHookEnvByObject(lu, instance, override=None):
1505 """Builds instance related env variables for hooks from an object.
1506
1507 @type lu: L{LogicalUnit}
1508 @param lu: the logical unit on whose behalf we execute
1509 @type instance: L{objects.Instance}
1510 @param instance: the instance for which we should build the
1511 environment
1512 @type override: dict
1513 @param override: dictionary with key/values that will override
1514 our values
1515 @rtype: dict
1516 @return: the hook environment dictionary
1517
1518 """
1519 cluster = lu.cfg.GetClusterInfo()
1520 bep = cluster.FillBE(instance)
1521 hvp = cluster.FillHV(instance)
1522 args = {
1523 "name": instance.name,
1524 "primary_node": instance.primary_node,
1525 "secondary_nodes": instance.secondary_nodes,
1526 "os_type": instance.os,
1527 "status": instance.admin_state,
1528 "maxmem": bep[constants.BE_MAXMEM],
1529 "minmem": bep[constants.BE_MINMEM],
1530 "vcpus": bep[constants.BE_VCPUS],
1531 "nics": _NICListToTuple(lu, instance.nics),
1532 "disk_template": instance.disk_template,
1533 "disks": [(disk.size, disk.mode) for disk in instance.disks],
1534 "bep": bep,
1535 "hvp": hvp,
1536 "hypervisor_name": instance.hypervisor,
1537 "tags": instance.tags,
1538 }
1539 if override:
1540 args.update(override)
1541 return _BuildInstanceHookEnv(**args) # pylint: disable=W0142
1542
1543
1544 def _AdjustCandidatePool(lu, exceptions):
1545 """Adjust the candidate pool after node operations.
1546
1547 """
1548 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
1549 if mod_list:
1550 lu.LogInfo("Promoted nodes to master candidate role: %s",
1551 utils.CommaJoin(node.name for node in mod_list))
1552 for name in mod_list:
1553 lu.context.ReaddNode(name)
1554 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
1555 if mc_now > mc_max:
1556 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
1557 (mc_now, mc_max))
1558
1559
1560 def _DecideSelfPromotion(lu, exceptions=None):
1561 """Decide whether I should promote myself as a master candidate.
1562
1563 """
1564 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
1565 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
 1566 # the new node will increase mc_max by one, so:
1567 mc_should = min(mc_should + 1, cp_size)
1568 return mc_now < mc_should
1569
1570
1571 def _ComputeViolatingInstances(ipolicy, instances):
1572 """Computes a set of instances who violates given ipolicy.
1573
1574 @param ipolicy: The ipolicy to verify
 1575 @type instances: list of L{objects.Instance}
1576 @param instances: List of instances to verify
1577 @return: A frozenset of instance names violating the ipolicy
1578
1579 """
1580 return frozenset([inst.name for inst in instances
1581 if _ComputeIPolicyInstanceViolation(ipolicy, inst)])
1582
1583
1584 def _CheckNicsBridgesExist(lu, target_nics, target_node):
1585 """Check that the brigdes needed by a list of nics exist.
1586
1587 """
1588 cluster = lu.cfg.GetClusterInfo()
1589 paramslist = [cluster.SimpleFillNIC(nic.nicparams) for nic in target_nics]
1590 brlist = [params[constants.NIC_LINK] for params in paramslist
1591 if params[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED]
1592 if brlist:
1593 result = lu.rpc.call_bridges_exist(target_node, brlist)
1594 result.Raise("Error checking bridges on destination node '%s'" %
1595 target_node, prereq=True, ecode=errors.ECODE_ENVIRON)
1596
1597
1598 def _CheckInstanceBridgesExist(lu, instance, node=None):
1599 """Check that the brigdes needed by an instance exist.
1600
1601 """
1602 if node is None:
1603 node = instance.primary_node
1604 _CheckNicsBridgesExist(lu, instance.nics, node)
1605
1606
1607 def _CheckOSVariant(os_obj, name):
1608 """Check whether an OS name conforms to the os variants specification.
1609
1610 @type os_obj: L{objects.OS}
1611 @param os_obj: OS object to check
1612 @type name: string
1613 @param name: OS name passed by the user, to check for validity
1614
1615 """
1616 variant = objects.OS.GetVariant(name)
1617 if not os_obj.supported_variants:
1618 if variant:
1619 raise errors.OpPrereqError("OS '%s' doesn't support variants ('%s'"
1620 " passed)" % (os_obj.name, variant),
1621 errors.ECODE_INVAL)
1622 return
1623 if not variant:
1624 raise errors.OpPrereqError("OS name must include a variant",
1625 errors.ECODE_INVAL)
1626
1627 if variant not in os_obj.supported_variants:
1628 raise errors.OpPrereqError("Unsupported OS variant", errors.ECODE_INVAL)
1629
1630
1631 def _GetNodeInstancesInner(cfg, fn):
1632 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
1633
1634
1635 def _GetNodeInstances(cfg, node_name):
1636 """Returns a list of all primary and secondary instances on a node.
1637
1638 """
1639
1640 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
1641
1642
1643 def _GetNodePrimaryInstances(cfg, node_name):
1644 """Returns primary instances on a node.
1645
1646 """
1647 return _GetNodeInstancesInner(cfg,
1648 lambda inst: node_name == inst.primary_node)
1649
1650
1651 def _GetNodeSecondaryInstances(cfg, node_name):
1652 """Returns secondary instances on a node.
1653
1654 """
1655 return _GetNodeInstancesInner(cfg,
1656 lambda inst: node_name in inst.secondary_nodes)
1657
1658
1659 def _GetStorageTypeArgs(cfg, storage_type):
1660 """Returns the arguments for a storage type.
1661
1662 """
1663 # Special case for file storage
1664 if storage_type == constants.ST_FILE:
1665 # storage.FileStorage wants a list of storage directories
1666 return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1667
1668 return []
1669
1670
1671 def _FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_name, prereq):
1672 faulty = []
1673
1674 for dev in instance.disks:
1675 cfg.SetDiskID(dev, node_name)
1676
1677 result = rpc_runner.call_blockdev_getmirrorstatus(node_name, (instance.disks,
1678 instance))
1679 result.Raise("Failed to get disk status from node %s" % node_name,
1680 prereq=prereq, ecode=errors.ECODE_ENVIRON)
1681
1682 for idx, bdev_status in enumerate(result.payload):
1683 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1684 faulty.append(idx)
1685
1686 return faulty
1687
1688
1689 def _CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1690 """Check the sanity of iallocator and node arguments and use the
1691 cluster-wide iallocator if appropriate.
1692
1693 Check that at most one of (iallocator, node) is specified. If none is
1694 specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
1695 then the LU's opcode's iallocator slot is filled with the cluster-wide
1696 default iallocator.
1697
1698 @type iallocator_slot: string
1699 @param iallocator_slot: the name of the opcode iallocator slot
1700 @type node_slot: string
1701 @param node_slot: the name of the opcode target node slot
1702
1703 """
1704 node = getattr(lu.op, node_slot, None)
1705 ialloc = getattr(lu.op, iallocator_slot, None)
1706 if node == []:
1707 node = None
1708
1709 if node is not None and ialloc is not None:
1710 raise errors.OpPrereqError("Do not specify both, iallocator and node",
1711 errors.ECODE_INVAL)
1712 elif ((node is None and ialloc is None) or
1713 ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
1714 default_iallocator = lu.cfg.GetDefaultIAllocator()
1715 if default_iallocator:
1716 setattr(lu.op, iallocator_slot, default_iallocator)
1717 else:
1718 raise errors.OpPrereqError("No iallocator or node given and no"
1719 " cluster-wide default iallocator found;"
1720 " please specify either an iallocator or a"
1721 " node, or set a cluster-wide default"
1722 " iallocator", errors.ECODE_INVAL)
1723
1724
1725 def _GetDefaultIAllocator(cfg, ialloc):
1726 """Decides on which iallocator to use.
1727
1728 @type cfg: L{config.ConfigWriter}
1729 @param cfg: Cluster configuration object
1730 @type ialloc: string or None
1731 @param ialloc: Iallocator specified in opcode
1732 @rtype: string
1733 @return: Iallocator name
1734
1735 """
1736 if not ialloc:
1737 # Use default iallocator
1738 ialloc = cfg.GetDefaultIAllocator()
1739
1740 if not ialloc:
1741 raise errors.OpPrereqError("No iallocator was specified, neither in the"
1742 " opcode nor as a cluster-wide default",
1743 errors.ECODE_INVAL)
1744
1745 return ialloc
1746
1747
1748 def _CheckHostnameSane(lu, name):
1749 """Ensures that a given hostname resolves to a 'sane' name.
1750
1751 The given name is required to be a prefix of the resolved hostname,
1752 to prevent accidental mismatches.
1753
1754 @param lu: the logical unit on behalf of which we're checking
1755 @param name: the name we should resolve and check
1756 @return: the resolved hostname object
1757
1758 """
1759 hostname = netutils.GetHostname(name=name)
1760 if hostname.name != name:
1761 lu.LogInfo("Resolved given name '%s' to '%s'", name, hostname.name)
1762 if not utils.MatchNameComponent(name, [hostname.name]):
1763 raise errors.OpPrereqError(("Resolved hostname '%s' does not look the"
1764 " same as given hostname '%s'") %
1765 (hostname.name, name), errors.ECODE_INVAL)
1766 return hostname
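# Sketch of the prefix rule enforced above (names are hypothetical): a
# request for "inst1" that resolves to "inst1.example.com" is accepted,
# while a request for "inst1" resolving to "instance1.example.com" is
# rejected, since the given name is not a component prefix of the result.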
1767
1768
1769 class LUClusterPostInit(LogicalUnit):
1770 """Logical unit for running hooks after cluster initialization.
1771
1772 """
1773 HPATH = "cluster-init"
1774 HTYPE = constants.HTYPE_CLUSTER
1775
1776 def BuildHooksEnv(self):
1777 """Build hooks env.
1778
1779 """
1780 return {
1781 "OP_TARGET": self.cfg.GetClusterName(),
1782 }
1783
1784 def BuildHooksNodes(self):
1785 """Build hooks nodes.
1786
1787 """
1788 return ([], [self.cfg.GetMasterNode()])
1789
1790 def Exec(self, feedback_fn):
1791 """Nothing to do.
1792
1793 """
1794 return True
1795
1796
1797 class LUClusterDestroy(LogicalUnit):
1798 """Logical unit for destroying the cluster.
1799
1800 """
1801 HPATH = "cluster-destroy"
1802 HTYPE = constants.HTYPE_CLUSTER
1803
1804 def BuildHooksEnv(self):
1805 """Build hooks env.
1806
1807 """
1808 return {
1809 "OP_TARGET": self.cfg.GetClusterName(),
1810 }
1811
1812 def BuildHooksNodes(self):
1813 """Build hooks nodes.
1814
1815 """
1816 return ([], [])
1817
1818 def CheckPrereq(self):
1819 """Check prerequisites.
1820
1821 This checks whether the cluster is empty.
1822
1823 Any errors are signaled by raising errors.OpPrereqError.
1824
1825 """
1826 master = self.cfg.GetMasterNode()
1827
1828 nodelist = self.cfg.GetNodeList()
1829 if len(nodelist) != 1 or nodelist[0] != master:
1830 raise errors.OpPrereqError("There are still %d node(s) in"
1831 " this cluster." % (len(nodelist) - 1),
1832 errors.ECODE_INVAL)
1833 instancelist = self.cfg.GetInstanceList()
1834 if instancelist:
1835 raise errors.OpPrereqError("There are still %d instance(s) in"
1836 " this cluster." % len(instancelist),
1837 errors.ECODE_INVAL)
1838
1839 def Exec(self, feedback_fn):
1840 """Destroys the cluster.
1841
1842 """
1843 master_params = self.cfg.GetMasterNetworkParameters()
1844
1845 # Run post hooks on master node before it's removed
1846 _RunPostHook(self, master_params.name)
1847
1848 ems = self.cfg.GetUseExternalMipScript()
1849 result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1850 master_params, ems)
1851 if result.fail_msg:
1852 self.LogWarning("Error disabling the master IP address: %s",
1853 result.fail_msg)
1854
1855 return master_params.name
1856
1857
1858 def _VerifyCertificate(filename):
1859 """Verifies a certificate for L{LUClusterVerifyConfig}.
1860
1861 @type filename: string
1862 @param filename: Path to PEM file
1863
1864 """
1865 try:
1866 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1867 utils.ReadFile(filename))
1868 except Exception, err: # pylint: disable=W0703
1869 return (LUClusterVerifyConfig.ETYPE_ERROR,
1870 "Failed to load X509 certificate %s: %s" % (filename, err))
1871
1872 (errcode, msg) = \
1873 utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1874 constants.SSL_CERT_EXPIRATION_ERROR)
1875
1876 if msg:
1877 fnamemsg = "While verifying %s: %s" % (filename, msg)
1878 else:
1879 fnamemsg = None
1880
1881 if errcode is None:
1882 return (None, fnamemsg)
1883 elif errcode == utils.CERT_WARNING:
1884 return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1885 elif errcode == utils.CERT_ERROR:
1886 return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1887
1888 raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
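# Possible return values of _VerifyCertificate (sketch): (None, None) when
# the certificate verifies cleanly, (LUClusterVerifyConfig.ETYPE_WARNING, msg)
# when it is close to expiration, and (LUClusterVerifyConfig.ETYPE_ERROR, msg)
# when it is expired or cannot be loaded at all.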
1889
1890
1891 def _GetAllHypervisorParameters(cluster, instances):
1892 """Compute the set of all hypervisor parameters.
1893
1894 @type cluster: L{objects.Cluster}
1895 @param cluster: the cluster object
1896 @type instances: list of L{objects.Instance}
1897 @param instances: additional instances from which to obtain parameters
1898 @rtype: list of (origin, hypervisor, parameters)
1899 @return: a list with all parameters found, indicating the hypervisor they
1900 apply to, and the origin (can be "cluster", "os X", or "instance Y")
1901
1902 """
1903 hvp_data = []
1904
1905 for hv_name in cluster.enabled_hypervisors:
1906 hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1907
1908 for os_name, os_hvp in cluster.os_hvp.items():
1909 for hv_name, hv_params in os_hvp.items():
1910 if hv_params:
1911 full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1912 hvp_data.append(("os %s" % os_name, hv_name, full_params))
1913
1914 # TODO: collapse identical parameter values in a single one
1915 for instance in instances:
1916 if instance.hvparams:
1917 hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1918 cluster.FillHV(instance)))
1919
1920 return hvp_data
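# Shape of the list returned above (sketch; hypervisor, OS and instance
# names are hypothetical):
#
#   [("cluster",         "xen-pvm", {... cluster-level defaults ...}),
#    ("os debian-image", "xen-pvm", {... defaults + OS overrides ...}),
#    ("instance inst1",  "xen-pvm", {... fully filled instance params ...})]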
1921
1922
1923 class _VerifyErrors(object):
1924 """Mix-in for cluster/group verify LUs.
1925
1926 It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1927 self.op and self._feedback_fn to be available.)
1928
1929 """
1930
1931 ETYPE_FIELD = "code"
1932 ETYPE_ERROR = "ERROR"
1933 ETYPE_WARNING = "WARNING"
1934
1935 def _Error(self, ecode, item, msg, *args, **kwargs):
1936 """Format an error message.
1937
1938 Based on the opcode's error_codes parameter, either format a
1939 parseable error code, or a simpler error string.
1940
1941 This must be called only from Exec and functions called from Exec.
1942
1943 """
1944 ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1945 itype, etxt, _ = ecode
1946 # first complete the msg
1947 if args:
1948 msg = msg % args
1949 # then format the whole message
1950 if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1951 msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1952 else:
1953 if item:
1954 item = " " + item
1955 else:
1956 item = ""
1957 msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1958 # and finally report it via the feedback_fn
1959 self._feedback_fn(" - %s" % msg) # Mix-in. pylint: disable=E1101
1960
1961 def _ErrorIf(self, cond, ecode, *args, **kwargs):
1962 """Log an error message if the passed condition is True.
1963
1964 """
1965 cond = (bool(cond)
1966 or self.op.debug_simulate_errors) # pylint: disable=E1101
1967
1968 # If the error code is in the list of ignored errors, demote the error to a
1969 # warning
1970 (_, etxt, _) = ecode
1971 if etxt in self.op.ignore_errors: # pylint: disable=E1101
1972 kwargs[self.ETYPE_FIELD] = self.ETYPE_WARNING
1973
1974 if cond:
1975 self._Error(ecode, *args, **kwargs)
1976
1977 # do not mark the operation as failed for WARN cases only
1978 if kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR) == self.ETYPE_ERROR:
1979 self.bad = self.bad or cond
1980
1981
1982 class LUClusterVerify(NoHooksLU):
1983 """Submits all jobs necessary to verify the cluster.
1984
1985 """
1986 REQ_BGL = False
1987
1988 def ExpandNames(self):
1989 self.needed_locks = {}
1990
1991 def Exec(self, feedback_fn):
1992 jobs = []
1993
1994 if self.op.group_name:
1995 groups = [self.op.group_name]
1996 depends_fn = lambda: None
1997 else:
1998 groups = self.cfg.GetNodeGroupList()
1999
2000 # Verify global configuration
2001 jobs.append([
2002 opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors)
2003 ])
2004
2005 # Always depend on global verification
2006 depends_fn = lambda: [(-len(jobs), [])]
2007
2008 jobs.extend(
2009 [opcodes.OpClusterVerifyGroup(group_name=group,
2010 ignore_errors=self.op.ignore_errors,
2011 depends=depends_fn())]
2012 for group in groups)
2013
2014 # Fix up all parameters
2015 for op in itertools.chain(*jobs): # pylint: disable=W0142
2016 op.debug_simulate_errors = self.op.debug_simulate_errors
2017 op.verbose = self.op.verbose
2018 op.error_codes = self.op.error_codes
2019 try:
2020 op.skip_checks = self.op.skip_checks
2021 except AttributeError:
2022 assert not isinstance(op, opcodes.OpClusterVerifyGroup)
2023
2024 return ResultWithJobs(jobs)
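# Sketch of the resulting submission for an unrestricted verify on a cluster
# with two node groups (group names hypothetical): the first job verifies the
# global configuration, and every following OpClusterVerifyGroup job carries
# a relative "depends" entry produced by depends_fn() that points back at the
# configuration job.
#
#   jobs == [[OpClusterVerifyConfig(...)],
#            [OpClusterVerifyGroup(group_name="group1", depends=...)],
#            [OpClusterVerifyGroup(group_name="group2", depends=...)]]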
2025
2026
2027 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
2028 """Verifies the cluster config.
2029
2030 """
2031 REQ_BGL = False
2032
2033 def _VerifyHVP(self, hvp_data):
2034 """Verifies locally the syntax of the hypervisor parameters.
2035
2036 """
2037 for item, hv_name, hv_params in hvp_data:
2038 msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
2039 (hv_name, item))
2040 try:
2041 hv_class = hypervisor.GetHypervisor(hv_name)
2042 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
2043 hv_class.CheckParameterSyntax(hv_params)
2044 except errors.GenericError, err:
2045 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
2046
2047 def ExpandNames(self):
2048 self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
2049 self.share_locks = _ShareAll()
2050
2051 def CheckPrereq(self):
2052 """Check prerequisites.
2053
2054 """
2055 # Retrieve all information
2056 self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
2057 self.all_node_info = self.cfg.GetAllNodesInfo()
2058 self.all_inst_info = self.cfg.GetAllInstancesInfo()
2059
2060 def Exec(self, feedback_fn):
2061 """Verify integrity of cluster, performing various test on nodes.
2062
2063 """
2064 self.bad = False
2065 self._feedback_fn = feedback_fn
2066
2067 feedback_fn("* Verifying cluster config")
2068
2069 for msg in self.cfg.VerifyConfig():
2070 self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
2071
2072 feedback_fn("* Verifying cluster certificate files")
2073
2074 for cert_filename in pathutils.ALL_CERT_FILES:
2075 (errcode, msg) = _VerifyCertificate(cert_filename)
2076 self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
2077
2078 feedback_fn("* Verifying hypervisor parameters")
2079
2080 self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
2081 self.all_inst_info.values()))
2082
2083 feedback_fn("* Verifying all nodes belong to an existing group")
2084
2085 # We do this verification here because, should this bogus circumstance
2086 # occur, it would never be caught by VerifyGroup, which only acts on
2087 # nodes/instances reachable from existing node groups.
2088
2089 dangling_nodes = set(node.name for node in self.all_node_info.values()
2090 if node.group not in self.all_group_info)
2091
2092 dangling_instances = {}
2093 no_node_instances = []
2094
2095 for inst in self.all_inst_info.values():
2096 if inst.primary_node in dangling_nodes:
2097 dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
2098 elif inst.primary_node not in self.all_node_info:
2099 no_node_instances.append(inst.name)
2100
2101 pretty_dangling = [
2102 "%s (%s)" %
2103 (node.name,
2104 utils.CommaJoin(dangling_instances.get(node.name,
2105 ["no instances"])))
2106 for node in dangling_nodes]
2107
2108 self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
2109 None,
2110 "the following nodes (and their instances) belong to a non"
2111 " existing group: %s", utils.CommaJoin(pretty_dangling))
2112
2113 self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
2114 None,
2115 "the following instances have a non-existing primary-node:"
2116 " %s", utils.CommaJoin(no_node_instances))
2117
2118 return not self.bad
2119
2120
2121 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
2122 """Verifies the status of a node group.
2123
2124 """
2125 HPATH = "cluster-verify"
2126 HTYPE = constants.HTYPE_CLUSTER
2127 REQ_BGL = False
2128
2129 _HOOKS_INDENT_RE = re.compile("^", re.M)
2130
2131 class NodeImage(object):
2132 """A class representing the logical and physical status of a node.
2133
2134 @type name: string
2135 @ivar name: the node name to which this object refers
2136 @ivar volumes: a structure as returned from
2137 L{ganeti.backend.GetVolumeList} (runtime)
2138 @ivar instances: a list of running instances (runtime)
2139 @ivar pinst: list of configured primary instances (config)
2140 @ivar sinst: list of configured secondary instances (config)
2141 @ivar sbp: dictionary of {primary-node: list of instances} for all
2142 instances for which this node is secondary (config)
2143 @ivar mfree: free memory, as reported by hypervisor (runtime)
2144 @ivar dfree: free disk, as reported by the node (runtime)
2145 @ivar offline: the offline status (config)
2146 @type rpc_fail: boolean
2147 @ivar rpc_fail: whether the overall RPC verify call failed (not whether
2148 the individual keys were correct) (runtime)
2149 @type lvm_fail: boolean
2150 @ivar lvm_fail: whether the RPC call didn't return valid LVM data
2151 @type hyp_fail: boolean
2152 @ivar hyp_fail: whether the RPC call didn't return the instance list
2153 @type ghost: boolean
2154 @ivar ghost: whether this node is absent from the configuration (config)
2155 @type os_fail: boolean
2156 @ivar os_fail: whether the RPC call didn't return valid OS data
2157 @type oslist: list
2158 @ivar oslist: list of OSes as diagnosed by DiagnoseOS
2159 @type vm_capable: boolean
2160 @ivar vm_capable: whether the node can host instances
2161
2162 """
2163 def __init__(self, offline=False, name=None, vm_capable=True):
2164 self.name = name
2165 self.volumes = {}
2166 self.instances = []
2167 self.pinst = []
2168 self.sinst = []
2169 self.sbp = {}
2170 self.mfree = 0
2171 self.dfree = 0
2172 self.offline = offline
2173 self.vm_capable = vm_capable
2174 self.rpc_fail = False
2175 self.lvm_fail = False
2176 self.hyp_fail = False
2177 self.ghost = False
2178 self.os_fail = False
2179 self.oslist = {}
2180
2181 def ExpandNames(self):
2182 # This raises errors.OpPrereqError on its own:
2183 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
2184
2185 # Get instances in node group; this is unsafe and needs verification later
2186 inst_names = \
2187 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
2188
2189 self.needed_locks = {
2190 locking.LEVEL_INSTANCE: inst_names,
2191 locking.LEVEL_NODEGROUP: [self.group_uuid],
2192 locking.LEVEL_NODE: [],
2193 }
2194
2195 self.share_locks = _ShareAll()
2196
2197 def DeclareLocks(self, level):
2198 if level == locking.LEVEL_NODE:
2199 # Get members of node group; this is unsafe and needs verification later
2200 nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
2201
2202 all_inst_info = self.cfg.GetAllInstancesInfo()
2203
2204 # In Exec(), we warn about mirrored instances that have primary and
2205 # secondary living in separate node groups. To fully verify that
2206 # volumes for these instances are healthy, we will need to do an
2207 # extra call to their secondaries. We ensure here those nodes will
2208 # be locked.
2209 for inst in self.owned_locks(locking.LEVEL_INSTANCE):
2210 # Important: access only the instances whose lock is owned
2211 if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
2212 nodes.update(all_inst_info[inst].secondary_nodes)
2213
2214 self.needed_locks[locking.LEVEL_NODE] = nodes
2215
2216 def CheckPrereq(self):
2217 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
2218 self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
2219
2220 group_nodes = set(self.group_info.members)
2221 group_instances = \
2222 self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
2223
2224 unlocked_nodes = \
2225 group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
2226
2227 unlocked_instances = \
2228 group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
2229
2230 if unlocked_nodes:
2231 raise errors.OpPrereqError("Missing lock for nodes: %s" %
2232 utils.CommaJoin(unlocked_nodes),
2233 errors.ECODE_STATE)
2234
2235 if unlocked_instances:
2236 raise errors.OpPrereqError("Missing lock for instances: %s" %
2237 utils.CommaJoin(unlocked_instances),
2238 errors.ECODE_STATE)
2239
2240 self.all_node_info = self.cfg.GetAllNodesInfo()
2241 self.all_inst_info = self.cfg.GetAllInstancesInfo()
2242
2243 self.my_node_names = utils.NiceSort(group_nodes)
2244 self.my_inst_names = utils.NiceSort(group_instances)
2245
2246 self.my_node_info = dict((name, self.all_node_info[name])
2247 for name in self.my_node_names)
2248
2249 self.my_inst_info = dict((name, self.all_inst_info[name])
2250 for name in self.my_inst_names)
2251
2252 # We detect here the nodes that will need the extra RPC calls for verifying
2253 # split LV volumes; they should be locked.
2254 extra_lv_nodes = set()
2255
2256 for inst in self.my_inst_info.values():
2257 if inst.disk_template in constants.DTS_INT_MIRROR:
2258 for nname in inst.all_nodes:
2259 if self.all_node_info[nname].group != self.group_uuid:
2260 extra_lv_nodes.add(nname)
2261
2262 unlocked_lv_nodes = \
2263 extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
2264
2265 if unlocked_lv_nodes:
2266 raise errors.OpPrereqError("Missing node locks for LV check: %s" %
2267 utils.CommaJoin(unlocked_lv_nodes),
2268 errors.ECODE_STATE)
2269 self.extra_lv_nodes = list(extra_lv_nodes)
2270
2271 def _VerifyNode(self, ninfo, nresult):
2272 """Perform some basic validation on data returned from a node.
2273
2274 - check the result data structure is well formed and has all the
2275 mandatory fields
2276 - check ganeti version
2277
2278 @type ninfo: L{objects.Node}
2279 @param ninfo: the node to check
2280 @param nresult: the results from the node
2281 @rtype: boolean
2282 @return: whether overall this call was successful (and we can expect
2283 reasonable values in the response)
2284
2285 """
2286 node = ninfo.name
2287 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2288
2289 # main result, nresult should be a non-empty dict
2290 test = not nresult or not isinstance(nresult, dict)
2291 _ErrorIf(test, constants.CV_ENODERPC, node,
2292 "unable to verify node: no data returned")
2293 if test:
2294 return False
2295
2296 # compares ganeti version
2297 local_version = constants.PROTOCOL_VERSION
2298 remote_version = nresult.get("version", None)
2299 test = not (remote_version and
2300 isinstance(remote_version, (list, tuple)) and
2301 len(remote_version) == 2)
2302 _ErrorIf(test, constants.CV_ENODERPC, node,
2303 "connection to node returned invalid data")
2304 if test:
2305 return False
2306
2307 test = local_version != remote_version[0]
2308 _ErrorIf(test, constants.CV_ENODEVERSION, node,
2309 "incompatible protocol versions: master %s,"
2310 " node %s", local_version, remote_version[0])
2311 if test:
2312 return False
2313
2314 # node seems compatible, we can actually try to look into its results
2315
2316 # full package version
2317 self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
2318 constants.CV_ENODEVERSION, node,
2319 "software version mismatch: master %s, node %s",
2320 constants.RELEASE_VERSION, remote_version[1],
2321 code=self.ETYPE_WARNING)
2322
2323 hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
2324 if ninfo.vm_capable and isinstance(hyp_result, dict):
2325 for hv_name, hv_result in hyp_result.iteritems():
2326 test = hv_result is not None
2327 _ErrorIf(test, constants.CV_ENODEHV, node,
2328 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
2329
2330 hvp_result = nresult.get(constants.NV_HVPARAMS, None)
2331 if ninfo.vm_capable and isinstance(hvp_result, list):
2332 for item, hv_name, hv_result in hvp_result:
2333 _ErrorIf(True, constants.CV_ENODEHV, node,
2334 "hypervisor %s parameter verify failure (source %s): %s",
2335 hv_name, item, hv_result)
2336
2337 test = nresult.get(constants.NV_NODESETUP,
2338 ["Missing NODESETUP results"])
2339 _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
2340 "; ".join(test))
2341
2342 return True
2343
2344 def _VerifyNodeTime(self, ninfo, nresult,
2345 nvinfo_starttime, nvinfo_endtime):
2346 """Check the node time.
2347
2348 @type ninfo: L{objects.Node}
2349 @param ninfo: the node to check
2350 @param nresult: the remote results for the node
2351 @param nvinfo_starttime: the start time of the RPC call
2352 @param nvinfo_endtime: the end time of the RPC call
2353
2354 """
2355 node = ninfo.name
2356 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2357
2358 ntime = nresult.get(constants.NV_TIME, None)
2359 try:
2360 ntime_merged = utils.MergeTime(ntime)
2361 except (ValueError, TypeError):
2362 _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
2363 return
2364
2365 if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
2366 ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
2367 elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
2368 ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
2369 else:
2370 ntime_diff = None
2371
2372 _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
2373 "Node time diverges by at least %s from master node time",
2374 ntime_diff)
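# Worked example of the skew computation above (times are hypothetical and
# NODE_MAX_CLOCK_SKEW is assumed to be 150 seconds): with
# nvinfo_starttime=1000.0, nvinfo_endtime=1002.0 and a node reporting a
# merged time of 800.0, the node is more than 150s behind the call window,
# so CV_ENODETIME is reported with ntime_diff == "200.0s".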
2375
2376 def _VerifyNodeLVM(self, ninfo, nresult, vg_name):
2377 """Check the node LVM results.
2378
2379 @type ninfo: L{objects.Node}
2380 @param ninfo: the node to check
2381 @param nresult: the remote results for the node
2382 @param vg_name: the configured VG name
2383
2384 """
2385 if vg_name is None:
2386 return
2387
2388 node = ninfo.name
2389 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2390
2391 # checks vg existence and size > 20G
2392 vglist = nresult.get(constants.NV_VGLIST, None)
2393 test = not vglist
2394 _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
2395 if not test:
2396 vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
2397 constants.MIN_VG_SIZE)
2398 _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
2399
2400 # check pv names
2401 pvlist = nresult.get(constants.NV_PVLIST, None)
2402 test = pvlist is None
2403 _ErrorIf(test, constants.CV_ENODELVM, node, "Can't get PV list from node")
2404 if not test:
2405 # check that ':' is not present in PV names, since it's a
2406 # special character for lvcreate (denotes the range of PEs to
2407 # use on the PV)
2408 for _, pvname, owner_vg in pvlist:
2409 test = ":" in pvname
2410 _ErrorIf(test, constants.CV_ENODELVM, node,
2411 "Invalid character ':' in PV '%s' of VG '%s'",
2412 pvname, owner_vg)
2413
2414 def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2415 """Check the node bridges.
2416
2417 @type ninfo: L{objects.Node}
2418 @param ninfo: the node to check
2419 @param nresult: the remote results for the node
2420 @param bridges: the expected list of bridges
2421
2422 """
2423 if not bridges:
2424 return
2425
2426 node = ninfo.name
2427 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2428
2429 missing = nresult.get(constants.NV_BRIDGES, None)
2430 test = not isinstance(missing, list)
2431 _ErrorIf(test, constants.CV_ENODENET, node,
2432 "did not return valid bridge information")
2433 if not test:
2434 _ErrorIf(bool(missing), constants.CV_ENODENET, node,
2435 "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2436
2437 def _VerifyNodeUserScripts(self, ninfo, nresult):
2438 """Check the results of user scripts presence and executability on the node
2439
2440 @type ninfo: L{objects.Node}
2441 @param ninfo: the node to check
2442 @param nresult: the remote results for the node
2443
2444 """
2445 node = ninfo.name
2446
2447 test = constants.NV_USERSCRIPTS not in nresult
2448 self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
2449 "did not return user scripts information")
2450
2451 broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2452 if not test:
2453 self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
2454 "user scripts not present or not executable: %s" %
2455 utils.CommaJoin(sorted(broken_scripts)))
2456
2457 def _VerifyNodeNetwork(self, ninfo, nresult):
2458 """Check the node network connectivity results.
2459
2460 @type ninfo: L{objects.Node}
2461 @param ninfo: the node to check
2462 @param nresult: the remote results for the node
2463
2464 """
2465 node = ninfo.name
2466 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2467
2468 test = constants.NV_NODELIST not in nresult
2469 _ErrorIf(test, constants.CV_ENODESSH, node,
2470 "node hasn't returned node ssh connectivity data")
2471 if not test:
2472 if nresult[constants.NV_NODELIST]:
2473 for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2474 _ErrorIf(True, constants.CV_ENODESSH, node,
2475 "ssh communication with node '%s': %s", a_node, a_msg)
2476
2477 test = constants.NV_NODENETTEST not in nresult
2478 _ErrorIf(test, constants.CV_ENODENET, node,
2479 "node hasn't returned node tcp connectivity data")
2480 if not test:
2481 if nresult[constants.NV_NODENETTEST]:
2482 nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2483 for anode in nlist:
2484 _ErrorIf(True, constants.CV_ENODENET, node,
2485 "tcp communication with node '%s': %s",
2486 anode, nresult[constants.NV_NODENETTEST][anode])
2487
2488 test = constants.NV_MASTERIP not in nresult
2489 _ErrorIf(test, constants.CV_ENODENET, node,
2490 "node hasn't returned node master IP reachability data")
2491 if not test:
2492 if not nresult[constants.NV_MASTERIP]:
2493 if node == self.master_node:
2494 msg = "the master node cannot reach the master IP (not configured?)"
2495 else:
2496 msg = "cannot reach the master IP"
2497 _ErrorIf(True, constants.CV_ENODENET, node, msg)
2498
2499 def _VerifyInstance(self, instance, instanceconfig, node_image,
2500 diskstatus):
2501 """Verify an instance.
2502
2503 This function checks to see if the required block devices are
2504 available on the instance's node.
2505
2506 """
2507 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2508 node_current = instanceconfig.primary_node
2509
2510 node_vol_should = {}
2511 instanceconfig.MapLVsByNode(node_vol_should)
2512
2513 cluster = self.cfg.GetClusterInfo()
2514 ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2515 self.group_info)
2516 err = _ComputeIPolicyInstanceViolation(ipolicy, instanceconfig)
2517 _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err))
2518
2519 for node in node_vol_should:
2520 n_img = node_image[node]
2521 if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2522 # ignore missing volumes on offline or broken nodes
2523 continue
2524 for volume in node_vol_should[node]:
2525 test = volume not in n_img.volumes
2526 _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
2527 "volume %s missing on node %s", volume, node)
2528
2529 if instanceconfig.admin_state == constants.ADMINST_UP:
2530 pri_img = node_image[node_current]
2531 test = instance not in pri_img.instances and not pri_img.offline
2532 _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
2533 "instance not running on its primary node %s",
2534 node_current)
2535
2536 diskdata = [(nname, success, status, idx)
2537 for (nname, disks) in diskstatus.items()
2538 for idx, (success, status) in enumerate(disks)]
2539
2540 for nname, success, bdev_status, idx in diskdata:
2541 # the 'ghost node' construction in Exec() ensures that we have a
2542 # node here
2543 snode = node_image[nname]
2544 bad_snode = snode.ghost or snode.offline
2545 _ErrorIf(instanceconfig.admin_state == constants.ADMINST_UP and
2546 not success and not bad_snode,
2547 constants.CV_EINSTANCEFAULTYDISK, instance,
2548 "couldn't retrieve status for disk/%s on %s: %s",
2549 idx, nname, bdev_status)
2550 _ErrorIf((instanceconfig.admin_state == constants.ADMINST_UP and
2551 success and bdev_status.ldisk_status == constants.LDS_FAULTY),
2552 constants.CV_EINSTANCEFAULTYDISK, instance,
2553 "disk/%s on %s is faulty", idx, nname)
2554
2555 def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2556 """Verify if there are any unknown volumes in the cluster.
2557
2558 The .os, .swap and backup volumes are ignored. All other volumes are
2559 reported as unknown.
2560
2561 @type reserved: L{ganeti.utils.FieldSet}
2562 @param reserved: a FieldSet of reserved volume names
2563
2564 """
2565 for node, n_img in node_image.items():
2566 if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2567 self.all_node_info[node].group != self.group_uuid):
2568 # skip non-healthy nodes
2569 continue
2570 for volume in n_img.volumes:
2571 test = ((node not in node_vol_should or
2572 volume not in node_vol_should[node]) and
2573 not reserved.Matches(volume))
2574 self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
2575 "volume %s is unknown", volume)
2576
2577 def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
2578 """Verify N+1 Memory Resilience.
2579
2580 Check that if one single node dies we can still start all the
2581 instances it was primary for.
2582
2583 """
2584 cluster_info = self.cfg.GetClusterInfo()
2585 for node, n_img in node_image.items():
2586 # This code checks that every node which is now listed as
2587 # secondary has enough memory to host all the instances it would
2588 # have to take over, should a single other node in the cluster fail.
2589 # FIXME: not ready for failover to an arbitrary node
2590 # FIXME: does not support file-backed instances
2591 # WARNING: we currently take into account down instances as well
2592 # as up ones, considering that even if they're down someone
2593 # might want to start them even in the event of a node failure.
2594 if n_img.offline or self.all_node_info[node].group != self.group_uuid:
2595 # we're skipping nodes marked offline and nodes in other groups from
2596 # the N+1 warning, since most likely we don't have good memory
2597 # information from them; we already list instances living on such
2598 # nodes, and that's enough warning
2599 continue
2600 #TODO(dynmem): also consider ballooning out other instances
2601 for prinode, instances in n_img.sbp.items():
2602 needed_mem = 0
2603 for instance in instances:
2604 bep = cluster_info.FillBE(instance_cfg[instance])
2605 if bep[constants.BE_AUTO_BALANCE]:
2606 needed_mem += bep[constants.BE_MINMEM]
2607 test = n_img.mfree < needed_mem
2608 self._ErrorIf(test, constants.CV_ENODEN1, node,
2609 "not enough memory to accomodate instance failovers"
2610 " should node %s fail (%dMiB needed, %dMiB available)",
2611 prinode, needed_mem, n_img.mfree)
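# Worked example of the N+1 check above (numbers hypothetical): if this node
# is secondary for two auto-balanced instances whose primary is "node1",
# needing 1024 and 2048 MiB of minimum memory respectively, then needed_mem
# is 3072 MiB and CV_ENODEN1 is raised unless the node reports at least that
# much free memory.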
2612
2613 @classmethod
2614 def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2615 (files_all, files_opt, files_mc, files_vm)):
2616 """Verifies file checksums collected from all nodes.
2617
2618 @param errorif: Callback for reporting errors
2619 @param nodeinfo: List of L{objects.Node} objects
2620 @param master_node: Name of master node
2621 @param all_nvinfo: RPC results
2622
2623 """
2624 # Define functions determining which nodes to consider for a file
2625 files2nodefn = [
2626 (files_all, None),
2627 (files_mc, lambda node: (node.master_candidate or
2628 node.name == master_node)),
2629 (files_vm, lambda node: node.vm_capable),
2630 ]
2631
2632 # Build mapping from filename to list of nodes which should have the file
2633 nodefiles = {}
2634 for (files, fn) in files2nodefn:
2635 if fn is None:
2636 filenodes = nodeinfo
2637 else:
2638 filenodes = filter(fn, nodeinfo)
2639 nodefiles.update((filename,
2640 frozenset(map(operator.attrgetter("name"), filenodes)))
2641 for filename in files)
2642
2643 assert set(nodefiles) == (files_all | files_mc | files_vm)
2644
2645 fileinfo = dict((filename, {}) for filename in nodefiles)
2646 ignore_nodes = set()
2647
2648 for node in nodeinfo:
2649 if node.offline:
2650 ignore_nodes.add(node.name)
2651 continue
2652
2653 nresult = all_nvinfo[node.name]
2654
2655 if nresult.fail_msg or not nresult.payload:
2656 node_files = None
2657 else:
2658 fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2659 node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2660 for (key, value) in fingerprints.items())
2661 del fingerprints
2662
2663 test = not (node_files and isinstance(node_files, dict))
2664 errorif(test, constants.CV_ENODEFILECHECK, node.name,
2665 "Node did not return file checksum data")
2666 if test:
2667 ignore_nodes.add(node.name)
2668 continue
2669
2670 # Build per-checksum mapping from filename to nodes having it
2671 for (filename, checksum) in node_files.items():
2672 assert filename in nodefiles
2673 fileinfo[filename].setdefault(checksum, set()).add(node.name)
2674
2675 for (filename, checksums) in fileinfo.items():
2676 assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2677
2678 # Nodes having the file
2679 with_file = frozenset(node_name
2680 for nodes in fileinfo[filename].values()
2681 for node_name in nodes) - ignore_nodes
2682
2683 expected_nodes = nodefiles[filename] - ignore_nodes
2684
2685 # Nodes missing file
2686 missing_file = expected_nodes - with_file
2687
2688 if filename in files_opt:
2689 # All or no nodes
2690 errorif(missing_file and missing_file != expected_nodes,
2691 constants.CV_ECLUSTERFILECHECK, None,
2692 "File %s is optional, but it must exist on all or no"
2693 " nodes (not found on %s)",
2694 filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2695 else:
2696 errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2697 "File %s is missing from node(s) %s", filename,
2698 utils.CommaJoin(utils.NiceSort(missing_file)))
2699
2700 # Warn if a node has a file it shouldn't
2701 unexpected = with_file - expected_nodes
2702 errorif(unexpected,
2703 constants.CV_ECLUSTERFILECHECK, None,
2704 "File %s should not exist on node(s) %s",
2705 filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2706
2707 # See if there are multiple versions of the file
2708 test = len(checksums) > 1
2709 if test:
2710 variants = ["variant %s on %s" %
2711 (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2712 for (idx, (checksum, nodes)) in
2713 enumerate(sorted(checksums.items()))]
2714 else:
2715 variants = []
2716
2717 errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2718 "File %s found with %s different checksums (%s)",
2719 filename, len(checksums), "; ".join(variants))
2720
2721 def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2722 drbd_map):
2723 """Verifies and the node DRBD status.
2724
2725 @type ninfo: L{objects.Node}
2726 @param ninfo: the node to check
2727 @param nresult: the remote results for the node
2728 @param instanceinfo: the dict of instances
2729 @param drbd_helper: the configured DRBD usermode helper
2730 @param drbd_map: the DRBD map as returned by
2731 L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2732
2733 """
2734 node = ninfo.name
2735 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2736
2737 if drbd_helper:
2738 helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2739 test = (helper_result is None)
2740 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2741 "no drbd usermode helper returned")
2742 if helper_result:
2743 status, payload = helper_result
2744 test = not status
2745 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2746 "drbd usermode helper check unsuccessful: %s", payload)
2747 test = status and (payload != drbd_helper)
2748 _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2749 "wrong drbd usermode helper: %s", payload)
2750
2751 # compute the DRBD minors
2752 node_drbd = {}
2753 for minor, instance in drbd_map[node].items():
2754 test = instance not in instanceinfo
2755 _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2756 "ghost instance '%s' in temporary DRBD map", instance)
2757 # ghost instance should not be running, but otherwise we
2758 # don't give double warnings (both ghost instance and
2759 # unallocated minor in use)
2760 if test:
2761 node_drbd[minor] = (instance, False)
2762 else:
2763 instance = instanceinfo[instance]
2764 node_drbd[minor] = (instance.name,
2765 instance.admin_state == constants.ADMINST_UP)
2766
2767 # and now check them
2768 used_minors = nresult.get(constants.NV_DRBDLIST, [])
2769 test = not isinstance(used_minors, (tuple, list))
2770 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2771 "cannot parse drbd status file: %s", str(used_minors))
2772 if test:
2773 # we cannot check drbd status
2774 return
2775
2776 for minor, (iname, must_exist) in node_drbd.items():
2777 test = minor not in used_minors and must_exist
2778 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2779 "drbd minor %d of instance %s is not active", minor, iname)
2780 for minor in used_minors:
2781 test = minor not in node_drbd
2782 _ErrorIf(test, constants.CV_ENODEDRBD, node,
2783 "unallocated drbd minor %d is in use", minor)
2784
2785 def _UpdateNodeOS(self, ninfo, nresult, nimg):
2786 """Builds the node OS structures.
2787
2788 @type ninfo: L{objects.Node}
2789 @param ninfo: the node to check
2790 @param nresult: the remote results for the node
2791 @param nimg: the node image object
2792
2793 """
2794 node = ninfo.name
2795 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2796
2797 remote_os = nresult.get(constants.NV_OSLIST, None)
2798 test = (not isinstance(remote_os, list) or
2799 not compat.all(isinstance(v, list) and len(v) == 7
2800 for v in remote_os))
2801
2802 _ErrorIf(test, constants.CV_ENODEOS, node,
2803 "node hasn't returned valid OS data")
2804
2805 nimg.os_fail = test
2806
2807 if test:
2808 return
2809
2810 os_dict = {}
2811
2812 for (name, os_path, status, diagnose,
2813 variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2814
2815 if name not in os_dict:
2816 os_dict[name] = []
2817
2818 # parameters is a list of lists instead of list of tuples due to
2819 # JSON lacking a real tuple type, fix it:
2820 parameters = [tuple(v) for v in parameters]
2821 os_dict[name].append((os_path, status, diagnose,
2822 set(variants), set(parameters), set(api_ver)))
2823
2824 nimg.oslist = os_dict
2825
2826 def _VerifyNodeOS(self, ninfo, nimg, base):
2827 """Verifies the node OS list.
2828
2829 @type ninfo: L{objects.Node}
2830 @param ninfo: the node to check
2831 @param nimg: the node image object
2832 @param base: the 'template' node we match against (e.g. from the master)
2833
2834 """
2835 node = ninfo.name
2836 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2837
2838 assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2839
2840 beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2841 for os_name, os_data in nimg.oslist.items():
2842 assert os_data, "Empty OS status for OS %s?!" % os_name
2843 f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2844 _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2845 "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2846 _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2847 "OS '%s' has multiple entries (first one shadows the rest): %s",
2848 os_name, utils.CommaJoin([v[0] for v in os_data]))
2849 # comparisons with the 'base' image
2850 test = os_name not in base.oslist
2851 _ErrorIf(test, constants.CV_ENODEOS, node,
2852 "Extra OS %s not present on reference node (%s)",
2853 os_name, base.name)
2854 if test:
2855 continue
2856 assert base.oslist[os_name], "Base node has empty OS status?"
2857 _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2858 if not b_status:
2859 # base OS is invalid, skipping
2860 continue
2861 for kind, a, b in [("API version", f_api, b_api),
2862 ("variants list", f_var, b_var),
2863 ("parameters", beautify_params(f_param),
2864 beautify_params(b_param))]:
2865 _ErrorIf(a != b, constants.CV_ENODEOS, node,
2866 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2867 kind, os_name, base.name,
2868 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2869
2870 # check any missing OSes
2871 missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2872 _ErrorIf(missing, constants.CV_ENODEOS, node,
2873 "OSes present on reference node %s but missing on this node: %s",
2874 base.name, utils.CommaJoin(missing))
2875
2876 def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2877 """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2878
2879 @type ninfo: L{objects.Node}
2880 @param ninfo: the node to check
2881 @param nresult: the remote results for the node
2882 @type is_master: bool
2883 @param is_master: Whether node is the master node
2884
2885 """
2886 node = ninfo.name
2887
2888 if (is_master and
2889 (constants.ENABLE_FILE_STORAGE or
2890 constants.ENABLE_SHARED_FILE_STORAGE)):
2891 try:
2892 fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2893 except KeyError:
2894 # This should never happen
2895 self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2896 "Node did not return forbidden file storage paths")
2897 else:
2898 self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2899 "Found forbidden file storage paths: %s",
2900 utils.CommaJoin(fspaths))
2901 else:
2902 self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2903 constants.CV_ENODEFILESTORAGEPATHS, node,
2904 "Node should not have returned forbidden file storage"
2905 " paths")
2906
2907 def _VerifyOob(self, ninfo, nresult):
2908 """Verifies out of band functionality of a node.
2909
2910 @type ninfo: L{objects.Node}
2911 @param ninfo: the node to check
2912 @param nresult: the remote results for the node
2913
2914 """
2915 node = ninfo.name
2916 # We just have to verify the paths on master and/or master candidates
2917 # as the oob helper is invoked on the master
2918 if ((ninfo.master_candidate or ninfo.master_capable) and
2919 constants.NV_OOB_PATHS in nresult):
2920 for path_result in nresult[constants.NV_OOB_PATHS]:
2921 self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2922
2923 def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2924 """Verifies and updates the node volume data.
2925
2926 This function will update a L{NodeImage}'s internal structures
2927 with data from the remote call.
2928
2929 @type ninfo: L{objects.Node}
2930 @param ninfo: the node to check
2931 @param nresult: the remote results for the node
2932 @param nimg: the node image object
2933 @param vg_name: the configured VG name
2934
2935 """
2936 node = ninfo.name
2937 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2938
2939 nimg.lvm_fail = True
2940 lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2941 if vg_name is None:
2942 pass
2943 elif isinstance(lvdata, basestring):
2944 _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2945 utils.SafeEncode(lvdata))
2946 elif not isinstance(lvdata, dict):
2947 _ErrorIf(True, constants.CV_ENODELVM, node,
2948 "rpc call to node failed (lvlist)")
2949 else:
2950 nimg.volumes = lvdata
2951 nimg.lvm_fail = False
2952
2953 def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2954 """Verifies and updates the node instance list.
2955
2956 If the listing was successful, then updates this node's instance
2957 list. Otherwise, it marks the RPC call as failed for the instance
2958 list key.
2959
2960 @type ninfo: L{objects.Node}
2961 @param ninfo: the node to check
2962 @param nresult: the remote results for the node
2963 @param nimg: the node image object
2964
2965 """
2966 idata = nresult.get(constants.NV_INSTANCELIST, None)
2967 test = not isinstance(idata, list)
2968 self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2969 "rpc call to node failed (instancelist): %s",
2970 utils.SafeEncode(str(idata)))
2971 if test:
2972 nimg.hyp_fail = True
2973 else:
2974 nimg.instances = idata
2975
2976 def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2977 """Verifies and computes a node information map
2978
2979 @type ninfo: L{objects.Node}
2980 @param ninfo: the node to check
2981 @param nresult: the remote results for the node
2982 @param nimg: the node image object
2983 @param vg_name: the configured VG name
2984
2985 """
2986 node = ninfo.name
2987 _ErrorIf = self._ErrorIf # pylint: disable=C0103
2988
2989 # try to read free memory (from the hypervisor)
2990 hv_info = nresult.get(constants.NV_HVINFO, None)
2991 test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2992 _ErrorIf(test, constants.CV_ENODEHV, node,
2993 "rpc call to node failed (hvinfo)")
2994 if not test:
2995 try:
2996 nimg.mfree = int(hv_info["memory_free"])
2997 except (ValueError, TypeError):
2998 _ErrorIf(True, constants.CV_ENODERPC, node,
2999 "node returned invalid nodeinfo, check hypervisor")
3000
3001 # FIXME: devise a free space model for file based instances as well
3002 if vg_name is not None:
3003 test = (constants.NV_VGLIST not in nresult or
3004 vg_name not in nresult[constants.NV_VGLIST])
3005 _ErrorIf(test, constants.CV_ENODELVM, node,
3006 "node didn't return data for the volume group '%s'"
3007 " - it is either missing or broken", vg_name)
3008 if not test:
3009 try:
3010 nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
3011 except (ValueError, TypeError):
3012 _ErrorIf(True, constants.CV_ENODERPC, node,
3013 "node returned invalid LVM info, check LVM status")
3014
3015 def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
3016 """Gets per-disk status information for all instances.
3017
3018 @type nodelist: list of strings
3019 @param nodelist: Node names
3020 @type node_image: dict of (name, L{NodeImage})
3021 @param node_image: Node objects
3022 @type instanceinfo: dict of (name, L{objects.Instance})
3023 @param instanceinfo: Instance objects
3024 @rtype: {instance: {node: [(success, payload)]}}
3025 @return: a dictionary of per-instance dictionaries with nodes as
3026 keys and disk information as values; the disk information is a
3027 list of tuples (success, payload)
3028
3029 """
3030 _ErrorIf = self._ErrorIf # pylint: disable=C0103
3031
3032 node_disks = {}
3033 node_disks_devonly = {}
3034 diskless_instances = set()
3035 diskless = constants.DT_DISKLESS
3036
3037 for nname in nodelist:
3038 node_instances = list(itertools.chain(node_image[nname].pinst,
3039 node_image[nname].sinst))
3040 diskless_instances.update(inst for inst in node_instances
3041 if instanceinfo[inst].disk_template == diskless)
3042 disks = [(inst, disk)
3043 for inst in node_instances
3044 for disk in instanceinfo[inst].disks]
3045
3046 if not disks:
3047 # No need to collect data
3048 continue
3049
3050 node_disks[nname] = disks
3051
3052 # _AnnotateDiskParams makes already copies of the disks
3053 devonly = []
3054 for (inst, dev) in disks:
3055 (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
3056 self.cfg.SetDiskID(anno_disk, nname)
3057 devonly.append(anno_disk)
3058
3059 node_disks_devonly[nname] = devonly
3060
3061 assert len(node_disks) == len(node_disks_devonly)
3062
3063 # Collect data from all nodes with disks
3064 result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
3065 node_disks_devonly)
3066
3067 assert len(result) == len(node_disks)
3068
3069 instdisk = {}
3070
3071 for (nname, nres) in result.items():
3072 disks = node_disks[nname]
3073
3074 if nres.offline:
3075 # No data from this node
3076 data = len(disks) * [(False, "node offline")]
3077 else:
3078 msg = nres.fail_msg
3079 _ErrorIf(msg, constants.CV_ENODERPC, nname,
3080 "while getting disk information: %s", msg)
3081 if msg:
3082 # No data from this node
3083 data = len(disks) * [(False, msg)]
3084 else:
3085 data = []
3086 for idx, i in enumerate(nres.payload):
3087 if isinstance(i, (tuple, list)) and len(i) == 2:
3088 data.append(i)
3089 else:
3090 logging.warning("Invalid result from node %s, entry %d: %s",
3091 nname, idx, i)
3092 data.append((False, "Invalid result from the remote node"))
3093
3094 for ((inst, _), status) in zip(disks, data):
3095 instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
3096
3097 # Add empty entries for diskless instances.
3098 for inst in diskless_instances:
3099 assert inst not in instdisk
3100 instdisk[inst] = {}
3101
3102 assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
3103 len(nnames) <= len(instanceinfo[inst].all_nodes) and
3104 compat.all(isinstance(s, (tuple, list)) and
3105 len(s) == 2 for s in statuses)
3106 for inst, nnames in instdisk.items()
3107 for nname, statuses in nnames.items())
3108 assert set(instdisk) == set(instanceinfo), "instdisk consistency failure"
3109
3110 return instdisk
3111
3112 @staticmethod
3113 def _SshNodeSelector(group_uuid, all_nodes):
3114 """Create endless iterators for all potential SSH check hosts.
3115
3116 """
3117 nodes = [node for node in all_nodes
3118 if (node.group != group_uuid and
3119 not node.offline)]
3120 keyfunc = operator.attrgetter("group")
3121
3122 return map(itertools.cycle,
3123 [sorted(map(operator.attrgetter("name"), names))
3124 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
3125 keyfunc)])
3126
3127 @classmethod
3128 def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
3129 """Choose which nodes should talk to which other nodes.
3130
3131 We will make nodes contact all nodes in their group, and one node from
3132 every other group.
3133
3134 @warning: This algorithm has a known issue if one node group is much
3135 smaller than others (e.g. just one node). In such a case all other
3136 nodes will talk to the single node.
3137
3138 """
3139 online_nodes = sorted(node.name for node in group_nodes if not node.offline)
3140 sel = cls._SshNodeSelector(group_uuid, all_nodes)
3141
3142 return (online_nodes,
3143 dict((name, sorted([i.next() for i in sel]))
3144 for name in online_nodes))
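# Sketch of the mapping returned above for a hypothetical cluster with two
# other groups, where the current group contains node1 and node2:
#
#   (["node1", "node2"],
#    {"node1": ["groupB-node1", "groupC-node1"],
#     "node2": ["groupB-node2", "groupC-node2"]})
#
# i.e. every online node in the group is told to contact one node from each
# other group, cycling through those groups' members to spread the load.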
3145
3146 def BuildHooksEnv(self):
3147 """Build hooks env.
3148
3149 Cluster-Verify hooks are run only in the post phase; their failure is
3150 logged in the verify output and makes the verification fail.
3151
3152 """
3153 env = {
3154 "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags())
3155 }
3156
3157 env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
3158 for node in self.my_node_info.values())
3159
3160 return env
3161
3162 def BuildHooksNodes(self):
3163 """Build hooks nodes.
3164
3165 """
3166 return ([], self.my_node_names)
3167
3168 def Exec(self, feedback_fn):
3169 """Verify integrity of the node group, performing various test on nodes.
3170
3171 """
3172 # This method has too many local variables. pylint: disable=R0914
3173 feedback_fn("* Verifying group '%s'" % self.group_info.name)
3174
3175 if not self.my_node_names:
3176 # empty node group
3177 feedback_fn("* Empty node group, skipping verification")
3178 return True
3179
3180 self.bad = False
3181 _ErrorIf = self._ErrorIf # pylint: disable=C0103
3182 verbose = self.op.verbose
3183 self._feedback_fn = feedback_fn
3184
3185 vg_name = self.cfg.GetVGName()
3186 drbd_helper = self.cfg.GetDRBDHelper()
3187 cluster = self.cfg.GetClusterInfo()
3188 groupinfo = self.cfg.GetAllNodeGroupsInfo()
3189 hypervisors = cluster.enabled_hypervisors
3190 node_data_list = [self.my_node_info[name] for name in self.my_node_names]
3191
3192 i_non_redundant = [] # Non redundant instances
3193 i_non_a_balanced = [] # Non auto-balanced instances
3194 i_offline = 0 # Count of offline instances
3195 n_offline = 0 # Count of offline nodes
3196 n_drained = 0 # Count of nodes being drained
3197 node_vol_should = {}
3198
3199 # FIXME: verify OS list
3200
3201 # File verification
3202 filemap = _ComputeAncillaryFiles(cluster, False)
3203
3204 # do local checksums
3205 master_node = self.master_node = self.cfg.GetMasterNode()
3206 master_ip = self.cfg.GetMasterIP()
3207
3208 feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
3209
3210 user_scripts = []
3211 if self.cfg.GetUseExternalMipScript():
3212 user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
3213
3214 node_verify_param = {
3215 constants.NV_FILELIST:
3216 map(vcluster.MakeVirtualPath,
3217 utils.UniqueSequence(filename
3218 for files in filemap
3219 for filename in files)),
3220 constants.NV_NODELIST:
3221 self._SelectSshCheckNodes(node_data_list, self.group_uuid,
3222 self.all_node_info.values()),
3223 constants.NV_HYPERVISOR: hypervisors,
3224 constants.NV_HVPARAMS:
3225 _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
3226 constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
3227 for node in node_data_list
3228 if not node.offline],
3229 constants.NV_INSTANCELIST: hypervisors,
3230 constants.NV_VERSION: None,
3231 constants.NV_HVINFO: self.cfg.GetHypervisorType(),
3232 constants.NV_NODESETUP: None,
3233 constants.NV_TIME: None,
3234 constants.NV_MASTERIP: (master_node, master_ip),
3235 constants.NV_OSLIST: None,
3236 constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
3237 constants.NV_USERSCRIPTS: user_scripts,
3238 }
3239
3240 if vg_name is not None:
3241 node_verify_param[constants.NV_VGLIST] = None
3242 node_verify_param[constants.NV_LVLIST] = vg_name
3243 node_verify_param[constants.NV_PVLIST] = [vg_name]
3244
3245 if drbd_helper:
3246 node_verify_param[constants.NV_DRBDLIST] = None
3247 node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
3248
3249 if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
3250 # Load file storage paths only from master node
3251 node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
3252
3253 # bridge checks
3254 # FIXME: this needs to be changed per node-group, not cluster-wide
3255 bridges = set()
3256 default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
3257 if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3258 bridges.add(default_nicpp[constants.NIC_LINK])
3259 for instance in self.my_inst_info.values():
3260 for nic in instance.nics:
3261 full_nic = cluster.SimpleFillNIC(nic.nicparams)
3262 if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3263 bridges.add(full_nic[constants.NIC_LINK])
3264
3265 if bridges:
3266 node_verify_param[constants.NV_BRIDGES] = list(bridges)
3267
3268 # Build our expected cluster state
3269 node_image = dict((node.name, self.NodeImage(offline=node.offline,
3270 name=node.name,
3271 vm_capable=node.vm_capable))
3272 for node in node_data_list)
3273
3274 # Gather OOB paths
3275 oob_paths = []
3276 for node in self.all_node_info.values():
3277 path = _SupportsOob(self.cfg, node)
3278 if path and path not in oob_paths:
3279 oob_paths.append(path)
3280
3281 if oob_paths:
3282 node_verify_param[constants.NV_OOB_PATHS] = oob_paths
3283
3284 for instance in self.my_inst_names:
3285 inst_config = self.my_inst_info[instance]
3286 if inst_config.admin_state == constants.ADMINST_OFFLINE:
3287 i_offline += 1
3288
3289 for nname in inst_config.all_nodes:
3290 if nname not in node_image:
3291 gnode = self.NodeImage(name=nname)
3292 gnode.ghost = (nname not in self.all_node_info)
3293 node_image[nname] = gnode
3294
3295 inst_config.MapLVsByNode(node_vol_should)
3296
3297 pnode = inst_config.primary_node
3298 node_image[pnode].pinst.append(instance)
3299
3300 for snode in inst_config.secondary_nodes:
3301 nimg = node_image[snode]
3302 nimg.sinst.append(instance)
3303 if pnode not in nimg.sbp:
3304 nimg.sbp[pnode] = []
3305 nimg.sbp[pnode].append(instance)
3306
3307 # At this point, we have the in-memory data structures complete,
3308 # except for the runtime information, which we'll gather next
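# Illustrative shape of the expected-state maps built above (node and
# instance names are examples only):
#   node_image["node1"].pinst = ["inst1"]           # primaries on node1
#   node_image["node1"].sinst = ["inst2"]           # secondaries on node1
#   node_image["node1"].sbp = {"node2": ["inst2"]}  # secondaries keyed by
#                                                   # their primary node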
3309
3310 # Due to the way our RPC system works, exact response times cannot be
3311 # guaranteed (e.g. a broken node could run into a timeout). By recording
3312 # the time before and after executing the request, we can at least
3313 # establish a time window.
3314 nvinfo_starttime = time.time()
3315 all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
3316 node_verify_param,
3317 self.cfg.GetClusterName())
3318 nvinfo_endtime = time.time()
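# all_nvinfo: node name -> RPC result; each successful result carries a
# payload with one entry per NV_* key requested in node_verify_param.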
3319
3320 if self.extra_lv_nodes and vg_name is not None:
3321 extra_lv_nvinfo = \
3322 self.rpc.call_node_verify(self.extra_lv_nodes,
3323 {constants.NV_LVLIST: vg_name},
3324 self.cfg.GetClusterName())
3325 else:
3326 extra_lv_nvinfo = {}
3327
3328 all_drbd_map = self.cfg.ComputeDRBDMap()
3329
3330 feedback_fn("* Gathering disk information (%s nodes)" %
3331 len(self.my_node_names))
3332 instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
3333 self.my_inst_info)
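# instdisk: instance name -> node name -> per-disk (success, status) pairs,
# as reported by the node holding the disks.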
3334
3335 feedback_fn("* Verifying configuration file consistency")
3336
3337 # If not all nodes are being checked, we need to make sure the master node
3338 # and a non-checked vm_capable node are in the list.
3339 absent_nodes = set(self.all_node_info).difference(self.my_node_info)
3340 if absent_nodes:
3341 vf_nvinfo = all_nvinfo.copy()
3342 vf_node_info = list(self.my_node_info.values())
3343 additional_nodes = []
3344 if master_node not in self.my_node_info:
3345 additional_nodes.append(master_node)
3346 vf_node_info.append(self.all_node_info[master_node])
3347 # Add the first vm_capable node we find which is not included,
3348 # excluding the master node (which we already have)
3349 for node in absent_nodes:
3350 nodeinfo = self.all_node_info[node]
3351 if (nodeinfo.vm_capable and not nodeinfo.offline and
3352 node != master_node):
3353 additional_nodes.append(node)
3354 vf_node_info.append(self.all_node_info[node])
3355 break
3356 key = constants.NV_FILELIST
3357 vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
3358 {key: node_verify_param[key]},
3359 self.cfg.GetClusterName()))
3360 else:
3361 vf_nvinfo = all_nvinfo
3362 vf_node_info = self.my_node_info.values()
3363
3364 self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
3365
3366 feedback_fn("* Verifying node status")
3367
3368 refos_img = None
3369
3370 for node_i in node_data_list:
3371 node = node_i.name
3372 nimg = node_image[node]
3373
3374 if node_i.offline:
3375 if verbose:
3376 feedback_fn("* Skipping offline node %s" % (node,))
3377 n_offline += 1
3378 continue
3379
3380 if node == master_node:
3381 ntype = "master"
3382 elif node_i.master_candidate:
3383 ntype = "master candidate"
3384 elif node_i.drained:
3385 ntype = "drained"
3386 n_drained += 1
3387 else:
3388 ntype = "regular"
3389 if verbose:
3390 feedback_fn("* Verifying node %s (%s)" % (node, ntype))
3391
3392 msg = all_nvinfo[node].fail_msg
3393 _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
3394 msg)
3395 if msg:
3396 nimg.rpc_fail = True
3397 continue
3398
3399 nresult = all_nvinfo[node].payload
3400
3401 nimg.call_ok = self._VerifyNode(node_i, nresult)
3402 self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3403 self._VerifyNodeNetwork(node_i, nresult)
3404 self._VerifyNodeUserScripts(node_i, nresult)
3405 self._VerifyOob(node_i, nresult)
3406 self._VerifyFileStoragePaths(node_i, nresult,
3407 node == master_node)
3408
3409 if nimg.vm_capable:
3410 self._VerifyNodeLVM(node_i, nresult, vg_name)
3411 self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3412 all_drbd_map)
3413
3414 self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3415 self._UpdateNodeInstances(node_i, nresult, nimg)
3416 self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3417 self._UpdateNodeOS(node_i, nresult, nimg)
3418
3419 if not nimg.os_fail:
3420 if refos_img is None:
3421 refos_img = nimg
3422 self._VerifyNodeOS(node_i, nimg, refos_img)
3423 self._VerifyNodeBridges(node_i, nresult, bridges)
3424
3425 # Check whether all running instances are primary for the node. (This
3426 # can no longer be done from _VerifyInstance below, since some of the
3427 # wrong instances could be from other node groups.)
3428 non_primary_inst = set(nimg.instances).difference(nimg.pinst)
3429
3430 for inst in non_primary_inst:
3431 test = inst in self.all_inst_info
3432 _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
3433 "instance should not run on node %s", node_i.name)
3434 _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3435 "node is running unknown instance %s", inst)
3436
3437 for node, result in extra_lv_nvinfo.items():
3438 self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
3439 node_image[node], vg_name)
3440
3441 feedback_fn("* Verifying instance status")
3442 for instance in self.my_inst_names:
3443 if verbose:
3444 feedback_fn("* Verifying instance %s" % instance)
3445 inst_config = self.my_inst_info[instance]
3446 self._VerifyInstance(instance, inst_config, node_image,
3447 instdisk[instance])
3448 inst_nodes_offline = []
3449
3450 pnode = inst_config.primary_node
3451 pnode_img = node_image[pnode]
3452 _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
3453 constants.CV_ENODERPC, pnode, "instance %s, connection to"
3454 " primary node failed", instance)
3455
3456 _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
3457 pnode_img.offline,
3458 constants.CV_EINSTANCEBADNODE, instance,
3459 "instance is marked as running and lives on offline node %s",
3460 inst_config.primary_node)
3461
3462 # If the instance is non-redundant we cannot survive losing its primary
3463 # node, so we are not N+1 compliant.
3464 if inst_config.disk_template not in constants.DTS_MIRRORED:
3465 i_non_redundant.append(instance)
3466
3467 _ErrorIf(len(inst_config.secondary_nodes) > 1,
3468 constants.CV_EINSTANCELAYOUT,
3469 instance, "instance has multiple secondary nodes: %s",
3470 utils.CommaJoin(inst_config.secondary_nodes),
3471 code=self.ETYPE_WARNING)
3472
3473 if inst_config.disk_template in constants.DTS_INT_MIRROR:
3474 pnode = inst_config.primary_node
3475 instance_nodes = utils.NiceSort(inst_config.all_nodes)
3476 instance_groups = {}
3477
3478 for node in instance_nodes:
3479 instance_groups.setdefault(self.all_node_info[node].group,
3480 []).append(node)
3481
3482 pretty_list = [
3483 "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
3484 # Sort so that we always list the primary node first.
3485 for group, nodes in sorted(instance_groups.items(),
3486 key=lambda (_, nodes): pnode in nodes,
3487 reverse=True)]
3488
3489 self._ErrorIf(len(instance_groups) > 1,
3490 constants.CV_EINSTANCESPLITGROUPS,
3491 instance, "instance has primary and secondary nodes in"
3492 " different groups: %s", utils.CommaJoin(pretty_list),
3493 code=self.ETYPE_WARNING)
3494
3495 if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
3496 i_non_a_balanced.append(instance)
3497
3498 for snode in inst_config.secondary_nodes:
3499 s_img = node_image[snode]
3500 _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
3501 snode, "instance %s, connection to secondary node failed",
3502 instance)
3503
3504 if s_img.offline:
3505 inst_nodes_offline.append(snode)
3506
3507 # warn that the instance lives on offline nodes
3508 _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
3509 "instance has offline secondary node(s) %s",
3510 utils.CommaJoin(inst_nodes_offline))
3511 # ... or ghost/non-vm_capable nodes
3512 for node in inst_config.all_nodes:
3513 _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
3514 instance, "instance lives on ghost node %s", node)
3515 _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
3516 instance, "instance lives on non-vm_capable node %s", node)
3517
3518 feedback_fn("* Verifying orphan volumes")
3519 reserved = utils.FieldSet(*cluster.reserved_lvs)
3520
3521 # We will get spurious "unknown volume" warnings if any node of this group
3522 # is secondary for an instance whose primary is in another group. To avoid
3523 # them, we find these instances and add their volumes to node_vol_should.
3524 for inst in self.all_inst_info.values():
3525 for secondary in inst.secondary_nodes:
3526 if (secondary in self.my_node_info
3527 and inst.name not in self.my_inst_info):
3528 inst.MapLVsByNode(node_vol_should)
3529 break
3530
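# node_vol_should now maps each node name to the LV names the configuration
# expects on it; _VerifyOrphanVolumes compares this against the volumes the
# nodes actually reported.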
3531 self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3532
3533 if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3534 feedback_fn("* Verifying N+1 Memory redundancy")
3535 self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3536
3537 feedback_fn("* Other Notes")
3538 if i_non_redundant:
3539 feedback_fn(" - NOTICE: %d non-redundant instance(s) found."
3540 % len(i_non_redundant))
3541
3542 if i_non_a_balanced:
3543 feedback_fn(" - NOTICE: %d non-auto-balanced instance(s) found."
3544 % len(i_non_a_balanced))
3545
3546 if i_offline:
3547 feedback_fn(" - NOTICE: %d offline instance(s) found." % i_offline)
3548
3549 if n_offline:
3550 feedback_fn(" - NOTICE: %d offline node(s) found." % n_offline)
3551
3552 if n_drained:
3553 feedback_fn(" - NOTICE: %d drained node(s) found." % n_drained)
3554
3555 return not self.bad
3556
3557 def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3558 """Analyze the post-hooks' result
3559
3560 This method analyses the hook result, handles it, and sends some
3561 nicely-formatted feedback back to the user.
3562
3563 @param phase: one of L{constants.HOOKS_PHASE_POST} or
3564 L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3565 @param hooks_results: the results of the multi-node hooks rpc call
3566 @param feedback_fn: function used to send feedback back to the caller
3567 @param lu_result: previous Exec result
3568 @return: the new Exec result, based on the previous result
3569 and hook results
3570
3571 """
3572 # We only really run POST phase hooks, and only for non-empty groups;
3573 # we are only interested in their results
3574 if not self.my_node_names:
3575 # empty node group
3576 pass
3577 elif phase == constants.HOOKS_PHASE_POST:
3578 # Used to change hooks' output to proper indentation
3579 feedback_fn("* Hooks Results")
3580 assert hooks_results, "invalid result from hooks"
3581
3582 for node_name in hooks_results:
3583 res = hooks_results[node_name]
3584 msg = res.fail_msg
3585 test = msg and not res.offline
3586 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3587 "Communication failure in hooks execution: %s", msg)
3588 if res.offline or msg:
3589 # No need to investigate payload if node is offline or gave
3590 # an error.
3591 continue
3592 for script, hkr, output in res.payload:
3593 test = hkr == constants.HKR_FAIL
3594 self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3595 "Script %s failed, output:", script)
3596 if test:
3597 output = self._HOOKS_INDENT_RE.sub(" ", output)
3598 feedback_fn("%s" % output)
3599 lu_result = False
3600
3601 return lu_result
3602
3603
3604 class LUClusterVerifyDisks(NoHooksLU):
3605 """Verifies the cluster disks status.
3606
3607 """
3608 REQ_BGL = False
3609
3610 def ExpandNames(self):
3611 self.share_locks = _ShareAll()
3612 self.needed_locks = {
3613 locking.LEVEL_NODEGROUP: locking.ALL_SET,
3614 }
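# Only shared node-group locks are needed here; the actual disk verification
# is delegated to the per-group jobs submitted in Exec.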
3615
3616 def Exec(self, feedback_fn):
3617 group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3618
3619 # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3620 return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3621 for group in group_names])
3622
3623
3624 class LUGroupVerifyDisks(NoHooksLU):
3625 """Verifies the status of all disks in a node group.
3626
3627 """
3628 REQ_BGL = False
3629
3630 def ExpandNames(self):
3631 # Raises errors.OpPrereqError on its own if group can't be found
3632 self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
3633
3634 self.share_locks = _ShareAll()
3635 self.needed_locks = {
3636 locking.LEVEL_INSTANCE: [],
3637 locking.LEVEL_NODEGROUP: [],
3638 locking.LEVEL_NODE: [],
3639 }
3640
3641 def DeclareLocks(self, level):
3642 if level == locking.LEVEL_INSTANCE:
3643 assert not self.needed_locks[locking.LEVEL_INSTANCE]
3644
3645 # Lock instances optimistically, needs verification once node and group
3646 # locks have been acquired
3647 self.needed_locks[locking.LEVEL_INSTANCE] = \
3648 self.cfg.GetNodeGroupInstances(self.group_uuid)
3649
3650 elif level == locking.LEVEL_NODEGROUP:
3651 assert not self.needed_locks[locking.LEVEL_NODEGROUP]
3652
3653 self.needed_locks[locking.LEVEL_NODEGROUP] = \
3654 set([self.group_uuid] +
3655 # Lock all groups used by instances optimistically; this requires
3656 # going via the node before it's locked, requiring verification
3657 # later on
3658 [group_uuid
3659 for instance_name in self.owned_locks(locking.LEVEL_INSTANCE)
3660 for group_uuid in self.cfg.GetInstanceNodeGroups(instance_name)])
3661
3662 elif level == locking.LEVEL_NODE:
3663 # This will only lock the nodes in the group to be verified which contain
3664 # actual instances
3665 self.recalculate_locks[locking.LEVEL_NODE] = constants.LOCKS_APPEND
3666 self._LockInstancesNodes()
3667
3668 # Lock all nodes in group to be verified
3669 assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
3670 member_nodes = self.cfg.GetNodeGroup(self.group_uuid).members
3671 self.needed_locks[locking.LEVEL_NODE].extend(member_nodes)
3672
3673 def CheckPrereq(self):
3674 owned_instances = frozenset(self.owned_locks(locking.LEVEL_INSTANCE))
3675 owned_groups = frozenset(self.owned_locks(locking.LEVEL_NODEGROUP))
3676 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
3677
3678 assert self.group_uuid in owned_groups
3679
3680 # Check if locked instances are still correct
3681 _CheckNodeGroupInstances(self.cfg, self.group_uuid, owned_instances)
3682
3683 # Get instance information
3684 self.instances = dict(self.cfg.GetMultiInstanceInfo(owned_instances))
3685
3686 # Check if node groups for locked instances are still correct
3687 _CheckInstancesNodeGroups(self.cfg, self.instances,
3688 owned_groups, owned_nodes, self.group_uuid)
3689
3690 def Exec(self, feedback_fn):
3691 """Verify integrity of cluster disks.
3692
3693 @rtype: tuple of three items
3694 @return: a tuple of (dict of node-to-node_error, list of instances
3695 which need activate-disks, dict of instance to list of (node, volume)
3696 pairs for missing volumes)
3697
3698 """
3699 res_nodes = {}
3700 res_instances = set()
3701 res_missing = {}
3702
3703 nv_dict = _MapInstanceDisksToNodes(
3704 [inst for inst in self.instances.values()
3705 if inst.admin_state == constants.ADMINST_UP])
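# nv_dict (illustrative): (node name, LV name) -> owning instance, with one
# entry for every volume of every instance that is administratively up.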
3706
3707 if nv_dict:
3708 nodes = utils.NiceSort(set(self.owned_locks(locking.LEVEL_NODE)) &
3709 set(self.cfg.GetVmCapableNodeList()))
3710
3711 node_lvs = self.rpc.call_lv_list(nodes, [])
3712
3713 for (node, node_res) in node_lvs.items():
3714 if node_res.offline:
3715 continue
3716
3717 msg = node_res.fail_msg
3718 if msg:
3719 logging.warning("Error enumerating LVs on node %s: %s", node, msg)
3720 res_nodes[node] = msg
3721 continue
3722
3723 for lv_name, (_, _, lv_online) in node_res.payload.items():
3724 inst = nv_dict.pop((node, lv_name), None)
3725 if not (lv_online or inst is None):
3726 res_instances.add(inst)
3727
3728 # any leftover items in nv_dict are missing LVs; arrange the data in a
3729 # more useful form
3730 for key, inst in nv_dict.iteritems():
3731 res_missing.setdefault(inst, []).append(list(key))
3732
3733 return (res_nodes, list(res_instances), res_missing)
3734
3735
3736 class LUClusterRepairDiskSizes(NoHooksLU):
3737 """Verifies the cluster disks sizes.
3738
3739 """
3740 REQ_BGL = False
3741
3742 def ExpandNames(self):
3743 if self.op.instances:
3744 self.wanted_names = _GetWantedInstances(self, self.op.instances)
3745 self.needed_locks = {
3746 locking.LEVEL_NODE_RES: [],
3747 locking.LEVEL_INSTANCE: self.wanted_names,
3748 }
3749 self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
3750 else:
3751 self.wanted_names = None
3752 self.needed_locks = {
3753 locking.LEVEL_NODE_RES: locking.ALL_SET,
3754 locking.LEVEL_INSTANCE: locking.ALL_SET,
3755 }
3756 self.share_locks = {
3757 locking.LEVEL_NODE_RES: 1,
3758 locking.LEVEL_INSTANCE: 0,
3759 }
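# share_locks values: 1 means the lock is acquired shared, 0 exclusive; node
# resource locks can be shared since nodes are only queried, while instance
# objects may be updated with corrected disk sizes.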
3760
3761 def DeclareLocks(self, level):
3762 if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
3763 self._LockInstancesNodes(primary_only=True, level=level)
3764
3765 def CheckPrereq(self):
3766 """Check prerequisites.
3767
3768 This only checks the optional instance list against the existing names.
3769
3770 """
3771 if self.wanted_names is None:
3772 self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
3773
3774 self.wanted_instances = \
3775 map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
3776
3777 def _EnsureChildSizes(self, disk):
3778 """Ensure children of the disk have the needed disk size.
3779
3780 This is valid mainly for DRBD8 and fixes an issue where the
3781 children have a smaller disk size than the parent.
3782
3783 @param disk: an L{ganeti.objects.Disk} object
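@return: True if the size of any child was changed, False otherwise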
3784
3785 """
3786 if disk.dev_type == constants.LD_DRBD8:
3787 assert disk.children, "Empty children for DRBD8?"
3788 fchild = disk.children[0]
3789 mismatch = fchild.size < disk.size
3790 if mismatch:
3791 self.LogInfo("Child disk has size %d, parent %d, fixing",
3792 fchild.size, disk.size)
3793 fchild.size = disk.size
3794
3795 # and we recurse on this child only, not on the metadev
3796 return self._EnsureChildSizes(fchild) or mismatch
3797 else:
3798 return False
3799
3800 def Exec(self, feedback_fn):
3801 """Verify the size of cluster disks.
3802
3803 """
3804 # TODO: check child disks too
3805 # TODO: check differences in size between primary/secondary nodes
3806 per_node_disks = {}
3807 for instance in self.wanted_instances:
3808 pnode = instance.primary_node
3809 if pnode not in per_node_disks:
3810 per_node_disks[pnode] = []
3811 for idx, disk in enumerate(instance.disks):
3812 per_node_disks[pnode].append((instance, idx, disk))
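# per_node_disks (illustrative): primary node name -> list of
# (instance, disk index, disk object) tuples to be checked on that node.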
3813
3814 assert not (frozenset(per_node_disks.keys()) -
3815 self.owned_locks(locking.LEVEL_NODE_RES)), \
3816 "Not owning correct locks"
3817 assert not self.owned_locks(locking.LEVEL_NODE)
3818
3819 changed = []
3820 for node, dskl in per_node_disks.items():
3821 newl = [v[2].Copy() for v in dskl]
3822 for dsk in newl:
3823 self.cfg.SetDiskID(dsk, node)
3824 result = self.rpc.call_blockdev_getsize(node, newl)
3825 if result.fail_msg:
3826 self.LogWarning("Failure in blockdev_getsize call to node"
3827 " %s, ignoring", node)
3828 continue
3829 if len(result.payload) != len(dskl):
3830 logging.warning("Invalid result from node %s: len(dskl)=%d,"
3831 " result.payload=%s", node, len(dskl), result.payload)
3832 self.LogWarning("Invalid result from node %s, ignoring node results",
3833 node)
3834 continue
3835 for ((instance, idx, disk), size) in zip(dskl, result.payload):
3836 if size is None:
3837 self.LogWarning("Disk %d of instance %s did not return size"
3838 " information, ignoring", idx, instance.name)
3839 continue
3840 if not isinstance(size, (int, long)):
3841 self.LogWarning("Disk %d of instance %s did not return valid"
3842 " size information, ignoring", idx, instance.name)
3843 continue
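# The size reported by blockdev_getsize is in bytes, while disk.size is
# recorded in MiB, hence the conversion below.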
3844 size = size >> 20
3845 if size != disk.size:
3846 self.LogInfo("Disk %d of instance %s has mismatched size,"
3847 " correcting: recorded %d, actual %d", idx,
3848 instance.name, disk.size, size)
3849 disk.size = size
3850 self.cfg.Update(instance, feedback_fn)
3851