cmdlib: Cleanup public/private functions
[ganeti-github.git] / lib / cmdlib / node.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with nodes."""
23
24 import logging
25 import operator
26
27 from ganeti import constants
28 from ganeti import errors
29 from ganeti import locking
30 from ganeti import netutils
31 from ganeti import objects
32 from ganeti import opcodes
33 from ganeti import qlang
34 from ganeti import query
35 from ganeti import rpc
36 from ganeti import utils
37 from ganeti.masterd import iallocator
38
39 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, QueryBase, \
40 ResultWithJobs
41 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
42 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
43 IsExclusiveStorageEnabledNode, CheckNodePVs, \
44 RedistributeAncillaryFiles, ExpandNodeName, ShareAll, SupportsOob, \
45 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
46 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
47 GetWantedNodes, MapInstanceDisksToNodes, RunPostHook, \
48 FindFaultyInstanceDisks
49
50
51 def _DecideSelfPromotion(lu, exceptions=None):
52 """Decide whether I should promote myself as a master candidate.
53
54 """
55 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
56 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
57   # the new node will increase mc_max by one, so:
58 mc_should = min(mc_should + 1, cp_size)
59 return mc_now < mc_should
60
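
# Illustrative sketch (not part of the original module): the self-promotion
# decision above is plain pool arithmetic -- adding this node raises the
# achievable number of candidates by one, capped at candidate_pool_size.
# The helper name below is hypothetical.
def _ExampleShouldPromote(mc_now, mc_should, cp_size):
  """Return True if the pool would still be short of master candidates."""
  return mc_now < min(mc_should + 1, cp_size)

# e.g. with 2 current candidates, 3 desirable and a pool size of 10,
# _ExampleShouldPromote(2, 3, 10) is True because 2 < min(3 + 1, 10).
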
61
62 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
63 """Ensure that a node has the given secondary ip.
64
65 @type lu: L{LogicalUnit}
66 @param lu: the LU on behalf of which we make the check
67 @type node: string
68 @param node: the node to check
69 @type secondary_ip: string
70 @param secondary_ip: the ip to check
71 @type prereq: boolean
72 @param prereq: whether to throw a prerequisite or an execute error
73 @raise errors.OpPrereqError: if the node doesn't have the ip, and prereq=True
74 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
75
76 """
77 result = lu.rpc.call_node_has_ip_address(node, secondary_ip)
78 result.Raise("Failure checking secondary ip on node %s" % node,
79 prereq=prereq, ecode=errors.ECODE_ENVIRON)
80 if not result.payload:
81 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
82 " please fix and re-run this command" % secondary_ip)
83 if prereq:
84 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
85 else:
86 raise errors.OpExecError(msg)
87
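
# Illustrative sketch (not part of the original module): the "prereq" flag
# above only selects which exception class is raised, so the same check can
# be shared between CheckPrereq (prereq=True) and Exec (prereq=False).  The
# helper name is hypothetical; it relies on the "errors" import at the top
# of this file.
def _ExampleRaiseForPhase(msg, prereq):
  """Raise the phase-appropriate exception for a failed environment check."""
  if prereq:
    raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
  else:
    raise errors.OpExecError(msg)
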
88
89 class LUNodeAdd(LogicalUnit):
90 """Logical unit for adding node to the cluster.
91
92 """
93 HPATH = "node-add"
94 HTYPE = constants.HTYPE_NODE
95 _NFLAGS = ["master_capable", "vm_capable"]
96
97 def CheckArguments(self):
98 self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
99 # validate/normalize the node name
100 self.hostname = netutils.GetHostname(name=self.op.node_name,
101 family=self.primary_ip_family)
102 self.op.node_name = self.hostname.name
103
104 if self.op.readd and self.op.node_name == self.cfg.GetMasterNode():
105 raise errors.OpPrereqError("Cannot readd the master node",
106 errors.ECODE_STATE)
107
108 if self.op.readd and self.op.group:
109 raise errors.OpPrereqError("Cannot pass a node group when a node is"
110 " being readded", errors.ECODE_INVAL)
111
112 def BuildHooksEnv(self):
113 """Build hooks env.
114
115 This will run on all nodes before, and on all nodes + the new node after.
116
117 """
118 return {
119 "OP_TARGET": self.op.node_name,
120 "NODE_NAME": self.op.node_name,
121 "NODE_PIP": self.op.primary_ip,
122 "NODE_SIP": self.op.secondary_ip,
123 "MASTER_CAPABLE": str(self.op.master_capable),
124 "VM_CAPABLE": str(self.op.vm_capable),
125 }
126
127 def BuildHooksNodes(self):
128 """Build hooks nodes.
129
130 """
131 # Exclude added node
132 pre_nodes = list(set(self.cfg.GetNodeList()) - set([self.op.node_name]))
133 post_nodes = pre_nodes + [self.op.node_name, ]
134
135 return (pre_nodes, post_nodes)
136
137 def CheckPrereq(self):
138 """Check prerequisites.
139
140 This checks:
141 - the new node is not already in the config
142 - it is resolvable
143 - its parameters (single/dual homed) matches the cluster
144
145 Any errors are signaled by raising errors.OpPrereqError.
146
147 """
148 cfg = self.cfg
149 hostname = self.hostname
150 node = hostname.name
151 primary_ip = self.op.primary_ip = hostname.ip
152 if self.op.secondary_ip is None:
153 if self.primary_ip_family == netutils.IP6Address.family:
154         raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
155 " IPv4 address must be given as secondary",
156 errors.ECODE_INVAL)
157 self.op.secondary_ip = primary_ip
158
159 secondary_ip = self.op.secondary_ip
160 if not netutils.IP4Address.IsValid(secondary_ip):
161 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
162 " address" % secondary_ip, errors.ECODE_INVAL)
163
164 node_list = cfg.GetNodeList()
165 if not self.op.readd and node in node_list:
166 raise errors.OpPrereqError("Node %s is already in the configuration" %
167 node, errors.ECODE_EXISTS)
168 elif self.op.readd and node not in node_list:
169 raise errors.OpPrereqError("Node %s is not in the configuration" % node,
170 errors.ECODE_NOENT)
171
172 self.changed_primary_ip = False
173
174 for existing_node_name, existing_node in cfg.GetMultiNodeInfo(node_list):
175 if self.op.readd and node == existing_node_name:
176 if existing_node.secondary_ip != secondary_ip:
177 raise errors.OpPrereqError("Readded node doesn't have the same IP"
178 " address configuration as before",
179 errors.ECODE_INVAL)
180 if existing_node.primary_ip != primary_ip:
181 self.changed_primary_ip = True
182
183 continue
184
185 if (existing_node.primary_ip == primary_ip or
186 existing_node.secondary_ip == primary_ip or
187 existing_node.primary_ip == secondary_ip or
188 existing_node.secondary_ip == secondary_ip):
189 raise errors.OpPrereqError("New node ip address(es) conflict with"
190 " existing node %s" % existing_node.name,
191 errors.ECODE_NOTUNIQUE)
192
193 # After this 'if' block, None is no longer a valid value for the
194 # _capable op attributes
195 if self.op.readd:
196 old_node = self.cfg.GetNodeInfo(node)
197 assert old_node is not None, "Can't retrieve locked node %s" % node
198 for attr in self._NFLAGS:
199 if getattr(self.op, attr) is None:
200 setattr(self.op, attr, getattr(old_node, attr))
201 else:
202 for attr in self._NFLAGS:
203 if getattr(self.op, attr) is None:
204 setattr(self.op, attr, True)
205
206 if self.op.readd and not self.op.vm_capable:
207 pri, sec = cfg.GetNodeInstances(node)
208 if pri or sec:
209 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
210 " flag set to false, but it already holds"
211 " instances" % node,
212 errors.ECODE_STATE)
213
214 # check that the type of the node (single versus dual homed) is the
215 # same as for the master
216 myself = cfg.GetNodeInfo(self.cfg.GetMasterNode())
217 master_singlehomed = myself.secondary_ip == myself.primary_ip
218 newbie_singlehomed = secondary_ip == primary_ip
219 if master_singlehomed != newbie_singlehomed:
220 if master_singlehomed:
221 raise errors.OpPrereqError("The master has no secondary ip but the"
222 " new node has one",
223 errors.ECODE_INVAL)
224 else:
225 raise errors.OpPrereqError("The master has a secondary ip but the"
226 " new node doesn't have one",
227 errors.ECODE_INVAL)
228
229 # checks reachability
230 if not netutils.TcpPing(primary_ip, constants.DEFAULT_NODED_PORT):
231 raise errors.OpPrereqError("Node not reachable by ping",
232 errors.ECODE_ENVIRON)
233
234 if not newbie_singlehomed:
235 # check reachability from my secondary ip to newbie's secondary ip
236 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
237 source=myself.secondary_ip):
238 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
239 " based ping to node daemon port",
240 errors.ECODE_ENVIRON)
241
242 if self.op.readd:
243 exceptions = [node]
244 else:
245 exceptions = []
246
247 if self.op.master_capable:
248 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
249 else:
250 self.master_candidate = False
251
252 if self.op.readd:
253 self.new_node = old_node
254 else:
255 node_group = cfg.LookupNodeGroup(self.op.group)
256 self.new_node = objects.Node(name=node,
257 primary_ip=primary_ip,
258 secondary_ip=secondary_ip,
259 master_candidate=self.master_candidate,
260 offline=False, drained=False,
261 group=node_group, ndparams={})
262
263 if self.op.ndparams:
264 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
265 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
266 "node", "cluster or group")
267
268 if self.op.hv_state:
269 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
270
271 if self.op.disk_state:
272 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
273
274 # TODO: If we need to have multiple DnsOnlyRunner we probably should make
275 # it a property on the base class.
276 rpcrunner = rpc.DnsOnlyRunner()
277 result = rpcrunner.call_version([node])[node]
278 result.Raise("Can't get version information from node %s" % node)
279 if constants.PROTOCOL_VERSION == result.payload:
280 logging.info("Communication to node %s fine, sw version %s match",
281 node, result.payload)
282 else:
283       raise errors.OpPrereqError("Version mismatch: master version %s,"
284 " node version %s" %
285 (constants.PROTOCOL_VERSION, result.payload),
286 errors.ECODE_ENVIRON)
287
288 vg_name = cfg.GetVGName()
289 if vg_name is not None:
290 vparams = {constants.NV_PVLIST: [vg_name]}
291 excl_stor = IsExclusiveStorageEnabledNode(cfg, self.new_node)
292 cname = self.cfg.GetClusterName()
293 result = rpcrunner.call_node_verify_light([node], vparams, cname)[node]
294 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
295 if errmsgs:
296 raise errors.OpPrereqError("Checks on node PVs failed: %s" %
297 "; ".join(errmsgs), errors.ECODE_ENVIRON)
298
299 def Exec(self, feedback_fn):
300 """Adds the new node to the cluster.
301
302 """
303 new_node = self.new_node
304 node = new_node.name
305
306 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
307 "Not owning BGL"
308
309     # We are adding a new node, so we assume it's powered
310 new_node.powered = True
311
312 # for re-adds, reset the offline/drained/master-candidate flags;
313 # we need to reset here, otherwise offline would prevent RPC calls
314 # later in the procedure; this also means that if the re-add
315 # fails, we are left with a non-offlined, broken node
316 if self.op.readd:
317 new_node.drained = new_node.offline = False # pylint: disable=W0201
318 self.LogInfo("Readding a node, the offline/drained flags were reset")
319 # if we demote the node, we do cleanup later in the procedure
320 new_node.master_candidate = self.master_candidate
321 if self.changed_primary_ip:
322 new_node.primary_ip = self.op.primary_ip
323
324 # copy the master/vm_capable flags
325 for attr in self._NFLAGS:
326 setattr(new_node, attr, getattr(self.op, attr))
327
328 # notify the user about any possible mc promotion
329 if new_node.master_candidate:
330 self.LogInfo("Node will be a master candidate")
331
332 if self.op.ndparams:
333 new_node.ndparams = self.op.ndparams
334 else:
335 new_node.ndparams = {}
336
337 if self.op.hv_state:
338 new_node.hv_state_static = self.new_hv_state
339
340 if self.op.disk_state:
341 new_node.disk_state_static = self.new_disk_state
342
343 # Add node to our /etc/hosts, and add key to known_hosts
344 if self.cfg.GetClusterInfo().modify_etc_hosts:
345 master_node = self.cfg.GetMasterNode()
346 result = self.rpc.call_etc_hosts_modify(master_node,
347 constants.ETC_HOSTS_ADD,
348 self.hostname.name,
349 self.hostname.ip)
350 result.Raise("Can't update hosts file with new host data")
351
352 if new_node.secondary_ip != new_node.primary_ip:
353 _CheckNodeHasSecondaryIP(self, new_node.name, new_node.secondary_ip,
354 False)
355
356 node_verify_list = [self.cfg.GetMasterNode()]
357 node_verify_param = {
358 constants.NV_NODELIST: ([node], {}),
359 # TODO: do a node-net-test as well?
360 }
361
362 result = self.rpc.call_node_verify(node_verify_list, node_verify_param,
363 self.cfg.GetClusterName())
364 for verifier in node_verify_list:
365 result[verifier].Raise("Cannot communicate with node %s" % verifier)
366 nl_payload = result[verifier].payload[constants.NV_NODELIST]
367 if nl_payload:
368 for failed in nl_payload:
369 feedback_fn("ssh/hostname verification failed"
370 " (checking from %s): %s" %
371 (verifier, nl_payload[failed]))
372 raise errors.OpExecError("ssh/hostname verification failed")
373
374 if self.op.readd:
375 RedistributeAncillaryFiles(self)
376 self.context.ReaddNode(new_node)
377 # make sure we redistribute the config
378 self.cfg.Update(new_node, feedback_fn)
379 # and make sure the new node will not have old files around
380 if not new_node.master_candidate:
381 result = self.rpc.call_node_demote_from_mc(new_node.name)
382 msg = result.fail_msg
383 if msg:
384 self.LogWarning("Node failed to demote itself from master"
385 " candidate status: %s" % msg)
386 else:
387 RedistributeAncillaryFiles(self, additional_nodes=[node],
388 additional_vm=self.op.vm_capable)
389 self.context.AddNode(new_node, self.proc.GetECId())
390
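
# Illustrative sketch (not part of the original module): LUNodeAdd.CheckPrereq
# fills in unspecified master_capable/vm_capable flags from the old node
# object on a re-add, and defaults them to True on a fresh add.  The
# standalone helper below mirrors that defaulting; all names are hypothetical.
def _ExampleDefaultNodeFlags(requested, old_flags=None):
  """Return the effective flag values given requested and (optional) old ones.

  @type requested: dict
  @param requested: flag name to True/False/None (None means "not specified")
  @type old_flags: dict or None
  @param old_flags: previous values when re-adding a node, else None

  """
  effective = {}
  for name, value in requested.items():
    if value is None:
      if old_flags is None:
        value = True
      else:
        value = old_flags[name]
    effective[name] = value
  return effective

# _ExampleDefaultNodeFlags({"master_capable": None, "vm_capable": False})
# returns {"master_capable": True, "vm_capable": False}.
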
391
392 class LUNodeSetParams(LogicalUnit):
393 """Modifies the parameters of a node.
394
395 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
396 to the node role (as _ROLE_*)
397 @cvar _R2F: a dictionary from node role to tuples of flags
398 @cvar _FLAGS: a list of attribute names corresponding to the flags
399
400 """
401 HPATH = "node-modify"
402 HTYPE = constants.HTYPE_NODE
403 REQ_BGL = False
404 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
405 _F2R = {
406 (True, False, False): _ROLE_CANDIDATE,
407 (False, True, False): _ROLE_DRAINED,
408 (False, False, True): _ROLE_OFFLINE,
409 (False, False, False): _ROLE_REGULAR,
410 }
411 _R2F = dict((v, k) for k, v in _F2R.items())
412 _FLAGS = ["master_candidate", "drained", "offline"]
413
414 def CheckArguments(self):
415 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
416 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
417 self.op.master_capable, self.op.vm_capable,
418 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
419 self.op.disk_state]
420 if all_mods.count(None) == len(all_mods):
421 raise errors.OpPrereqError("Please pass at least one modification",
422 errors.ECODE_INVAL)
423 if all_mods.count(True) > 1:
424 raise errors.OpPrereqError("Can't set the node into more than one"
425 " state at the same time",
426 errors.ECODE_INVAL)
427
428 # Boolean value that tells us whether we might be demoting from MC
429 self.might_demote = (self.op.master_candidate is False or
430 self.op.offline is True or
431 self.op.drained is True or
432 self.op.master_capable is False)
433
434 if self.op.secondary_ip:
435 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
436 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
437 " address" % self.op.secondary_ip,
438 errors.ECODE_INVAL)
439
440 self.lock_all = self.op.auto_promote and self.might_demote
441 self.lock_instances = self.op.secondary_ip is not None
442
443 def _InstanceFilter(self, instance):
444 """Filter for getting affected instances.
445
446 """
447 return (instance.disk_template in constants.DTS_INT_MIRROR and
448 self.op.node_name in instance.all_nodes)
449
450 def ExpandNames(self):
451 if self.lock_all:
452 self.needed_locks = {
453 locking.LEVEL_NODE: locking.ALL_SET,
454
455 # Block allocations when all nodes are locked
456 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
457 }
458 else:
459 self.needed_locks = {
460 locking.LEVEL_NODE: self.op.node_name,
461 }
462
463 # Since modifying a node can have severe effects on currently running
464 # operations the resource lock is at least acquired in shared mode
465 self.needed_locks[locking.LEVEL_NODE_RES] = \
466 self.needed_locks[locking.LEVEL_NODE]
467
468 # Get all locks except nodes in shared mode; they are not used for anything
469 # but read-only access
470 self.share_locks = ShareAll()
471 self.share_locks[locking.LEVEL_NODE] = 0
472 self.share_locks[locking.LEVEL_NODE_RES] = 0
473 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
474
475 if self.lock_instances:
476 self.needed_locks[locking.LEVEL_INSTANCE] = \
477 frozenset(self.cfg.GetInstancesInfoByFilter(self._InstanceFilter))
478
479 def BuildHooksEnv(self):
480 """Build hooks env.
481
482 This runs on the master node.
483
484 """
485 return {
486 "OP_TARGET": self.op.node_name,
487 "MASTER_CANDIDATE": str(self.op.master_candidate),
488 "OFFLINE": str(self.op.offline),
489 "DRAINED": str(self.op.drained),
490 "MASTER_CAPABLE": str(self.op.master_capable),
491 "VM_CAPABLE": str(self.op.vm_capable),
492 }
493
494 def BuildHooksNodes(self):
495 """Build hooks nodes.
496
497 """
498 nl = [self.cfg.GetMasterNode(), self.op.node_name]
499 return (nl, nl)
500
501 def CheckPrereq(self):
502 """Check prerequisites.
503
504 This only checks the instance list against the existing names.
505
506 """
507 node = self.node = self.cfg.GetNodeInfo(self.op.node_name)
508
509 if self.lock_instances:
510 affected_instances = \
511 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
512
513 # Verify instance locks
514 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
515 wanted_instances = frozenset(affected_instances.keys())
516 if wanted_instances - owned_instances:
517 raise errors.OpPrereqError("Instances affected by changing node %s's"
518 " secondary IP address have changed since"
519 " locks were acquired, wanted '%s', have"
520 " '%s'; retry the operation" %
521 (self.op.node_name,
522 utils.CommaJoin(wanted_instances),
523 utils.CommaJoin(owned_instances)),
524 errors.ECODE_STATE)
525 else:
526 affected_instances = None
527
528 if (self.op.master_candidate is not None or
529 self.op.drained is not None or
530 self.op.offline is not None):
531 # we can't change the master's node flags
532 if self.op.node_name == self.cfg.GetMasterNode():
533 raise errors.OpPrereqError("The master role can be changed"
534 " only via master-failover",
535 errors.ECODE_INVAL)
536
537 if self.op.master_candidate and not node.master_capable:
538 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
539 " it a master candidate" % node.name,
540 errors.ECODE_STATE)
541
542 if self.op.vm_capable is False:
543 (ipri, isec) = self.cfg.GetNodeInstances(self.op.node_name)
544 if ipri or isec:
545 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
546 " the vm_capable flag" % node.name,
547 errors.ECODE_STATE)
548
549 if node.master_candidate and self.might_demote and not self.lock_all:
550 assert not self.op.auto_promote, "auto_promote set but lock_all not"
551 # check if after removing the current node, we're missing master
552 # candidates
553 (mc_remaining, mc_should, _) = \
554 self.cfg.GetMasterCandidateStats(exceptions=[node.name])
555 if mc_remaining < mc_should:
556 raise errors.OpPrereqError("Not enough master candidates, please"
557 " pass auto promote option to allow"
558 " promotion (--auto-promote or RAPI"
559 " auto_promote=True)", errors.ECODE_STATE)
560
561 self.old_flags = old_flags = (node.master_candidate,
562 node.drained, node.offline)
563 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
564 self.old_role = old_role = self._F2R[old_flags]
565
566 # Check for ineffective changes
567 for attr in self._FLAGS:
568 if (getattr(self.op, attr) is False and getattr(node, attr) is False):
569 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
570 setattr(self.op, attr, None)
571
572 # Past this point, any flag change to False means a transition
573 # away from the respective state, as only real changes are kept
574
575 # TODO: We might query the real power state if it supports OOB
576 if SupportsOob(self.cfg, node):
577 if self.op.offline is False and not (node.powered or
578 self.op.powered is True):
579 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
580 " offline status can be reset") %
581 self.op.node_name, errors.ECODE_STATE)
582 elif self.op.powered is not None:
583 raise errors.OpPrereqError(("Unable to change powered state for node %s"
584 " as it does not support out-of-band"
585 " handling") % self.op.node_name,
586 errors.ECODE_STATE)
587
588 # If we're being deofflined/drained, we'll MC ourself if needed
589 if (self.op.drained is False or self.op.offline is False or
590 (self.op.master_capable and not node.master_capable)):
591 if _DecideSelfPromotion(self):
592 self.op.master_candidate = True
593 self.LogInfo("Auto-promoting node to master candidate")
594
595 # If we're no longer master capable, we'll demote ourselves from MC
596 if self.op.master_capable is False and node.master_candidate:
597 self.LogInfo("Demoting from master candidate")
598 self.op.master_candidate = False
599
600 # Compute new role
601 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
602 if self.op.master_candidate:
603 new_role = self._ROLE_CANDIDATE
604 elif self.op.drained:
605 new_role = self._ROLE_DRAINED
606 elif self.op.offline:
607 new_role = self._ROLE_OFFLINE
608 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
609 # False is still in new flags, which means we're un-setting (the
610 # only) True flag
611 new_role = self._ROLE_REGULAR
612 else: # no new flags, nothing, keep old role
613 new_role = old_role
614
615 self.new_role = new_role
616
617 if old_role == self._ROLE_OFFLINE and new_role != old_role:
618 # Trying to transition out of offline status
619 result = self.rpc.call_version([node.name])[node.name]
620 if result.fail_msg:
621 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
622 " to report its version: %s" %
623 (node.name, result.fail_msg),
624 errors.ECODE_STATE)
625 else:
626 self.LogWarning("Transitioning node from offline to online state"
627 " without using re-add. Please make sure the node"
628 " is healthy!")
629
630 # When changing the secondary ip, verify if this is a single-homed to
631 # multi-homed transition or vice versa, and apply the relevant
632 # restrictions.
633 if self.op.secondary_ip:
634 # Ok even without locking, because this can't be changed by any LU
635 master = self.cfg.GetNodeInfo(self.cfg.GetMasterNode())
636 master_singlehomed = master.secondary_ip == master.primary_ip
637 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
638 if self.op.force and node.name == master.name:
639 self.LogWarning("Transitioning from single-homed to multi-homed"
640 " cluster; all nodes will require a secondary IP"
641 " address")
642 else:
643 raise errors.OpPrereqError("Changing the secondary ip on a"
644 " single-homed cluster requires the"
645 " --force option to be passed, and the"
646 " target node to be the master",
647 errors.ECODE_INVAL)
648 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
649 if self.op.force and node.name == master.name:
650 self.LogWarning("Transitioning from multi-homed to single-homed"
651 " cluster; secondary IP addresses will have to be"
652 " removed")
653 else:
654 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
655 " same as the primary IP on a multi-homed"
656 " cluster, unless the --force option is"
657 " passed, and the target node is the"
658 " master", errors.ECODE_INVAL)
659
660 assert not (frozenset(affected_instances) -
661 self.owned_locks(locking.LEVEL_INSTANCE))
662
663 if node.offline:
664 if affected_instances:
665 msg = ("Cannot change secondary IP address: offline node has"
666 " instances (%s) configured to use it" %
667 utils.CommaJoin(affected_instances.keys()))
668 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
669 else:
670 # On online nodes, check that no instances are running, and that
671 # the node has the new ip and we can reach it.
672 for instance in affected_instances.values():
673 CheckInstanceState(self, instance, INSTANCE_DOWN,
674 msg="cannot change secondary ip")
675
676 _CheckNodeHasSecondaryIP(self, node.name, self.op.secondary_ip, True)
677 if master.name != node.name:
678 # check reachability from master secondary ip to new secondary ip
679 if not netutils.TcpPing(self.op.secondary_ip,
680 constants.DEFAULT_NODED_PORT,
681 source=master.secondary_ip):
682 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
683 " based ping to node daemon port",
684 errors.ECODE_ENVIRON)
685
686 if self.op.ndparams:
687 new_ndparams = GetUpdatedParams(self.node.ndparams, self.op.ndparams)
688 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
689 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
690 "node", "cluster or group")
691 self.new_ndparams = new_ndparams
692
693 if self.op.hv_state:
694 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
695 self.node.hv_state_static)
696
697 if self.op.disk_state:
698 self.new_disk_state = \
699 MergeAndVerifyDiskState(self.op.disk_state,
700 self.node.disk_state_static)
701
702 def Exec(self, feedback_fn):
703 """Modifies a node.
704
705 """
706 node = self.node
707 old_role = self.old_role
708 new_role = self.new_role
709
710 result = []
711
712 if self.op.ndparams:
713 node.ndparams = self.new_ndparams
714
715 if self.op.powered is not None:
716 node.powered = self.op.powered
717
718 if self.op.hv_state:
719 node.hv_state_static = self.new_hv_state
720
721 if self.op.disk_state:
722 node.disk_state_static = self.new_disk_state
723
724 for attr in ["master_capable", "vm_capable"]:
725 val = getattr(self.op, attr)
726 if val is not None:
727 setattr(node, attr, val)
728 result.append((attr, str(val)))
729
730 if new_role != old_role:
731 # Tell the node to demote itself, if no longer MC and not offline
732 if old_role == self._ROLE_CANDIDATE and new_role != self._ROLE_OFFLINE:
733 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
734 if msg:
735 self.LogWarning("Node failed to demote itself: %s", msg)
736
737 new_flags = self._R2F[new_role]
738 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
739 if of != nf:
740 result.append((desc, str(nf)))
741 (node.master_candidate, node.drained, node.offline) = new_flags
742
743 # we locked all nodes, we adjust the CP before updating this node
744 if self.lock_all:
745 AdjustCandidatePool(self, [node.name])
746
747 if self.op.secondary_ip:
748 node.secondary_ip = self.op.secondary_ip
749 result.append(("secondary_ip", self.op.secondary_ip))
750
751 # this will trigger configuration file update, if needed
752 self.cfg.Update(node, feedback_fn)
753
754 # this will trigger job queue propagation or cleanup if the mc
755 # flag changed
756 if [old_role, new_role].count(self._ROLE_CANDIDATE) == 1:
757 self.context.ReaddNode(node)
758
759 return result
760
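
# Illustrative sketch (not part of the original module): _R2F above is built
# by inverting _F2R, which only works because each role maps to exactly one
# (master_candidate, drained, offline) tuple.  The hypothetical self-check
# below demonstrates that round trip; it is not used by the LU.
def _ExampleCheckRoleTables(f2r):
  """Invert a flags-to-role table and verify nothing is lost."""
  r2f = dict((role, flags) for (flags, role) in f2r.items())
  assert len(r2f) == len(f2r), "duplicate roles would make the table lossy"
  for (flags, role) in f2r.items():
    assert r2f[role] == flags
  return r2f
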
761
762 class LUNodePowercycle(NoHooksLU):
763 """Powercycles a node.
764
765 """
766 REQ_BGL = False
767
768 def CheckArguments(self):
769 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
770 if self.op.node_name == self.cfg.GetMasterNode() and not self.op.force:
771 raise errors.OpPrereqError("The node is the master and the force"
772 " parameter was not set",
773 errors.ECODE_INVAL)
774
775 def ExpandNames(self):
776 """Locking for PowercycleNode.
777
778 This is a last-resort option and shouldn't block on other
779 jobs. Therefore, we grab no locks.
780
781 """
782 self.needed_locks = {}
783
784 def Exec(self, feedback_fn):
785 """Reboots a node.
786
787 """
788 result = self.rpc.call_node_powercycle(self.op.node_name,
789 self.cfg.GetHypervisorType())
790 result.Raise("Failed to schedule the reboot")
791 return result.payload
792
793
794 def _GetNodeInstancesInner(cfg, fn):
795 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
796
797
798 def _GetNodePrimaryInstances(cfg, node_name):
799 """Returns primary instances on a node.
800
801 """
802 return _GetNodeInstancesInner(cfg,
803 lambda inst: node_name == inst.primary_node)
804
805
806 def _GetNodeSecondaryInstances(cfg, node_name):
807 """Returns secondary instances on a node.
808
809 """
810 return _GetNodeInstancesInner(cfg,
811 lambda inst: node_name in inst.secondary_nodes)
812
813
814 def _GetNodeInstances(cfg, node_name):
815 """Returns a list of all primary and secondary instances on a node.
816
817 """
818
819 return _GetNodeInstancesInner(cfg, lambda inst: node_name in inst.all_nodes)
820
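
# Illustrative sketch (not part of the original module): the three helpers
# above differ only in the predicate handed to _GetNodeInstancesInner.  The
# minimal stand-in below shows which attributes those predicates rely on;
# the class and sample values are hypothetical.
class _ExampleInstance(object):
  """Bare-bones object with just the attributes the predicates read."""
  def __init__(self, name, primary_node, secondary_nodes):
    self.name = name
    self.primary_node = primary_node
    self.secondary_nodes = tuple(secondary_nodes)
    self.all_nodes = (primary_node,) + self.secondary_nodes

# For _ExampleInstance("inst1", "node1", ["node2"]):
#   primary predicate:   "node1" == inst.primary_node    -> True
#   secondary predicate: "node2" in inst.secondary_nodes -> True
#   combined predicate:  "node2" in inst.all_nodes       -> True
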
821
822 class LUNodeEvacuate(NoHooksLU):
823 """Evacuates instances off a list of nodes.
824
825 """
826 REQ_BGL = False
827
828 _MODE2IALLOCATOR = {
829 constants.NODE_EVAC_PRI: constants.IALLOCATOR_NEVAC_PRI,
830 constants.NODE_EVAC_SEC: constants.IALLOCATOR_NEVAC_SEC,
831 constants.NODE_EVAC_ALL: constants.IALLOCATOR_NEVAC_ALL,
832 }
833 assert frozenset(_MODE2IALLOCATOR.keys()) == constants.NODE_EVAC_MODES
834 assert (frozenset(_MODE2IALLOCATOR.values()) ==
835 constants.IALLOCATOR_NEVAC_MODES)
836
837 def CheckArguments(self):
838 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
839
840 def ExpandNames(self):
841 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
842
843 if self.op.remote_node is not None:
844 self.op.remote_node = ExpandNodeName(self.cfg, self.op.remote_node)
845 assert self.op.remote_node
846
847 if self.op.remote_node == self.op.node_name:
848 raise errors.OpPrereqError("Can not use evacuated node as a new"
849 " secondary node", errors.ECODE_INVAL)
850
851 if self.op.mode != constants.NODE_EVAC_SEC:
852 raise errors.OpPrereqError("Without the use of an iallocator only"
853 " secondary instances can be evacuated",
854 errors.ECODE_INVAL)
855
856 # Declare locks
857 self.share_locks = ShareAll()
858 self.needed_locks = {
859 locking.LEVEL_INSTANCE: [],
860 locking.LEVEL_NODEGROUP: [],
861 locking.LEVEL_NODE: [],
862 }
863
864 # Determine nodes (via group) optimistically, needs verification once locks
865 # have been acquired
866 self.lock_nodes = self._DetermineNodes()
867
868 def _DetermineNodes(self):
869 """Gets the list of nodes to operate on.
870
871 """
872 if self.op.remote_node is None:
873 # Iallocator will choose any node(s) in the same group
874 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_name])
875 else:
876 group_nodes = frozenset([self.op.remote_node])
877
878 # Determine nodes to be locked
879 return set([self.op.node_name]) | group_nodes
880
881 def _DetermineInstances(self):
882 """Builds list of instances to operate on.
883
884 """
885 assert self.op.mode in constants.NODE_EVAC_MODES
886
887 if self.op.mode == constants.NODE_EVAC_PRI:
888 # Primary instances only
889 inst_fn = _GetNodePrimaryInstances
890 assert self.op.remote_node is None, \
891 "Evacuating primary instances requires iallocator"
892 elif self.op.mode == constants.NODE_EVAC_SEC:
893 # Secondary instances only
894 inst_fn = _GetNodeSecondaryInstances
895 else:
896 # All instances
897 assert self.op.mode == constants.NODE_EVAC_ALL
898 inst_fn = _GetNodeInstances
899 # TODO: In 2.6, change the iallocator interface to take an evacuation mode
900 # per instance
901 raise errors.OpPrereqError("Due to an issue with the iallocator"
902 " interface it is not possible to evacuate"
903 " all instances at once; specify explicitly"
904 " whether to evacuate primary or secondary"
905 " instances",
906 errors.ECODE_INVAL)
907
908 return inst_fn(self.cfg, self.op.node_name)
909
910 def DeclareLocks(self, level):
911 if level == locking.LEVEL_INSTANCE:
912 # Lock instances optimistically, needs verification once node and group
913 # locks have been acquired
914 self.needed_locks[locking.LEVEL_INSTANCE] = \
915 set(i.name for i in self._DetermineInstances())
916
917 elif level == locking.LEVEL_NODEGROUP:
918 # Lock node groups for all potential target nodes optimistically, needs
919 # verification once nodes have been acquired
920 self.needed_locks[locking.LEVEL_NODEGROUP] = \
921 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
922
923 elif level == locking.LEVEL_NODE:
924 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
925
926 def CheckPrereq(self):
927 # Verify locks
928 owned_instances = self.owned_locks(locking.LEVEL_INSTANCE)
929 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
930 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
931
932 need_nodes = self._DetermineNodes()
933
934 if not owned_nodes.issuperset(need_nodes):
935 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
936 " locks were acquired, current nodes are"
937 " are '%s', used to be '%s'; retry the"
938                                  " '%s', used to be '%s'; retry the"
939 (self.op.node_name,
940 utils.CommaJoin(need_nodes),
941 utils.CommaJoin(owned_nodes)),
942 errors.ECODE_STATE)
943
944 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
945 if owned_groups != wanted_groups:
946 raise errors.OpExecError("Node groups changed since locks were acquired,"
947 " current groups are '%s', used to be '%s';"
948 " retry the operation" %
949 (utils.CommaJoin(wanted_groups),
950 utils.CommaJoin(owned_groups)))
951
952 # Determine affected instances
953 self.instances = self._DetermineInstances()
954 self.instance_names = [i.name for i in self.instances]
955
956 if set(self.instance_names) != owned_instances:
957 raise errors.OpExecError("Instances on node '%s' changed since locks"
958 " were acquired, current instances are '%s',"
959 " used to be '%s'; retry the operation" %
960 (self.op.node_name,
961 utils.CommaJoin(self.instance_names),
962 utils.CommaJoin(owned_instances)))
963
964 if self.instance_names:
965 self.LogInfo("Evacuating instances from node '%s': %s",
966 self.op.node_name,
967 utils.CommaJoin(utils.NiceSort(self.instance_names)))
968 else:
969 self.LogInfo("No instances to evacuate from node '%s'",
970 self.op.node_name)
971
972 if self.op.remote_node is not None:
973 for i in self.instances:
974 if i.primary_node == self.op.remote_node:
975 raise errors.OpPrereqError("Node %s is the primary node of"
976 " instance %s, cannot use it as"
977 " secondary" %
978 (self.op.remote_node, i.name),
979 errors.ECODE_INVAL)
980
981 def Exec(self, feedback_fn):
982 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
983
984 if not self.instance_names:
985 # No instances to evacuate
986 jobs = []
987
988 elif self.op.iallocator is not None:
989 # TODO: Implement relocation to other group
990 evac_mode = self._MODE2IALLOCATOR[self.op.mode]
991 req = iallocator.IAReqNodeEvac(evac_mode=evac_mode,
992 instances=list(self.instance_names))
993 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
994
995 ial.Run(self.op.iallocator)
996
997 if not ial.success:
998 raise errors.OpPrereqError("Can't compute node evacuation using"
999 " iallocator '%s': %s" %
1000 (self.op.iallocator, ial.info),
1001 errors.ECODE_NORES)
1002
1003 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1004
1005 elif self.op.remote_node is not None:
1006 assert self.op.mode == constants.NODE_EVAC_SEC
1007 jobs = [
1008 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1009 remote_node=self.op.remote_node,
1010 disks=[],
1011 mode=constants.REPLACE_DISK_CHG,
1012 early_release=self.op.early_release)]
1013 for instance_name in self.instance_names]
1014
1015 else:
1016 raise errors.ProgrammerError("No iallocator or remote node")
1017
1018 return ResultWithJobs(jobs)
1019
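
# Illustrative sketch (not part of the original module): when a remote node is
# given instead of an iallocator, LUNodeEvacuate.Exec submits one single-opcode
# job per instance, each swapping the secondary to the chosen node.  The helper
# below just factors out that job-list shape with hypothetical names; it uses
# the "opcodes" and "constants" imports at the top of this file.
def _ExampleSecondaryEvacJobs(instance_names, remote_node, early_release):
  """Return a list of one-opcode jobs, one per evacuated instance."""
  return [
    [opcodes.OpInstanceReplaceDisks(instance_name=name,
                                    remote_node=remote_node,
                                    disks=[],
                                    mode=constants.REPLACE_DISK_CHG,
                                    early_release=early_release)]
    for name in instance_names]
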
1020
1021 class LUNodeMigrate(LogicalUnit):
1022 """Migrate all instances from a node.
1023
1024 """
1025 HPATH = "node-migrate"
1026 HTYPE = constants.HTYPE_NODE
1027 REQ_BGL = False
1028
1029 def CheckArguments(self):
1030 pass
1031
1032 def ExpandNames(self):
1033 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1034
1035 self.share_locks = ShareAll()
1036 self.needed_locks = {
1037 locking.LEVEL_NODE: [self.op.node_name],
1038 }
1039
1040 def BuildHooksEnv(self):
1041 """Build hooks env.
1042
1043     This runs on the master node.
1044
1045 """
1046 return {
1047 "NODE_NAME": self.op.node_name,
1048 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1049 }
1050
1051 def BuildHooksNodes(self):
1052 """Build hooks nodes.
1053
1054 """
1055 nl = [self.cfg.GetMasterNode()]
1056 return (nl, nl)
1057
1058 def CheckPrereq(self):
1059 pass
1060
1061 def Exec(self, feedback_fn):
1062     # Prepare jobs for migrating instances
1063 allow_runtime_changes = self.op.allow_runtime_changes
1064 jobs = [
1065 [opcodes.OpInstanceMigrate(instance_name=inst.name,
1066 mode=self.op.mode,
1067 live=self.op.live,
1068 iallocator=self.op.iallocator,
1069 target_node=self.op.target_node,
1070 allow_runtime_changes=allow_runtime_changes,
1071 ignore_ipolicy=self.op.ignore_ipolicy)]
1072 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_name)]
1073
1074 # TODO: Run iallocator in this opcode and pass correct placement options to
1075 # OpInstanceMigrate. Since other jobs can modify the cluster between
1076 # running the iallocator and the actual migration, a good consistency model
1077 # will have to be found.
1078
1079 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1080 frozenset([self.op.node_name]))
1081
1082 return ResultWithJobs(jobs)
1083
1084
1085 def _GetStorageTypeArgs(cfg, storage_type):
1086 """Returns the arguments for a storage type.
1087
1088 """
1089 # Special case for file storage
1090 if storage_type == constants.ST_FILE:
1091 # storage.FileStorage wants a list of storage directories
1092 return [[cfg.GetFileStorageDir(), cfg.GetSharedFileStorageDir()]]
1093
1094 return []
1095
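
# Illustrative sketch (not part of the original module): the value returned
# above is passed as the "st_args" argument of the storage RPCs used in this
# file (call_storage_list/_modify/_execute); only file storage needs extra
# arguments, namely its candidate directories.  Names below are hypothetical.
def _ExampleStorageExtraArgs(storage_type, file_dirs):
  """Return the extra-argument list a storage RPC expects for this type."""
  if storage_type == constants.ST_FILE:
    return [list(file_dirs)]
  return []
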
1096
1097 class LUNodeModifyStorage(NoHooksLU):
1098 """Logical unit for modifying a storage volume on a node.
1099
1100 """
1101 REQ_BGL = False
1102
1103 def CheckArguments(self):
1104 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1105
1106 storage_type = self.op.storage_type
1107
1108 try:
1109 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1110 except KeyError:
1111 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1112 " modified" % storage_type,
1113 errors.ECODE_INVAL)
1114
1115 diff = set(self.op.changes.keys()) - modifiable
1116 if diff:
1117 raise errors.OpPrereqError("The following fields can not be modified for"
1118 " storage units of type '%s': %r" %
1119 (storage_type, list(diff)),
1120 errors.ECODE_INVAL)
1121
1122 def ExpandNames(self):
1123 self.needed_locks = {
1124 locking.LEVEL_NODE: self.op.node_name,
1125 }
1126
1127 def Exec(self, feedback_fn):
1128     """Modifies a storage volume on the given node.
1129
1130 """
1131 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1132 result = self.rpc.call_storage_modify(self.op.node_name,
1133 self.op.storage_type, st_args,
1134 self.op.name, self.op.changes)
1135 result.Raise("Failed to modify storage unit '%s' on %s" %
1136 (self.op.name, self.op.node_name))
1137
1138
1139 class NodeQuery(QueryBase):
1140 FIELDS = query.NODE_FIELDS
1141
1142 def ExpandNames(self, lu):
1143 lu.needed_locks = {}
1144 lu.share_locks = ShareAll()
1145
1146 if self.names:
1147 self.wanted = GetWantedNodes(lu, self.names)
1148 else:
1149 self.wanted = locking.ALL_SET
1150
1151 self.do_locking = (self.use_locking and
1152 query.NQ_LIVE in self.requested_data)
1153
1154 if self.do_locking:
1155 # If any non-static field is requested we need to lock the nodes
1156 lu.needed_locks[locking.LEVEL_NODE] = self.wanted
1157 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
1158
1159 def DeclareLocks(self, lu, level):
1160 pass
1161
1162 def _GetQueryData(self, lu):
1163 """Computes the list of nodes and their attributes.
1164
1165 """
1166 all_info = lu.cfg.GetAllNodesInfo()
1167
1168 nodenames = self._GetNames(lu, all_info.keys(), locking.LEVEL_NODE)
1169
1170 # Gather data as requested
1171 if query.NQ_LIVE in self.requested_data:
1172 # filter out non-vm_capable nodes
1173 toquery_nodes = [name for name in nodenames if all_info[name].vm_capable]
1174
1175 es_flags = rpc.GetExclusiveStorageForNodeNames(lu.cfg, toquery_nodes)
1176 node_data = lu.rpc.call_node_info(toquery_nodes, [lu.cfg.GetVGName()],
1177 [lu.cfg.GetHypervisorType()], es_flags)
1178 live_data = dict((name, rpc.MakeLegacyNodeInfo(nresult.payload))
1179 for (name, nresult) in node_data.items()
1180 if not nresult.fail_msg and nresult.payload)
1181 else:
1182 live_data = None
1183
1184 if query.NQ_INST in self.requested_data:
1185 node_to_primary = dict([(name, set()) for name in nodenames])
1186 node_to_secondary = dict([(name, set()) for name in nodenames])
1187
1188 inst_data = lu.cfg.GetAllInstancesInfo()
1189
1190 for inst in inst_data.values():
1191 if inst.primary_node in node_to_primary:
1192 node_to_primary[inst.primary_node].add(inst.name)
1193 for secnode in inst.secondary_nodes:
1194 if secnode in node_to_secondary:
1195 node_to_secondary[secnode].add(inst.name)
1196 else:
1197 node_to_primary = None
1198 node_to_secondary = None
1199
1200 if query.NQ_OOB in self.requested_data:
1201 oob_support = dict((name, bool(SupportsOob(lu.cfg, node)))
1202 for name, node in all_info.iteritems())
1203 else:
1204 oob_support = None
1205
1206 if query.NQ_GROUP in self.requested_data:
1207 groups = lu.cfg.GetAllNodeGroupsInfo()
1208 else:
1209 groups = {}
1210
1211 return query.NodeQueryData([all_info[name] for name in nodenames],
1212 live_data, lu.cfg.GetMasterNode(),
1213 node_to_primary, node_to_secondary, groups,
1214 oob_support, lu.cfg.GetClusterInfo())
1215
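
# Illustrative sketch (not part of the original module): the NQ_INST branch
# above inverts the instance list into per-node sets of primary and secondary
# instances.  The standalone helper below performs the same inversion over
# plain (name, primary, secondaries) tuples; all names are hypothetical.
def _ExampleNodeToInstanceMaps(node_names, instances):
  """Return (node_to_primary, node_to_secondary) dicts of instance-name sets."""
  node_to_primary = dict((name, set()) for name in node_names)
  node_to_secondary = dict((name, set()) for name in node_names)
  for (inst_name, primary, secondaries) in instances:
    if primary in node_to_primary:
      node_to_primary[primary].add(inst_name)
    for sec in secondaries:
      if sec in node_to_secondary:
        node_to_secondary[sec].add(inst_name)
  return (node_to_primary, node_to_secondary)
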
1216
1217 class LUNodeQuery(NoHooksLU):
1218 """Logical unit for querying nodes.
1219
1220 """
1221 # pylint: disable=W0142
1222 REQ_BGL = False
1223
1224 def CheckArguments(self):
1225 self.nq = NodeQuery(qlang.MakeSimpleFilter("name", self.op.names),
1226 self.op.output_fields, self.op.use_locking)
1227
1228 def ExpandNames(self):
1229 self.nq.ExpandNames(self)
1230
1231 def DeclareLocks(self, level):
1232 self.nq.DeclareLocks(self, level)
1233
1234 def Exec(self, feedback_fn):
1235 return self.nq.OldStyleQuery(self)
1236
1237
1238 def _CheckOutputFields(static, dynamic, selected):
1239 """Checks whether all selected fields are valid.
1240
1241 @type static: L{utils.FieldSet}
1242 @param static: static fields set
1243 @type dynamic: L{utils.FieldSet}
1244 @param dynamic: dynamic fields set
1245
1246 """
1247 f = utils.FieldSet()
1248 f.Extend(static)
1249 f.Extend(dynamic)
1250
1251 delta = f.NonMatching(selected)
1252 if delta:
1253 raise errors.OpPrereqError("Unknown output fields selected: %s"
1254 % ",".join(delta), errors.ECODE_INVAL)
1255
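
# Illustrative sketch (not part of the original module): _CheckOutputFields is
# essentially a set-difference check -- any selected field that is neither
# static nor dynamic is rejected.  The simplified stand-in below uses plain
# sets instead of utils.FieldSet; names are hypothetical.
def _ExampleUnknownFields(static, dynamic, selected):
  """Return the selected fields that are not known at all."""
  known = set(static) | set(dynamic)
  return [field for field in selected if field not in known]

# _ExampleUnknownFields(["node"], ["phys", "vg"], ["node", "bogus"])
# returns ["bogus"].
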
1256
1257 class LUNodeQueryvols(NoHooksLU):
1258 """Logical unit for getting volumes on node(s).
1259
1260 """
1261 REQ_BGL = False
1262 _FIELDS_DYNAMIC = utils.FieldSet("phys", "vg", "name", "size", "instance")
1263 _FIELDS_STATIC = utils.FieldSet("node")
1264
1265 def CheckArguments(self):
1266 _CheckOutputFields(static=self._FIELDS_STATIC,
1267 dynamic=self._FIELDS_DYNAMIC,
1268 selected=self.op.output_fields)
1269
1270 def ExpandNames(self):
1271 self.share_locks = ShareAll()
1272
1273 if self.op.nodes:
1274 self.needed_locks = {
1275 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes),
1276 }
1277 else:
1278 self.needed_locks = {
1279 locking.LEVEL_NODE: locking.ALL_SET,
1280 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1281 }
1282
1283 def Exec(self, feedback_fn):
1284     """Computes the list of volumes and their attributes.
1285
1286 """
1287 nodenames = self.owned_locks(locking.LEVEL_NODE)
1288 volumes = self.rpc.call_node_volumes(nodenames)
1289
1290 ilist = self.cfg.GetAllInstancesInfo()
1291 vol2inst = MapInstanceDisksToNodes(ilist.values())
1292
1293 output = []
1294 for node in nodenames:
1295 nresult = volumes[node]
1296 if nresult.offline:
1297 continue
1298 msg = nresult.fail_msg
1299 if msg:
1300 self.LogWarning("Can't compute volume data on node %s: %s", node, msg)
1301 continue
1302
1303 node_vols = sorted(nresult.payload,
1304 key=operator.itemgetter("dev"))
1305
1306 for vol in node_vols:
1307 node_output = []
1308 for field in self.op.output_fields:
1309 if field == "node":
1310 val = node
1311 elif field == "phys":
1312 val = vol["dev"]
1313 elif field == "vg":
1314 val = vol["vg"]
1315 elif field == "name":
1316 val = vol["name"]
1317 elif field == "size":
1318 val = int(float(vol["size"]))
1319 elif field == "instance":
1320 val = vol2inst.get((node, vol["vg"] + "/" + vol["name"]), "-")
1321 else:
1322 raise errors.ParameterError(field)
1323 node_output.append(str(val))
1324
1325 output.append(node_output)
1326
1327 return output
1328
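
# Illustrative sketch (not part of the original module): LUNodeQueryvols maps
# each reported LV back to its owning instance through a (node, "vg/lv_name")
# key, matching how MapInstanceDisksToNodes indexes instance disks in the code
# above.  The lookup helper and sample data below are hypothetical.
def _ExampleVolumeOwner(vol2inst, node, vg, lv_name):
  """Return the owning instance name, or "-" for orphan volumes."""
  return vol2inst.get((node, "%s/%s" % (vg, lv_name)), "-")

# _ExampleVolumeOwner({("node1", "xenvg/disk0"): "inst1"},
#                     "node1", "xenvg", "disk0") returns "inst1".
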
1329
1330 class LUNodeQueryStorage(NoHooksLU):
1331 """Logical unit for getting information on storage units on node(s).
1332
1333 """
1334 _FIELDS_STATIC = utils.FieldSet(constants.SF_NODE)
1335 REQ_BGL = False
1336
1337 def CheckArguments(self):
1338 _CheckOutputFields(static=self._FIELDS_STATIC,
1339 dynamic=utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1340 selected=self.op.output_fields)
1341
1342 def ExpandNames(self):
1343 self.share_locks = ShareAll()
1344
1345 if self.op.nodes:
1346 self.needed_locks = {
1347 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes),
1348 }
1349 else:
1350 self.needed_locks = {
1351 locking.LEVEL_NODE: locking.ALL_SET,
1352 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1353 }
1354
1355 def Exec(self, feedback_fn):
1356     """Computes the list of storage units and their attributes.
1357
1358 """
1359 self.nodes = self.owned_locks(locking.LEVEL_NODE)
1360
1361 # Always get name to sort by
1362 if constants.SF_NAME in self.op.output_fields:
1363 fields = self.op.output_fields[:]
1364 else:
1365 fields = [constants.SF_NAME] + self.op.output_fields
1366
1367 # Never ask for node or type as it's only known to the LU
1368 for extra in [constants.SF_NODE, constants.SF_TYPE]:
1369 while extra in fields:
1370 fields.remove(extra)
1371
1372 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1373 name_idx = field_idx[constants.SF_NAME]
1374
1375 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1376 data = self.rpc.call_storage_list(self.nodes,
1377 self.op.storage_type, st_args,
1378 self.op.name, fields)
1379
1380 result = []
1381
1382 for node in utils.NiceSort(self.nodes):
1383 nresult = data[node]
1384 if nresult.offline:
1385 continue
1386
1387 msg = nresult.fail_msg
1388 if msg:
1389 self.LogWarning("Can't get storage data from node %s: %s", node, msg)
1390 continue
1391
1392 rows = dict([(row[name_idx], row) for row in nresult.payload])
1393
1394 for name in utils.NiceSort(rows.keys()):
1395 row = rows[name]
1396
1397 out = []
1398
1399 for field in self.op.output_fields:
1400 if field == constants.SF_NODE:
1401 val = node
1402 elif field == constants.SF_TYPE:
1403 val = self.op.storage_type
1404 elif field in field_idx:
1405 val = row[field_idx[field]]
1406 else:
1407 raise errors.ParameterError(field)
1408
1409 out.append(val)
1410
1411 result.append(out)
1412
1413 return result
1414
1415
1416 class LUNodeRemove(LogicalUnit):
1417 """Logical unit for removing a node.
1418
1419 """
1420 HPATH = "node-remove"
1421 HTYPE = constants.HTYPE_NODE
1422
1423 def BuildHooksEnv(self):
1424 """Build hooks env.
1425
1426 """
1427 return {
1428 "OP_TARGET": self.op.node_name,
1429 "NODE_NAME": self.op.node_name,
1430 }
1431
1432 def BuildHooksNodes(self):
1433 """Build hooks nodes.
1434
1435 This doesn't run on the target node in the pre phase as a failed
1436 node would then be impossible to remove.
1437
1438 """
1439 all_nodes = self.cfg.GetNodeList()
1440 try:
1441 all_nodes.remove(self.op.node_name)
1442 except ValueError:
1443 pass
1444 return (all_nodes, all_nodes)
1445
1446 def CheckPrereq(self):
1447 """Check prerequisites.
1448
1449 This checks:
1450 - the node exists in the configuration
1451 - it does not have primary or secondary instances
1452 - it's not the master
1453
1454 Any errors are signaled by raising errors.OpPrereqError.
1455
1456 """
1457 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1458 node = self.cfg.GetNodeInfo(self.op.node_name)
1459 assert node is not None
1460
1461 masternode = self.cfg.GetMasterNode()
1462 if node.name == masternode:
1463 raise errors.OpPrereqError("Node is the master node, failover to another"
1464 " node is required", errors.ECODE_INVAL)
1465
1466 for instance_name, instance in self.cfg.GetAllInstancesInfo().items():
1467 if node.name in instance.all_nodes:
1468 raise errors.OpPrereqError("Instance %s is still running on the node,"
1469 " please remove first" % instance_name,
1470 errors.ECODE_INVAL)
1471 self.op.node_name = node.name
1472 self.node = node
1473
1474 def Exec(self, feedback_fn):
1475 """Removes the node from the cluster.
1476
1477 """
1478 node = self.node
1479 logging.info("Stopping the node daemon and removing configs from node %s",
1480 node.name)
1481
1482 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1483
1484 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1485 "Not owning BGL"
1486
1487 # Promote nodes to master candidate as needed
1488 AdjustCandidatePool(self, exceptions=[node.name])
1489 self.context.RemoveNode(node.name)
1490
1491 # Run post hooks on the node before it's removed
1492 RunPostHook(self, node.name)
1493
1494 result = self.rpc.call_node_leave_cluster(node.name, modify_ssh_setup)
1495 msg = result.fail_msg
1496 if msg:
1497 self.LogWarning("Errors encountered on the remote node while leaving"
1498 " the cluster: %s", msg)
1499
1500 # Remove node from our /etc/hosts
1501 if self.cfg.GetClusterInfo().modify_etc_hosts:
1502 master_node = self.cfg.GetMasterNode()
1503 result = self.rpc.call_etc_hosts_modify(master_node,
1504 constants.ETC_HOSTS_REMOVE,
1505 node.name, None)
1506 result.Raise("Can't update hosts file with new host data")
1507 RedistributeAncillaryFiles(self)
1508
1509
1510 class LURepairNodeStorage(NoHooksLU):
1511 """Repairs the volume group on a node.
1512
1513 """
1514 REQ_BGL = False
1515
1516 def CheckArguments(self):
1517 self.op.node_name = ExpandNodeName(self.cfg, self.op.node_name)
1518
1519 storage_type = self.op.storage_type
1520
1521 if (constants.SO_FIX_CONSISTENCY not in
1522 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1523 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1524 " repaired" % storage_type,
1525 errors.ECODE_INVAL)
1526
1527 def ExpandNames(self):
1528 self.needed_locks = {
1529 locking.LEVEL_NODE: [self.op.node_name],
1530 }
1531
1532 def _CheckFaultyDisks(self, instance, node_name):
1533 """Ensure faulty disks abort the opcode or at least warn."""
1534 try:
1535 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1536 node_name, True):
1537 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1538 " node '%s'" % (instance.name, node_name),
1539 errors.ECODE_STATE)
1540 except errors.OpPrereqError, err:
1541 if self.op.ignore_consistency:
1542 self.LogWarning(str(err.args[0]))
1543 else:
1544 raise
1545
1546 def CheckPrereq(self):
1547 """Check prerequisites.
1548
1549 """
1550 # Check whether any instance on this node has faulty disks
1551 for inst in _GetNodeInstances(self.cfg, self.op.node_name):
1552 if inst.admin_state != constants.ADMINST_UP:
1553 continue
1554 check_nodes = set(inst.all_nodes)
1555 check_nodes.discard(self.op.node_name)
1556 for inst_node_name in check_nodes:
1557 self._CheckFaultyDisks(inst, inst_node_name)
1558
1559 def Exec(self, feedback_fn):
1560 feedback_fn("Repairing storage unit '%s' on %s ..." %
1561 (self.op.name, self.op.node_name))
1562
1563 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1564 result = self.rpc.call_storage_execute(self.op.node_name,
1565 self.op.storage_type, st_args,
1566 self.op.name,
1567 constants.SO_FIX_CONSISTENCY)
1568 result.Raise("Failed to repair storage unit '%s' on %s" %
1569 (self.op.name, self.op.node_name))