1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 import ganeti.rpc.node as rpc
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45
46 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
47 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
48 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
49 IsExclusiveStorageEnabledNode, CheckNodePVs, \
50 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
51 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
52 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
53 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
54 FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \
55 AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
56 EnsureKvmdOnNodes
57
58
59 def _DecideSelfPromotion(lu, exceptions=None):
60 """Decide whether I should promote myself as a master candidate.
61
62 """
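# Promotion heuristic: adding this node raises the desired number of master
# candidates by one (capped at the configured candidate_pool_size), so the
# node promotes itself whenever the current candidate count is below that
# target.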
63 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
64 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
65 # the new node will increase mc_max by one, so:
66 mc_should = min(mc_should + 1, cp_size)
67 return mc_now < mc_should
68
69
70 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
71 """Ensure that a node has the given secondary ip.
72
73 @type lu: L{LogicalUnit}
74 @param lu: the LU on behalf of which we make the check
75 @type node: L{objects.Node}
76 @param node: the node to check
77 @type secondary_ip: string
78 @param secondary_ip: the ip to check
79 @type prereq: boolean
80 @param prereq: whether to throw a prerequisite or an execute error
81 @raise errors.OpPrereqError: if the node doesn't have the ip,
82 and prereq=True
83 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
84
85 """
86 # this can be called with a new node, which has no UUID yet, so perform the
87 # RPC call using its name
88 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
89 result.Raise("Failure checking secondary ip on node %s" % node.name,
90 prereq=prereq, ecode=errors.ECODE_ENVIRON)
91 if not result.payload:
92 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
93 " please fix and re-run this command" % secondary_ip)
94 if prereq:
95 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
96 else:
97 raise errors.OpExecError(msg)
98
99
100 class LUNodeAdd(LogicalUnit):
101 """Logical unit for adding node to the cluster.
102
103 """
104 HPATH = "node-add"
105 HTYPE = constants.HTYPE_NODE
106 _NFLAGS = ["master_capable", "vm_capable"]
107
108 def CheckArguments(self):
109 self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
110 # validate/normalize the node name
111 self.hostname = netutils.GetHostname(name=self.op.node_name,
112 family=self.primary_ip_family)
113 self.op.node_name = self.hostname.name
114
115 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
116 raise errors.OpPrereqError("Cannot readd the master node",
117 errors.ECODE_STATE)
118
119 if self.op.readd and self.op.group:
120 raise errors.OpPrereqError("Cannot pass a node group when a node is"
121 " being readded", errors.ECODE_INVAL)
122
123 def BuildHooksEnv(self):
124 """Build hooks env.
125
126 This will run on all nodes before, and on all nodes + the new node after.
127
128 """
129 return {
130 "OP_TARGET": self.op.node_name,
131 "NODE_NAME": self.op.node_name,
132 "NODE_PIP": self.op.primary_ip,
133 "NODE_SIP": self.op.secondary_ip,
134 "MASTER_CAPABLE": str(self.op.master_capable),
135 "VM_CAPABLE": str(self.op.vm_capable),
136 }
137
138 def BuildHooksNodes(self):
139 """Build hooks nodes.
140
141 """
142 hook_nodes = self.cfg.GetNodeList()
143 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
144 if new_node_info is not None:
145 # Exclude added node
146 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
147
148 # add the new node as a post hook node by name; it does not have a UUID yet
149 return (hook_nodes, hook_nodes)
150
151 def PreparePostHookNodes(self, post_hook_node_uuids):
152 return post_hook_node_uuids + [self.new_node.uuid]
153
154 def CheckPrereq(self):
155 """Check prerequisites.
156
157 This checks:
158 - the new node is not already in the config
159 - it is resolvable
160 - its parameters (single/dual homed) match the cluster
161
162 Any errors are signaled by raising errors.OpPrereqError.
163
164 """
165 node_name = self.hostname.name
166 self.op.primary_ip = self.hostname.ip
167 if self.op.secondary_ip is None:
168 if self.primary_ip_family == netutils.IP6Address.family:
169 raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
170 " IPv4 address must be given as secondary",
171 errors.ECODE_INVAL)
172 self.op.secondary_ip = self.op.primary_ip
173
174 secondary_ip = self.op.secondary_ip
175 if not netutils.IP4Address.IsValid(secondary_ip):
176 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
177 " address" % secondary_ip, errors.ECODE_INVAL)
178
179 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
180 if not self.op.readd and existing_node_info is not None:
181 raise errors.OpPrereqError("Node %s is already in the configuration" %
182 node_name, errors.ECODE_EXISTS)
183 elif self.op.readd and existing_node_info is None:
184 raise errors.OpPrereqError("Node %s is not in the configuration" %
185 node_name, errors.ECODE_NOENT)
186
187 self.changed_primary_ip = False
188
189 for existing_node in self.cfg.GetAllNodesInfo().values():
190 if self.op.readd and node_name == existing_node.name:
191 if existing_node.secondary_ip != secondary_ip:
192 raise errors.OpPrereqError("Readded node doesn't have the same IP"
193 " address configuration as before",
194 errors.ECODE_INVAL)
195 if existing_node.primary_ip != self.op.primary_ip:
196 self.changed_primary_ip = True
197
198 continue
199
200 if (existing_node.primary_ip == self.op.primary_ip or
201 existing_node.secondary_ip == self.op.primary_ip or
202 existing_node.primary_ip == secondary_ip or
203 existing_node.secondary_ip == secondary_ip):
204 raise errors.OpPrereqError("New node ip address(es) conflict with"
205 " existing node %s" % existing_node.name,
206 errors.ECODE_NOTUNIQUE)
207
208 # After this 'if' block, None is no longer a valid value for the
209 # _capable op attributes
210 if self.op.readd:
211 assert existing_node_info is not None, \
212 "Can't retrieve locked node %s" % node_name
213 for attr in self._NFLAGS:
214 if getattr(self.op, attr) is None:
215 setattr(self.op, attr, getattr(existing_node_info, attr))
216 else:
217 for attr in self._NFLAGS:
218 if getattr(self.op, attr) is None:
219 setattr(self.op, attr, True)
220
221 if self.op.readd and not self.op.vm_capable:
222 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
223 if pri or sec:
224 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
225 " flag set to false, but it already holds"
226 " instances" % node_name,
227 errors.ECODE_STATE)
228
229 # check that the type of the node (single versus dual homed) is the
230 # same as for the master
231 myself = self.cfg.GetMasterNodeInfo()
232 master_singlehomed = myself.secondary_ip == myself.primary_ip
233 newbie_singlehomed = secondary_ip == self.op.primary_ip
234 if master_singlehomed != newbie_singlehomed:
235 if master_singlehomed:
236 raise errors.OpPrereqError("The master has no secondary ip but the"
237 " new node has one",
238 errors.ECODE_INVAL)
239 else:
240 raise errors.OpPrereqError("The master has a secondary ip but the"
241 " new node doesn't have one",
242 errors.ECODE_INVAL)
243
244 # checks reachability
245 if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
246 raise errors.OpPrereqError("Node not reachable by ping",
247 errors.ECODE_ENVIRON)
248
249 if not newbie_singlehomed:
250 # check reachability from my secondary ip to newbie's secondary ip
251 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
252 source=myself.secondary_ip):
253 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
254 " based ping to node daemon port",
255 errors.ECODE_ENVIRON)
256
257 if self.op.readd:
258 exceptions = [existing_node_info.uuid]
259 else:
260 exceptions = []
261
262 if self.op.master_capable:
263 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
264 else:
265 self.master_candidate = False
266
267 self.node_group = None
268 if self.op.readd:
269 self.new_node = existing_node_info
270 self.node_group = existing_node_info.group
271 else:
272 self.node_group = self.cfg.LookupNodeGroup(self.op.group)
273 self.new_node = objects.Node(name=node_name,
274 primary_ip=self.op.primary_ip,
275 secondary_ip=secondary_ip,
276 master_candidate=self.master_candidate,
277 offline=False, drained=False,
278 group=self.node_group, ndparams={})
279
280 if self.op.ndparams:
281 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
282 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
283 "node", "cluster or group")
284
285 if self.op.hv_state:
286 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
287
288 if self.op.disk_state:
289 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
290
291 # TODO: If we need to have multiple DnsOnlyRunner we probably should make
292 # it a property on the base class.
293 rpcrunner = rpc.DnsOnlyRunner()
294 result = rpcrunner.call_version([node_name])[node_name]
295 result.Raise("Can't get version information from node %s" % node_name,
296 prereq=True)
297 if constants.PROTOCOL_VERSION == result.payload:
298 logging.info("Communication to node %s fine, sw version %s match",
299 node_name, result.payload)
300 else:
301 raise errors.OpPrereqError("Version mismatch: master version %s,"
302 " node version %s" %
303 (constants.PROTOCOL_VERSION, result.payload),
304 errors.ECODE_ENVIRON)
305
306 vg_name = self.cfg.GetVGName()
307 if vg_name is not None:
308 vparams = {constants.NV_PVLIST: [vg_name]}
309 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
310 cname = self.cfg.GetClusterName()
311 result = rpcrunner.call_node_verify_light(
312 [node_name], vparams, cname,
313 self.cfg.GetClusterInfo().hvparams,
314 {node_name: self.node_group},
315 self.cfg.GetAllNodeGroupsInfoDict()
316 )[node_name]
317 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
318 if errmsgs:
319 raise errors.OpPrereqError("Checks on node PVs failed: %s" %
320 "; ".join(errmsgs), errors.ECODE_ENVIRON)
321
322 def _InitOpenVSwitch(self):
323 filled_ndparams = self.cfg.GetClusterInfo().FillND(
324 self.new_node, self.cfg.GetNodeGroup(self.new_node.group))
325
326 ovs = filled_ndparams.get(constants.ND_OVS, None)
327 ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
328 ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)
329
330 if ovs:
331 if not ovs_link:
332 self.LogInfo("No physical interface for OpenvSwitch was given."
333 " OpenvSwitch will not have an outside connection. This"
334 " might not be what you want.")
335
336 result = self.rpc.call_node_configure_ovs(
337 self.new_node.name, ovs_name, ovs_link)
338 result.Raise("Failed to initialize OpenVSwitch on new node")
339
340 def Exec(self, feedback_fn):
341 """Adds the new node to the cluster.
342
343 """
344 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
345 "Not owning BGL"
346
347 # We are adding a new node, so we assume it is powered
348 self.new_node.powered = True
349
350 # for re-adds, reset the offline/drained/master-candidate flags;
351 # we need to reset here, otherwise offline would prevent RPC calls
352 # later in the procedure; this also means that if the re-add
353 # fails, we are left with a non-offlined, broken node
354 if self.op.readd:
355 self.new_node.offline = False
356 self.new_node.drained = False
357 self.LogInfo("Readding a node, the offline/drained flags were reset")
358 # if we demote the node, we do cleanup later in the procedure
359 self.new_node.master_candidate = self.master_candidate
360 if self.changed_primary_ip:
361 self.new_node.primary_ip = self.op.primary_ip
362
363 # copy the master/vm_capable flags
364 for attr in self._NFLAGS:
365 setattr(self.new_node, attr, getattr(self.op, attr))
366
367 # notify the user about any possible mc promotion
368 if self.new_node.master_candidate:
369 self.LogInfo("Node will be a master candidate")
370
371 if self.op.ndparams:
372 self.new_node.ndparams = self.op.ndparams
373 else:
374 self.new_node.ndparams = {}
375
376 if self.op.hv_state:
377 self.new_node.hv_state_static = self.new_hv_state
378
379 if self.op.disk_state:
380 self.new_node.disk_state_static = self.new_disk_state
381
382 # Add node to our /etc/hosts, and add key to known_hosts
383 if self.cfg.GetClusterInfo().modify_etc_hosts:
384 master_node = self.cfg.GetMasterNode()
385 result = self.rpc.call_etc_hosts_modify(
386 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
387 self.hostname.ip)
388 result.Raise("Can't update hosts file with new host data")
389
390 if self.new_node.secondary_ip != self.new_node.primary_ip:
391 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
392 False)
393
394 node_verifier_uuids = [self.cfg.GetMasterNode()]
395 node_verify_param = {
396 constants.NV_NODELIST: ([self.new_node.name], {}),
397 # TODO: do a node-net-test as well?
398 }
399
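# Ask the master node (the only verifier used here) to confirm that the new
# node is reachable over SSH and that its hostname resolves consistently,
# before the node is registered (or re-registered) in the configuration.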
400 result = self.rpc.call_node_verify(
401 node_verifier_uuids, node_verify_param,
402 self.cfg.GetClusterName(),
403 self.cfg.GetClusterInfo().hvparams,
404 {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
405 self.cfg.GetAllNodeGroupsInfoDict()
406 )
407 for verifier in node_verifier_uuids:
408 result[verifier].Raise("Cannot communicate with node %s" % verifier)
409 nl_payload = result[verifier].payload[constants.NV_NODELIST]
410 if nl_payload:
411 for failed in nl_payload:
412 feedback_fn("ssh/hostname verification failed"
413 " (checking from %s): %s" %
414 (verifier, nl_payload[failed]))
415 raise errors.OpExecError("ssh/hostname verification failed")
416
417 self._InitOpenVSwitch()
418
419 if self.op.readd:
420 self.context.ReaddNode(self.new_node)
421 RedistributeAncillaryFiles(self)
422 # make sure we redistribute the config
423 self.cfg.Update(self.new_node, feedback_fn)
424 # and make sure the new node will not have old files around
425 if not self.new_node.master_candidate:
426 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
427 result.Warn("Node failed to demote itself from master candidate status",
428 self.LogWarning)
429 else:
430 self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
431 RedistributeAncillaryFiles(self)
432
433 # We create a new certificate even if the node is readded
434 digest = CreateNewClientCert(self, self.new_node.uuid)
435 if self.new_node.master_candidate:
436 self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
437 else:
438 self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)
439
440 EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])
441
442
443 class LUNodeSetParams(LogicalUnit):
444 """Modifies the parameters of a node.
445
446 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
447 to the node role (as _ROLE_*)
448 @cvar _R2F: a dictionary from node role to tuples of flags
449 @cvar _FLAGS: a list of attribute names corresponding to the flags
450
451 """
452 HPATH = "node-modify"
453 HTYPE = constants.HTYPE_NODE
454 REQ_BGL = False
455 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
456 _F2R = {
457 (True, False, False): _ROLE_CANDIDATE,
458 (False, True, False): _ROLE_DRAINED,
459 (False, False, True): _ROLE_OFFLINE,
460 (False, False, False): _ROLE_REGULAR,
461 }
462 _R2F = dict((v, k) for k, v in _F2R.items())
463 _FLAGS = ["master_candidate", "drained", "offline"]
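# Example: a node that is neither a master candidate nor drained nor offline
# maps to _F2R[(False, False, False)] == _ROLE_REGULAR; Exec() later uses the
# inverse mapping _R2F to turn the computed role back into these three flags.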
464
465 def CheckArguments(self):
466 (self.op.node_uuid, self.op.node_name) = \
467 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
468 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
469 self.op.master_capable, self.op.vm_capable,
470 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
471 self.op.disk_state]
472 if all_mods.count(None) == len(all_mods):
473 raise errors.OpPrereqError("Please pass at least one modification",
474 errors.ECODE_INVAL)
475 if all_mods.count(True) > 1:
476 raise errors.OpPrereqError("Can't set the node into more than one"
477 " state at the same time",
478 errors.ECODE_INVAL)
479
480 # Boolean value that tells us whether we might be demoting from MC
481 self.might_demote = (self.op.master_candidate is False or
482 self.op.offline is True or
483 self.op.drained is True or
484 self.op.master_capable is False)
485
486 if self.op.secondary_ip:
487 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
488 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
489 " address" % self.op.secondary_ip,
490 errors.ECODE_INVAL)
491
492 self.lock_all = self.op.auto_promote and self.might_demote
493 self.lock_instances = self.op.secondary_ip is not None
494
495 def _InstanceFilter(self, instance):
496 """Filter for getting affected instances.
497
498 """
499 return (instance.disk_template in constants.DTS_INT_MIRROR and
500 self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))
501
502 def ExpandNames(self):
503 if self.lock_all:
504 self.needed_locks = {
505 locking.LEVEL_NODE: locking.ALL_SET,
506
507 # Block allocations when all nodes are locked
508 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
509 }
510 else:
511 self.needed_locks = {
512 locking.LEVEL_NODE: self.op.node_uuid,
513 }
514
515 # Since modifying a node can have severe effects on currently running
516 # operations, the resource lock is at least acquired in shared mode
517 self.needed_locks[locking.LEVEL_NODE_RES] = \
518 self.needed_locks[locking.LEVEL_NODE]
519
520 # Acquire all locks in shared mode except the node locks; the shared locks
521 # are only needed for read-only access
522 self.share_locks = ShareAll()
523 self.share_locks[locking.LEVEL_NODE] = 0
524 self.share_locks[locking.LEVEL_NODE_RES] = 0
525 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
526
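# When the secondary IP is being changed we also lock every instance selected
# by _InstanceFilter above, i.e. instances with internally mirrored (DRBD-style)
# disk templates that use this node, since the address change affects them.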
527 if self.lock_instances:
528 self.needed_locks[locking.LEVEL_INSTANCE] = \
529 self.cfg.GetInstanceNames(
530 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
531
532 def BuildHooksEnv(self):
533 """Build hooks env.
534
535 This runs on the master node.
536
537 """
538 return {
539 "OP_TARGET": self.op.node_name,
540 "MASTER_CANDIDATE": str(self.op.master_candidate),
541 "OFFLINE": str(self.op.offline),
542 "DRAINED": str(self.op.drained),
543 "MASTER_CAPABLE": str(self.op.master_capable),
544 "VM_CAPABLE": str(self.op.vm_capable),
545 }
546
547 def BuildHooksNodes(self):
548 """Build hooks nodes.
549
550 """
551 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
552 return (nl, nl)
553
554 def CheckPrereq(self):
555 """Check prerequisites.
556
557 This only checks the instance list against the existing names.
558
559 """
560 node = self.cfg.GetNodeInfo(self.op.node_uuid)
561 if self.lock_instances:
562 affected_instances = \
563 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
564
565 # Verify instance locks
566 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
567 wanted_instance_names = frozenset([inst.name for inst in
568 affected_instances.values()])
569 if wanted_instance_names - owned_instance_names:
570 raise errors.OpPrereqError("Instances affected by changing node %s's"
571 " secondary IP address have changed since"
572 " locks were acquired, wanted '%s', have"
573 " '%s'; retry the operation" %
574 (node.name,
575 utils.CommaJoin(wanted_instance_names),
576 utils.CommaJoin(owned_instance_names)),
577 errors.ECODE_STATE)
578 else:
579 affected_instances = None
580
581 if (self.op.master_candidate is not None or
582 self.op.drained is not None or
583 self.op.offline is not None):
584 # we can't change the master's node flags
585 if node.uuid == self.cfg.GetMasterNode():
586 raise errors.OpPrereqError("The master role can be changed"
587 " only via master-failover",
588 errors.ECODE_INVAL)
589
590 if self.op.master_candidate and not node.master_capable:
591 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
592 " it a master candidate" % node.name,
593 errors.ECODE_STATE)
594
595 if self.op.vm_capable is False:
596 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
597 if ipri or isec:
598 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
599 " the vm_capable flag" % node.name,
600 errors.ECODE_STATE)
601
602 if node.master_candidate and self.might_demote and not self.lock_all:
603 assert not self.op.auto_promote, "auto_promote set but lock_all not"
604 # check if after removing the current node, we're missing master
605 # candidates
606 (mc_remaining, mc_should, _) = \
607 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
608 if mc_remaining < mc_should:
609 raise errors.OpPrereqError("Not enough master candidates, please"
610 " pass auto promote option to allow"
611 " promotion (--auto-promote or RAPI"
612 " auto_promote=True)", errors.ECODE_STATE)
613
614 self.old_flags = old_flags = (node.master_candidate,
615 node.drained, node.offline)
616 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
617 self.old_role = old_role = self._F2R[old_flags]
618
619 # Check for ineffective changes
620 for attr in self._FLAGS:
621 if getattr(self.op, attr) is False and getattr(node, attr) is False:
622 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
623 setattr(self.op, attr, None)
624
625 # Past this point, any flag change to False means a transition
626 # away from the respective state, as only real changes are kept
627
628 # TODO: We might query the real power state if it supports OOB
629 if SupportsOob(self.cfg, node):
630 if self.op.offline is False and not (node.powered or
631 self.op.powered is True):
632 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
633 " offline status can be reset") %
634 self.op.node_name, errors.ECODE_STATE)
635 elif self.op.powered is not None:
636 raise errors.OpPrereqError(("Unable to change powered state for node %s"
637 " as it does not support out-of-band"
638 " handling") % self.op.node_name,
639 errors.ECODE_STATE)
640
641 # If we're being de-offlined or un-drained, promote ourselves to MC if needed
642 if (self.op.drained is False or self.op.offline is False or
643 (self.op.master_capable and not node.master_capable)):
644 if _DecideSelfPromotion(self):
645 self.op.master_candidate = True
646 self.LogInfo("Auto-promoting node to master candidate")
647
648 # If we're no longer master capable, we'll demote ourselves from MC
649 if self.op.master_capable is False and node.master_candidate:
650 self.LogInfo("Demoting from master candidate")
651 self.op.master_candidate = False
652
653 # Compute new role
654 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
655 if self.op.master_candidate:
656 new_role = self._ROLE_CANDIDATE
657 elif self.op.drained:
658 new_role = self._ROLE_DRAINED
659 elif self.op.offline:
660 new_role = self._ROLE_OFFLINE
661 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
662 # False is still in new flags, which means we're un-setting (the
663 # only) True flag
664 new_role = self._ROLE_REGULAR
665 else: # no new flags, nothing, keep old role
666 new_role = old_role
667
668 self.new_role = new_role
669
670 if old_role == self._ROLE_OFFLINE and new_role != old_role:
671 # Trying to transition out of offline status
672 result = self.rpc.call_version([node.uuid])[node.uuid]
673 if result.fail_msg:
674 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
675 " to report its version: %s" %
676 (node.name, result.fail_msg),
677 errors.ECODE_STATE)
678 else:
679 self.LogWarning("Transitioning node from offline to online state"
680 " without using re-add. Please make sure the node"
681 " is healthy!")
682
683 # When changing the secondary ip, verify if this is a single-homed to
684 # multi-homed transition or vice versa, and apply the relevant
685 # restrictions.
686 if self.op.secondary_ip:
687 # Ok even without locking, because this can't be changed by any LU
688 master = self.cfg.GetMasterNodeInfo()
689 master_singlehomed = master.secondary_ip == master.primary_ip
690 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
691 if self.op.force and node.uuid == master.uuid:
692 self.LogWarning("Transitioning from single-homed to multi-homed"
693 " cluster; all nodes will require a secondary IP"
694 " address")
695 else:
696 raise errors.OpPrereqError("Changing the secondary ip on a"
697 " single-homed cluster requires the"
698 " --force option to be passed, and the"
699 " target node to be the master",
700 errors.ECODE_INVAL)
701 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
702 if self.op.force and node.uuid == master.uuid:
703 self.LogWarning("Transitioning from multi-homed to single-homed"
704 " cluster; secondary IP addresses will have to be"
705 " removed")
706 else:
707 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
708 " same as the primary IP on a multi-homed"
709 " cluster, unless the --force option is"
710 " passed, and the target node is the"
711 " master", errors.ECODE_INVAL)
712
713 assert not (set([inst.name for inst in affected_instances.values()]) -
714 self.owned_locks(locking.LEVEL_INSTANCE))
715
716 if node.offline:
717 if affected_instances:
718 msg = ("Cannot change secondary IP address: offline node has"
719 " instances (%s) configured to use it" %
720 utils.CommaJoin(
721 [inst.name for inst in affected_instances.values()]))
722 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
723 else:
724 # On online nodes, check that no instances are running, and that
725 # the node has the new ip and we can reach it.
726 for instance in affected_instances.values():
727 CheckInstanceState(self, instance, INSTANCE_DOWN,
728 msg="cannot change secondary ip")
729
730 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
731 if master.uuid != node.uuid:
732 # check reachability from master secondary ip to new secondary ip
733 if not netutils.TcpPing(self.op.secondary_ip,
734 constants.DEFAULT_NODED_PORT,
735 source=master.secondary_ip):
736 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
737 " based ping to node daemon port",
738 errors.ECODE_ENVIRON)
739
740 if self.op.ndparams:
741 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
742 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
743 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
744 "node", "cluster or group")
745 self.new_ndparams = new_ndparams
746
747 if self.op.hv_state:
748 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
749 node.hv_state_static)
750
751 if self.op.disk_state:
752 self.new_disk_state = \
753 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
754
755 def Exec(self, feedback_fn):
756 """Modifies a node.
757
758 """
759 node = self.cfg.GetNodeInfo(self.op.node_uuid)
760 result = []
761
762 if self.op.ndparams:
763 node.ndparams = self.new_ndparams
764
765 if self.op.powered is not None:
766 node.powered = self.op.powered
767
768 if self.op.hv_state:
769 node.hv_state_static = self.new_hv_state
770
771 if self.op.disk_state:
772 node.disk_state_static = self.new_disk_state
773
774 for attr in ["master_capable", "vm_capable"]:
775 val = getattr(self.op, attr)
776 if val is not None:
777 setattr(node, attr, val)
778 result.append((attr, str(val)))
779
780 if self.op.secondary_ip:
781 node.secondary_ip = self.op.secondary_ip
782 result.append(("secondary_ip", self.op.secondary_ip))
783
784 # this will trigger configuration file update, if needed
785 self.cfg.Update(node, feedback_fn)
786
787 if self.new_role != self.old_role:
788 new_flags = self._R2F[self.new_role]
789 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
790 if of != nf:
791 result.append((desc, str(nf)))
792 (node.master_candidate, node.drained, node.offline) = new_flags
793 self.cfg.Update(node, feedback_fn)
794
795 # Tell the node to demote itself, if no longer MC and not offline.
796 # This must be done only after the configuration is updated so that
797 # the node is guaranteed not to receive any further configuration updates.
798 if self.old_role == self._ROLE_CANDIDATE and \
799 self.new_role != self._ROLE_OFFLINE:
800 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
801 if msg:
802 self.LogWarning("Node failed to demote itself: %s", msg)
803
804 # if we locked all nodes, adjust the candidate pool before updating this node
805 if self.lock_all:
806 AdjustCandidatePool(self, [node.uuid])
807
808 # if node gets promoted, grant RPC privileges
809 if self.new_role == self._ROLE_CANDIDATE:
810 AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
811 # if node is demoted, revoke RPC privileges
812 if self.old_role == self._ROLE_CANDIDATE:
813 RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)
814
815 # this will trigger job queue propagation or cleanup if the mc
816 # flag changed
817 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
818 self.context.ReaddNode(node)
819
820 EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])
821
822 return result
823
824
825 class LUNodePowercycle(NoHooksLU):
826 """Powercycles a node.
827
828 """
829 REQ_BGL = False
830
831 def CheckArguments(self):
832 (self.op.node_uuid, self.op.node_name) = \
833 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
834
835 if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
836 raise errors.OpPrereqError("The node is the master and the force"
837 " parameter was not set",
838 errors.ECODE_INVAL)
839
840 def ExpandNames(self):
841 """Locking for PowercycleNode.
842
843 This is a last-resort option and shouldn't block on other
844 jobs. Therefore, we grab no locks.
845
846 """
847 self.needed_locks = {}
848
849 def Exec(self, feedback_fn):
850 """Reboots a node.
851
852 """
853 default_hypervisor = self.cfg.GetHypervisorType()
854 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
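# The powercycle is performed by the target node itself using the cluster's
# default hypervisor and its cluster-level parameters; we only verify that
# scheduling the reboot succeeded.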
855 result = self.rpc.call_node_powercycle(self.op.node_uuid,
856 default_hypervisor,
857 hvparams)
858 result.Raise("Failed to schedule the reboot")
859 return result.payload
860
861
862 def _GetNodeInstancesInner(cfg, fn):
863 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
864
865
866 def _GetNodePrimaryInstances(cfg, node_uuid):
867 """Returns primary instances on a node.
868
869 """
870 return _GetNodeInstancesInner(cfg,
871 lambda inst: node_uuid == inst.primary_node)
872
873
874 def _GetNodeSecondaryInstances(cfg, node_uuid):
875 """Returns secondary instances on a node.
876
877 """
878 return _GetNodeInstancesInner(cfg,
879 lambda inst: node_uuid in
880 cfg.GetInstanceSecondaryNodes(inst.uuid))
881
882
883 def _GetNodeInstances(cfg, node_uuid):
884 """Returns a list of all primary and secondary instances on a node.
885
886 """
887
888 return _GetNodeInstancesInner(cfg,
889 lambda inst: node_uuid in
890 cfg.GetInstanceNodes(inst.uuid))
891
892
893 class LUNodeEvacuate(NoHooksLU):
894 """Evacuates instances off a list of nodes.
895
896 """
897 REQ_BGL = False
898
899 def CheckArguments(self):
900 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
901
902 def ExpandNames(self):
903 (self.op.node_uuid, self.op.node_name) = \
904 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
905
906 if self.op.remote_node is not None:
907 (self.op.remote_node_uuid, self.op.remote_node) = \
908 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
909 self.op.remote_node)
910 assert self.op.remote_node
911
912 if self.op.node_uuid == self.op.remote_node_uuid:
913 raise errors.OpPrereqError("Can not use evacuated node as a new"
914 " secondary node", errors.ECODE_INVAL)
915
916 if self.op.mode != constants.NODE_EVAC_SEC:
917 raise errors.OpPrereqError("Without the use of an iallocator only"
918 " secondary instances can be evacuated",
919 errors.ECODE_INVAL)
920
921 # Declare locks
922 self.share_locks = ShareAll()
923 self.needed_locks = {
924 locking.LEVEL_INSTANCE: [],
925 locking.LEVEL_NODEGROUP: [],
926 locking.LEVEL_NODE: [],
927 }
928
929 # Determine nodes (via group) optimistically, needs verification once locks
930 # have been acquired
931 self.lock_nodes = self._DetermineNodes()
932
933 def _DetermineNodes(self):
934 """Gets the list of node UUIDs to operate on.
935
936 """
937 if self.op.remote_node is None:
938 # Iallocator will choose any node(s) in the same group
939 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
940 else:
941 group_nodes = frozenset([self.op.remote_node_uuid])
942
943 # Determine nodes to be locked
944 return set([self.op.node_uuid]) | group_nodes
945
946 def _DetermineInstances(self):
947 """Builds list of instances to operate on.
948
949 """
950 assert self.op.mode in constants.NODE_EVAC_MODES
951
952 if self.op.mode == constants.NODE_EVAC_PRI:
953 # Primary instances only
954 inst_fn = _GetNodePrimaryInstances
955 assert self.op.remote_node is None, \
956 "Evacuating primary instances requires iallocator"
957 elif self.op.mode == constants.NODE_EVAC_SEC:
958 # Secondary instances only
959 inst_fn = _GetNodeSecondaryInstances
960 else:
961 # All instances
962 assert self.op.mode == constants.NODE_EVAC_ALL
963 inst_fn = _GetNodeInstances
964 # TODO: In 2.6, change the iallocator interface to take an evacuation mode
965 # per instance
966 raise errors.OpPrereqError("Due to an issue with the iallocator"
967 " interface it is not possible to evacuate"
968 " all instances at once; specify explicitly"
969 " whether to evacuate primary or secondary"
970 " instances",
971 errors.ECODE_INVAL)
972
973 return inst_fn(self.cfg, self.op.node_uuid)
974
975 def DeclareLocks(self, level):
976 if level == locking.LEVEL_INSTANCE:
977 # Lock instances optimistically, needs verification once node and group
978 # locks have been acquired
979 self.needed_locks[locking.LEVEL_INSTANCE] = \
980 set(i.name for i in self._DetermineInstances())
981
982 elif level == locking.LEVEL_NODEGROUP:
983 # Lock node groups for all potential target nodes optimistically, needs
984 # verification once nodes have been acquired
985 self.needed_locks[locking.LEVEL_NODEGROUP] = \
986 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
987
988 elif level == locking.LEVEL_NODE:
989 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
990
991 def CheckPrereq(self):
992 # Verify locks
993 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
994 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
995 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
996
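# Node, group and instance locks were computed optimistically in
# ExpandNames/DeclareLocks; now that the locks are held, recompute the sets
# and abort if the cluster changed in the meantime.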
997 need_nodes = self._DetermineNodes()
998
999 if not owned_nodes.issuperset(need_nodes):
1000 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
1001 " locks were acquired, current nodes are"
1002 " '%s', used to be '%s'; retry the"
1003 " operation" %
1004 (self.op.node_name,
1005 utils.CommaJoin(need_nodes),
1006 utils.CommaJoin(owned_nodes)),
1007 errors.ECODE_STATE)
1008
1009 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
1010 if owned_groups != wanted_groups:
1011 raise errors.OpExecError("Node groups changed since locks were acquired,"
1012 " current groups are '%s', used to be '%s';"
1013 " retry the operation" %
1014 (utils.CommaJoin(wanted_groups),
1015 utils.CommaJoin(owned_groups)))
1016
1017 # Determine affected instances
1018 self.instances = self._DetermineInstances()
1019 self.instance_names = [i.name for i in self.instances]
1020
1021 if set(self.instance_names) != owned_instance_names:
1022 raise errors.OpExecError("Instances on node '%s' changed since locks"
1023 " were acquired, current instances are '%s',"
1024 " used to be '%s'; retry the operation" %
1025 (self.op.node_name,
1026 utils.CommaJoin(self.instance_names),
1027 utils.CommaJoin(owned_instance_names)))
1028
1029 if self.instance_names:
1030 self.LogInfo("Evacuating instances from node '%s': %s",
1031 self.op.node_name,
1032 utils.CommaJoin(utils.NiceSort(self.instance_names)))
1033 else:
1034 self.LogInfo("No instances to evacuate from node '%s'",
1035 self.op.node_name)
1036
1037 if self.op.remote_node is not None:
1038 for i in self.instances:
1039 if i.primary_node == self.op.remote_node_uuid:
1040 raise errors.OpPrereqError("Node %s is the primary node of"
1041 " instance %s, cannot use it as"
1042 " secondary" %
1043 (self.op.remote_node, i.name),
1044 errors.ECODE_INVAL)
1045
1046 def Exec(self, feedback_fn):
1047 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1048
1049 if not self.instance_names:
1050 # No instances to evacuate
1051 jobs = []
1052
1053 elif self.op.iallocator is not None:
1054 # TODO: Implement relocation to other group
1055 req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
1056 instances=list(self.instance_names))
1057 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1058
1059 ial.Run(self.op.iallocator)
1060
1061 if not ial.success:
1062 raise errors.OpPrereqError("Can't compute node evacuation using"
1063 " iallocator '%s': %s" %
1064 (self.op.iallocator, ial.info),
1065 errors.ECODE_NORES)
1066
1067 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1068
1069 elif self.op.remote_node is not None:
1070 assert self.op.mode == constants.NODE_EVAC_SEC
1071 jobs = [
1072 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1073 remote_node=self.op.remote_node,
1074 disks=[],
1075 mode=constants.REPLACE_DISK_CHG,
1076 early_release=self.op.early_release)]
1077 for instance_name in self.instance_names]
1078
1079 else:
1080 raise errors.ProgrammerError("No iallocator or remote node")
1081
1082 return ResultWithJobs(jobs)
1083
1084
1085 class LUNodeMigrate(LogicalUnit):
1086 """Migrate all instances from a node.
1087
1088 """
1089 HPATH = "node-migrate"
1090 HTYPE = constants.HTYPE_NODE
1091 REQ_BGL = False
1092
1093 def CheckArguments(self):
1094 pass
1095
1096 def ExpandNames(self):
1097 (self.op.node_uuid, self.op.node_name) = \
1098 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1099
1100 self.share_locks = ShareAll()
1101 self.needed_locks = {
1102 locking.LEVEL_NODE: [self.op.node_uuid],
1103 }
1104
1105 def BuildHooksEnv(self):
1106 """Build hooks env.
1107
1108 This runs on the master, the primary and all the secondaries.
1109
1110 """
1111 return {
1112 "NODE_NAME": self.op.node_name,
1113 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1114 }
1115
1116 def BuildHooksNodes(self):
1117 """Build hooks nodes.
1118
1119 """
1120 nl = [self.cfg.GetMasterNode()]
1121 return (nl, nl)
1122
1123 def CheckPrereq(self):
1124 pass
1125
1126 def Exec(self, feedback_fn):
1127 # Prepare jobs for migrating instances
1128 jobs = [
1129 [opcodes.OpInstanceMigrate(
1130 instance_name=inst.name,
1131 mode=self.op.mode,
1132 live=self.op.live,
1133 iallocator=self.op.iallocator,
1134 target_node=self.op.target_node,
1135 allow_runtime_changes=self.op.allow_runtime_changes,
1136 ignore_ipolicy=self.op.ignore_ipolicy)]
1137 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1138
1139 # TODO: Run iallocator in this opcode and pass correct placement options to
1140 # OpInstanceMigrate. Since other jobs can modify the cluster between
1141 # running the iallocator and the actual migration, a good consistency model
1142 # will have to be found.
1143
1144 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1145 frozenset([self.op.node_uuid]))
1146
1147 return ResultWithJobs(jobs)
1148
1149
1150 def _GetStorageTypeArgs(cfg, storage_type):
1151 """Returns the arguments for a storage type.
1152
1153 """
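# The return value is the list of extra positional arguments passed to the
# storage RPC calls for this backend: storage directories for the file-based
# backends, an empty list for the rest.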
1154 # Special case for file storage
1155
1156 if storage_type == constants.ST_FILE:
1157 return [[cfg.GetFileStorageDir()]]
1158 elif storage_type == constants.ST_SHARED_FILE:
1159 dts = cfg.GetClusterInfo().enabled_disk_templates
1160 paths = []
1161 if constants.DT_SHARED_FILE in dts:
1162 paths.append(cfg.GetSharedFileStorageDir())
1163 if constants.DT_GLUSTER in dts:
1164 paths.append(cfg.GetGlusterStorageDir())
1165 return [paths]
1166 else:
1167 return []
1168
1169
1170 class LUNodeModifyStorage(NoHooksLU):
1171 """Logical unit for modifying a storage volume on a node.
1172
1173 """
1174 REQ_BGL = False
1175
1176 def CheckArguments(self):
1177 (self.op.node_uuid, self.op.node_name) = \
1178 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1179
1180 storage_type = self.op.storage_type
1181
1182 try:
1183 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1184 except KeyError:
1185 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1186 " modified" % storage_type,
1187 errors.ECODE_INVAL)
1188
1189 diff = set(self.op.changes.keys()) - modifiable
1190 if diff:
1191 raise errors.OpPrereqError("The following fields can not be modified for"
1192 " storage units of type '%s': %r" %
1193 (storage_type, list(diff)),
1194 errors.ECODE_INVAL)
1195
1196 def CheckPrereq(self):
1197 """Check prerequisites.
1198
1199 """
1200 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1201
1202 def ExpandNames(self):
1203 self.needed_locks = {
1204 locking.LEVEL_NODE: self.op.node_uuid,
1205 }
1206
1207 def Exec(self, feedback_fn):
1208 """Modifies the given storage unit on the node.
1209
1210 """
1211 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1212 result = self.rpc.call_storage_modify(self.op.node_uuid,
1213 self.op.storage_type, st_args,
1214 self.op.name, self.op.changes)
1215 result.Raise("Failed to modify storage unit '%s' on %s" %
1216 (self.op.name, self.op.node_name))
1217
1218
1219 def _CheckOutputFields(fields, selected):
1220 """Checks whether all selected fields are valid, i.e. match the given field set.
1221
1222 @type fields: L{utils.FieldSet}
1223 @param fields: the set of valid output fields
1224 @type selected: L{utils.FieldSet}
1225 @param selected: the fields selected for output
1226
1227 """
1228 delta = fields.NonMatching(selected)
1229 if delta:
1230 raise errors.OpPrereqError("Unknown output fields selected: %s"
1231 % ",".join(delta), errors.ECODE_INVAL)
1232
1233
1234 class LUNodeQueryvols(NoHooksLU):
1235 """Logical unit for getting volumes on node(s).
1236
1237 """
1238 REQ_BGL = False
1239
1240 def CheckArguments(self):
1241 _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1242 constants.VF_VG, constants.VF_NAME,
1243 constants.VF_SIZE, constants.VF_INSTANCE),
1244 self.op.output_fields)
1245
1246 def ExpandNames(self):
1247 self.share_locks = ShareAll()
1248
1249 if self.op.nodes:
1250 self.needed_locks = {
1251 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1252 }
1253 else:
1254 self.needed_locks = {
1255 locking.LEVEL_NODE: locking.ALL_SET,
1256 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1257 }
1258
1259 def Exec(self, feedback_fn):
1260 """Computes the list of volumes and their attributes.
1261
1262 """
1263 node_uuids = self.owned_locks(locking.LEVEL_NODE)
1264 volumes = self.rpc.call_node_volumes(node_uuids)
1265
1266 ilist = self.cfg.GetAllInstancesInfo()
1267 vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values())
1268
1269 output = []
1270 for node_uuid in node_uuids:
1271 nresult = volumes[node_uuid]
1272 if nresult.offline:
1273 continue
1274 msg = nresult.fail_msg
1275 if msg:
1276 self.LogWarning("Can't compute volume data on node %s: %s",
1277 self.cfg.GetNodeName(node_uuid), msg)
1278 continue
1279
1280 node_vols = sorted(nresult.payload,
1281 key=operator.itemgetter(constants.VF_DEV))
1282
1283 for vol in node_vols:
1284 node_output = []
1285 for field in self.op.output_fields:
1286 if field == constants.VF_NODE:
1287 val = self.cfg.GetNodeName(node_uuid)
1288 elif field == constants.VF_PHYS:
1289 val = vol[constants.VF_DEV]
1290 elif field == constants.VF_VG:
1291 val = vol[constants.VF_VG]
1292 elif field == constants.VF_NAME:
1293 val = vol[constants.VF_NAME]
1294 elif field == constants.VF_SIZE:
1295 val = int(float(vol[constants.VF_SIZE]))
1296 elif field == constants.VF_INSTANCE:
1297 inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1298 vol[constants.VF_NAME]), None)
1299 if inst is not None:
1300 val = inst.name
1301 else:
1302 val = "-"
1303 else:
1304 raise errors.ParameterError(field)
1305 node_output.append(str(val))
1306
1307 output.append(node_output)
1308
1309 return output
1310
1311
1312 class LUNodeQueryStorage(NoHooksLU):
1313 """Logical unit for getting information on storage units on node(s).
1314
1315 """
1316 REQ_BGL = False
1317
1318 def CheckArguments(self):
1319 _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1320 self.op.output_fields)
1321
1322 def ExpandNames(self):
1323 self.share_locks = ShareAll()
1324
1325 if self.op.nodes:
1326 self.needed_locks = {
1327 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1328 }
1329 else:
1330 self.needed_locks = {
1331 locking.LEVEL_NODE: locking.ALL_SET,
1332 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1333 }
1334
1335 def _DetermineStorageType(self):
1336 """Determines the default storage type of the cluster.
1337
1338 """
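# The "default" storage type is derived from the first enabled disk template
# via constants.MAP_DISK_TEMPLATE_STORAGE_TYPE.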
1339 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
1340 default_storage_type = \
1341 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
1342 return default_storage_type
1343
1344 def CheckPrereq(self):
1345 """Check prerequisites.
1346
1347 """
1348 if self.op.storage_type:
1349 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1350 self.storage_type = self.op.storage_type
1351 else:
1352 self.storage_type = self._DetermineStorageType()
1353 supported_storage_types = constants.STS_REPORT_NODE_STORAGE
1354 if self.storage_type not in supported_storage_types:
1355 raise errors.OpPrereqError(
1356 "Storage reporting for storage type '%s' is not supported. Please"
1357 " use the --storage-type option to specify one of the supported"
1358 " storage types (%s) or set the default disk template to one that"
1359 " supports storage reporting." %
1360 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1361
1362 def Exec(self, feedback_fn):
1363 """Computes the list of storage units and their attributes.
1364
1365 """
1366 if self.op.storage_type:
1367 self.storage_type = self.op.storage_type
1368 else:
1369 self.storage_type = self._DetermineStorageType()
1370
1371 self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1372
1373 # Always get name to sort by
1374 if constants.SF_NAME in self.op.output_fields:
1375 fields = self.op.output_fields[:]
1376 else:
1377 fields = [constants.SF_NAME] + self.op.output_fields
1378
1379 # Never ask for node or type as it's only known to the LU
1380 for extra in [constants.SF_NODE, constants.SF_TYPE]:
1381 while extra in fields:
1382 fields.remove(extra)
1383
1384 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1385 name_idx = field_idx[constants.SF_NAME]
1386
1387 st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
1388 data = self.rpc.call_storage_list(self.node_uuids,
1389 self.storage_type, st_args,
1390 self.op.name, fields)
1391
1392 result = []
1393
1394 for node_uuid in utils.NiceSort(self.node_uuids):
1395 node_name = self.cfg.GetNodeName(node_uuid)
1396 nresult = data[node_uuid]
1397 if nresult.offline:
1398 continue
1399
1400 msg = nresult.fail_msg
1401 if msg:
1402 self.LogWarning("Can't get storage data from node %s: %s",
1403 node_name, msg)
1404 continue
1405
1406 rows = dict([(row[name_idx], row) for row in nresult.payload])
1407
1408 for name in utils.NiceSort(rows.keys()):
1409 row = rows[name]
1410
1411 out = []
1412
1413 for field in self.op.output_fields:
1414 if field == constants.SF_NODE:
1415 val = node_name
1416 elif field == constants.SF_TYPE:
1417 val = self.storage_type
1418 elif field in field_idx:
1419 val = row[field_idx[field]]
1420 else:
1421 raise errors.ParameterError(field)
1422
1423 out.append(val)
1424
1425 result.append(out)
1426
1427 return result
1428
1429
1430 class LUNodeRemove(LogicalUnit):
1431 """Logical unit for removing a node.
1432
1433 """
1434 HPATH = "node-remove"
1435 HTYPE = constants.HTYPE_NODE
1436
1437 def BuildHooksEnv(self):
1438 """Build hooks env.
1439
1440 """
1441 return {
1442 "OP_TARGET": self.op.node_name,
1443 "NODE_NAME": self.op.node_name,
1444 }
1445
1446 def BuildHooksNodes(self):
1447 """Build hooks nodes.
1448
1449 This doesn't run on the target node in the pre phase as a failed
1450 node would then be impossible to remove.
1451
1452 """
1453 all_nodes = self.cfg.GetNodeList()
1454 try:
1455 all_nodes.remove(self.op.node_uuid)
1456 except ValueError:
1457 pass
1458 return (all_nodes, all_nodes)
1459
1460 def CheckPrereq(self):
1461 """Check prerequisites.
1462
1463 This checks:
1464 - the node exists in the configuration
1465 - it does not have primary or secondary instances
1466 - it's not the master
1467
1468 Any errors are signaled by raising errors.OpPrereqError.
1469
1470 """
1471 (self.op.node_uuid, self.op.node_name) = \
1472 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1473 node = self.cfg.GetNodeInfo(self.op.node_uuid)
1474 assert node is not None
1475
1476 masternode = self.cfg.GetMasterNode()
1477 if node.uuid == masternode:
1478 raise errors.OpPrereqError("Node is the master node, failover to another"
1479 " node is required", errors.ECODE_INVAL)
1480
1481 for _, instance in self.cfg.GetAllInstancesInfo().items():
1482 if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
1483 raise errors.OpPrereqError("Instance %s is still running on the node,"
1484 " please remove it first" % instance.name,
1485 errors.ECODE_INVAL)
1486 self.op.node_name = node.name
1487 self.node = node
1488
1489 def Exec(self, feedback_fn):
1490 """Removes the node from the cluster.
1491
1492 """
1493 logging.info("Stopping the node daemon and removing configs from node %s",
1494 self.node.name)
1495
1496 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1497
1498 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1499 "Not owning BGL"
1500
1501 # Promote nodes to master candidate as needed
1502 AdjustCandidatePool(self, [self.node.uuid])
1503 self.context.RemoveNode(self.cfg, self.node)
1504
1505 # Run post hooks on the node before it's removed
1506 RunPostHook(self, self.node.name)
1507
1508 # we have to call this by name rather than by UUID, as the node is no longer
1509 # in the config
1510 result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1511 msg = result.fail_msg
1512 if msg:
1513 self.LogWarning("Errors encountered on the remote node while leaving"
1514 " the cluster: %s", msg)
1515
1516 cluster = self.cfg.GetClusterInfo()
1517
1518 # Remove node from candidate certificate list
1519 if self.node.master_candidate:
1520 self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)
1521
1522 # Remove node from our /etc/hosts
1523 if cluster.modify_etc_hosts:
1524 master_node_uuid = self.cfg.GetMasterNode()
1525 result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1526 constants.ETC_HOSTS_REMOVE,
1527 self.node.name, None)
1528 result.Raise("Can't update hosts file with new host data")
1529 RedistributeAncillaryFiles(self)
1530
1531
1532 class LURepairNodeStorage(NoHooksLU):
1533 """Repairs the volume group on a node.
1534
1535 """
1536 REQ_BGL = False
1537
1538 def CheckArguments(self):
1539 (self.op.node_uuid, self.op.node_name) = \
1540 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1541
1542 storage_type = self.op.storage_type
1543
1544 if (constants.SO_FIX_CONSISTENCY not in
1545 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1546 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1547 " repaired" % storage_type,
1548 errors.ECODE_INVAL)
1549
1550 def ExpandNames(self):
1551 self.needed_locks = {
1552 locking.LEVEL_NODE: [self.op.node_uuid],
1553 }
1554
1555 def _CheckFaultyDisks(self, instance, node_uuid):
1556 """Ensure faulty disks abort the opcode or at least warn."""
1557 try:
1558 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1559 node_uuid, True):
1560 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1561 " node '%s'" %
1562 (instance.name,
1563 self.cfg.GetNodeName(node_uuid)),
1564 errors.ECODE_STATE)
1565 except errors.OpPrereqError as err:
1566 if self.op.ignore_consistency:
1567 self.LogWarning(str(err.args[0]))
1568 else:
1569 raise
1570
1571 def CheckPrereq(self):
1572 """Check prerequisites.
1573
1574 """
1575 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1576
1577 # Check whether any instance on this node has faulty disks
1578 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1579 if not inst.disks_active:
1580 continue
1581 check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
1582 check_nodes.discard(self.op.node_uuid)
1583 for inst_node_uuid in check_nodes:
1584 self._CheckFaultyDisks(inst, inst_node_uuid)
1585
1586 def Exec(self, feedback_fn):
1587 feedback_fn("Repairing storage unit '%s' on %s ..." %
1588 (self.op.name, self.op.node_name))
1589
1590 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1591 result = self.rpc.call_storage_execute(self.op.node_uuid,
1592 self.op.storage_type, st_args,
1593 self.op.name,
1594 constants.SO_FIX_CONSISTENCY)
1595 result.Raise("Failed to repair storage unit '%s' on %s" %
1596 (self.op.name, self.op.node_name))