1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Logical units dealing with nodes."""
32
33 import logging
34 import operator
35
36 from ganeti import constants
37 from ganeti import errors
38 from ganeti import locking
39 from ganeti import netutils
40 from ganeti import objects
41 from ganeti import opcodes
42 import ganeti.rpc.node as rpc
43 from ganeti import utils
44 from ganeti.masterd import iallocator
45
46 from ganeti.cmdlib.base import LogicalUnit, NoHooksLU, ResultWithJobs
47 from ganeti.cmdlib.common import CheckParamsNotGlobal, \
48 MergeAndVerifyHvState, MergeAndVerifyDiskState, \
49 IsExclusiveStorageEnabledNode, CheckNodePVs, \
50 RedistributeAncillaryFiles, ExpandNodeUuidAndName, ShareAll, SupportsOob, \
51 CheckInstanceState, INSTANCE_DOWN, GetUpdatedParams, \
52 AdjustCandidatePool, CheckIAllocatorOrNode, LoadNodeEvacResult, \
53 GetWantedNodes, MapInstanceLvsToNodes, RunPostHook, \
54 FindFaultyInstanceDisks, CheckStorageTypeEnabled, CreateNewClientCert, \
55 AddNodeCertToCandidateCerts, RemoveNodeCertFromCandidateCerts, \
56 EnsureKvmdOnNodes
57
58
59 def _DecideSelfPromotion(lu, exceptions=None):
60 """Decide whether I should promote myself as a master candidate.
61
62 """
63 cp_size = lu.cfg.GetClusterInfo().candidate_pool_size
64 mc_now, mc_should, _ = lu.cfg.GetMasterCandidateStats(exceptions)
65 # the new node will increase mc_max by one, so:
66 mc_should = min(mc_should + 1, cp_size)
67 return mc_now < mc_should
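# Worked example of the check above (illustrative values): with
# candidate_pool_size = 10, mc_now = 3 and mc_should = 4, the node being
# added raises mc_should to min(4 + 1, 10) = 5; since 3 < 5, the function
# returns True and the new node will promote itself to master candidate.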
68
69
70 def _CheckNodeHasSecondaryIP(lu, node, secondary_ip, prereq):
71 """Ensure that a node has the given secondary ip.
72
73 @type lu: L{LogicalUnit}
74 @param lu: the LU on behalf of which we make the check
75 @type node: L{objects.Node}
76 @param node: the node to check
77 @type secondary_ip: string
78 @param secondary_ip: the ip to check
79 @type prereq: boolean
80 @param prereq: whether to throw a prerequisite or an execute error
81 @raise errors.OpPrereqError: if the node doesn't have the ip,
82 and prereq=True
83 @raise errors.OpExecError: if the node doesn't have the ip, and prereq=False
84
85 """
86 # this can be called with a new node, which has no UUID yet, so perform the
87 # RPC call using its name
88 result = lu.rpc.call_node_has_ip_address(node.name, secondary_ip)
89 result.Raise("Failure checking secondary ip on node %s" % node.name,
90 prereq=prereq, ecode=errors.ECODE_ENVIRON)
91 if not result.payload:
92 msg = ("Node claims it doesn't have the secondary ip you gave (%s),"
93 " please fix and re-run this command" % secondary_ip)
94 if prereq:
95 raise errors.OpPrereqError(msg, errors.ECODE_ENVIRON)
96 else:
97 raise errors.OpExecError(msg)
98
99
100 class LUNodeAdd(LogicalUnit):
101 """Logical unit for adding node to the cluster.
102
103 """
104 HPATH = "node-add"
105 HTYPE = constants.HTYPE_NODE
106 _NFLAGS = ["master_capable", "vm_capable"]
107
108 def CheckArguments(self):
109 self.primary_ip_family = self.cfg.GetPrimaryIPFamily()
110 # validate/normalize the node name
111 self.hostname = netutils.GetHostname(name=self.op.node_name,
112 family=self.primary_ip_family)
113 self.op.node_name = self.hostname.name
114
115 if self.op.readd and self.op.node_name == self.cfg.GetMasterNodeName():
116 raise errors.OpPrereqError("Cannot readd the master node",
117 errors.ECODE_STATE)
118
119 if self.op.readd and self.op.group:
120 raise errors.OpPrereqError("Cannot pass a node group when a node is"
121 " being readded", errors.ECODE_INVAL)
122
123 def BuildHooksEnv(self):
124 """Build hooks env.
125
126 This will run on all nodes before, and on all nodes + the new node after.
127
128 """
129 return {
130 "OP_TARGET": self.op.node_name,
131 "NODE_NAME": self.op.node_name,
132 "NODE_PIP": self.op.primary_ip,
133 "NODE_SIP": self.op.secondary_ip,
134 "MASTER_CAPABLE": str(self.op.master_capable),
135 "VM_CAPABLE": str(self.op.vm_capable),
136 }
137
138 def BuildHooksNodes(self):
139 """Build hooks nodes.
140
141 """
142 hook_nodes = self.cfg.GetNodeList()
143 new_node_info = self.cfg.GetNodeInfoByName(self.op.node_name)
144 if new_node_info is not None:
145 # Exclude added node
146 hook_nodes = list(set(hook_nodes) - set([new_node_info.uuid]))
147
148 # add the new node as post hook node by name; it does not have a UUID yet
149 return (hook_nodes, hook_nodes)
150
151 def PreparePostHookNodes(self, post_hook_node_uuids):
152 return post_hook_node_uuids + [self.new_node.uuid]
153
154 def CheckPrereq(self):
155 """Check prerequisites.
156
157 This checks:
158 - the new node is not already in the config
159 - it is resolvable
160 - its parameters (single/dual homed) match the cluster
161
162 Any errors are signaled by raising errors.OpPrereqError.
163
164 """
165 node_name = self.hostname.name
166 self.op.primary_ip = self.hostname.ip
167 if self.op.secondary_ip is None:
168 if self.primary_ip_family == netutils.IP6Address.family:
169 raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
170 " IPv4 address must be given as secondary",
171 errors.ECODE_INVAL)
172 self.op.secondary_ip = self.op.primary_ip
173
174 secondary_ip = self.op.secondary_ip
175 if not netutils.IP4Address.IsValid(secondary_ip):
176 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
177 " address" % secondary_ip, errors.ECODE_INVAL)
178
179 existing_node_info = self.cfg.GetNodeInfoByName(node_name)
180 if not self.op.readd and existing_node_info is not None:
181 raise errors.OpPrereqError("Node %s is already in the configuration" %
182 node_name, errors.ECODE_EXISTS)
183 elif self.op.readd and existing_node_info is None:
184 raise errors.OpPrereqError("Node %s is not in the configuration" %
185 node_name, errors.ECODE_NOENT)
186
187 self.changed_primary_ip = False
188
189 for existing_node in self.cfg.GetAllNodesInfo().values():
190 if self.op.readd and node_name == existing_node.name:
191 if existing_node.secondary_ip != secondary_ip:
192 raise errors.OpPrereqError("Readded node doesn't have the same IP"
193 " address configuration as before",
194 errors.ECODE_INVAL)
195 if existing_node.primary_ip != self.op.primary_ip:
196 self.changed_primary_ip = True
197
198 continue
199
200 if (existing_node.primary_ip == self.op.primary_ip or
201 existing_node.secondary_ip == self.op.primary_ip or
202 existing_node.primary_ip == secondary_ip or
203 existing_node.secondary_ip == secondary_ip):
204 raise errors.OpPrereqError("New node ip address(es) conflict with"
205 " existing node %s" % existing_node.name,
206 errors.ECODE_NOTUNIQUE)
207
208 # After this 'if' block, None is no longer a valid value for the
209 # _capable op attributes
210 if self.op.readd:
211 assert existing_node_info is not None, \
212 "Can't retrieve locked node %s" % node_name
213 for attr in self._NFLAGS:
214 if getattr(self.op, attr) is None:
215 setattr(self.op, attr, getattr(existing_node_info, attr))
216 else:
217 for attr in self._NFLAGS:
218 if getattr(self.op, attr) is None:
219 setattr(self.op, attr, True)
220
221 if self.op.readd and not self.op.vm_capable:
222 pri, sec = self.cfg.GetNodeInstances(existing_node_info.uuid)
223 if pri or sec:
224 raise errors.OpPrereqError("Node %s being re-added with vm_capable"
225 " flag set to false, but it already holds"
226 " instances" % node_name,
227 errors.ECODE_STATE)
228
229 # check that the type of the node (single versus dual homed) is the
230 # same as for the master
231 myself = self.cfg.GetMasterNodeInfo()
232 master_singlehomed = myself.secondary_ip == myself.primary_ip
233 newbie_singlehomed = secondary_ip == self.op.primary_ip
234 if master_singlehomed != newbie_singlehomed:
235 if master_singlehomed:
236 raise errors.OpPrereqError("The master has no secondary ip but the"
237 " new node has one",
238 errors.ECODE_INVAL)
239 else:
240 raise errors.OpPrereqError("The master has a secondary ip but the"
241 " new node doesn't have one",
242 errors.ECODE_INVAL)
243
244 # checks reachability
245 if not netutils.TcpPing(self.op.primary_ip, constants.DEFAULT_NODED_PORT):
246 raise errors.OpPrereqError("Node not reachable by ping",
247 errors.ECODE_ENVIRON)
248
249 if not newbie_singlehomed:
250 # check reachability from my secondary ip to newbie's secondary ip
251 if not netutils.TcpPing(secondary_ip, constants.DEFAULT_NODED_PORT,
252 source=myself.secondary_ip):
253 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
254 " based ping to node daemon port",
255 errors.ECODE_ENVIRON)
256
257 if self.op.readd:
258 exceptions = [existing_node_info.uuid]
259 else:
260 exceptions = []
261
262 if self.op.master_capable:
263 self.master_candidate = _DecideSelfPromotion(self, exceptions=exceptions)
264 else:
265 self.master_candidate = False
266
267 self.node_group = None
268 if self.op.readd:
269 self.new_node = existing_node_info
270 self.node_group = existing_node_info.group
271 else:
272 self.node_group = self.cfg.LookupNodeGroup(self.op.group)
273 self.new_node = objects.Node(name=node_name,
274 primary_ip=self.op.primary_ip,
275 secondary_ip=secondary_ip,
276 master_candidate=self.master_candidate,
277 offline=False, drained=False,
278 group=self.node_group, ndparams={})
279
280 if self.op.ndparams:
281 utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
282 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
283 "node", "cluster or group")
284
285 if self.op.hv_state:
286 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state, None)
287
288 if self.op.disk_state:
289 self.new_disk_state = MergeAndVerifyDiskState(self.op.disk_state, None)
290
291 # TODO: If we need to have multiple DnsOnlyRunner we probably should make
292 # it a property on the base class.
293 rpcrunner = rpc.DnsOnlyRunner()
294 result = rpcrunner.call_version([node_name])[node_name]
295 result.Raise("Can't get version information from node %s" % node_name,
296 prereq=True)
297 if constants.PROTOCOL_VERSION == result.payload:
298 logging.info("Communication to node %s fine, sw version %s match",
299 node_name, result.payload)
300 else:
301 raise errors.OpPrereqError("Version mismatch master version %s,"
302 " node version %s" %
303 (constants.PROTOCOL_VERSION, result.payload),
304 errors.ECODE_ENVIRON)
305
306 vg_name = self.cfg.GetVGName()
307 if vg_name is not None:
308 vparams = {constants.NV_PVLIST: [vg_name]}
309 excl_stor = IsExclusiveStorageEnabledNode(self.cfg, self.new_node)
310 cname = self.cfg.GetClusterName()
311 result = rpcrunner.call_node_verify_light(
312 [node_name], vparams, cname,
313 self.cfg.GetClusterInfo().hvparams,
314 {node_name: self.node_group},
315 self.cfg.GetAllNodeGroupsInfoDict()
316 )[node_name]
317 (errmsgs, _) = CheckNodePVs(result.payload, excl_stor)
318 if errmsgs:
319 raise errors.OpPrereqError("Checks on node PVs failed: %s" %
320 "; ".join(errmsgs), errors.ECODE_ENVIRON)
321
322 def _InitOpenVSwitch(self):
323 filled_ndparams = self.cfg.GetClusterInfo().FillND(
324 self.new_node, self.cfg.GetNodeGroup(self.new_node.group))
325
326 ovs = filled_ndparams.get(constants.ND_OVS, None)
327 ovs_name = filled_ndparams.get(constants.ND_OVS_NAME, None)
328 ovs_link = filled_ndparams.get(constants.ND_OVS_LINK, None)
329
330 if ovs:
331 if not ovs_link:
332 self.LogInfo("No physical interface for OpenvSwitch was given."
333 " OpenvSwitch will not have an outside connection. This"
334 " might not be what you want.")
335
336 result = self.rpc.call_node_configure_ovs(
337 self.new_node.name, ovs_name, ovs_link)
338 result.Raise("Failed to initialize OpenVSwitch on new node")
339
340 def Exec(self, feedback_fn):
341 """Adds the new node to the cluster.
342
343 """
344 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
345 "Not owning BGL"
346
347 # We are adding a new node, so we assume it's powered
348 self.new_node.powered = True
349
350 # for re-adds, reset the offline/drained/master-candidate flags;
351 # we need to reset here, otherwise offline would prevent RPC calls
352 # later in the procedure; this also means that if the re-add
353 # fails, we are left with a non-offlined, broken node
354 if self.op.readd:
355 self.new_node.offline = False
356 self.new_node.drained = False
357 self.LogInfo("Readding a node, the offline/drained flags were reset")
358 # if we demote the node, we do cleanup later in the procedure
359 self.new_node.master_candidate = self.master_candidate
360 if self.changed_primary_ip:
361 self.new_node.primary_ip = self.op.primary_ip
362
363 # copy the master/vm_capable flags
364 for attr in self._NFLAGS:
365 setattr(self.new_node, attr, getattr(self.op, attr))
366
367 # notify the user about any possible mc promotion
368 if self.new_node.master_candidate:
369 self.LogInfo("Node will be a master candidate")
370
371 if self.op.ndparams:
372 self.new_node.ndparams = self.op.ndparams
373 else:
374 self.new_node.ndparams = {}
375
376 if self.op.hv_state:
377 self.new_node.hv_state_static = self.new_hv_state
378
379 if self.op.disk_state:
380 self.new_node.disk_state_static = self.new_disk_state
381
382 # Add node to our /etc/hosts, and add key to known_hosts
383 if self.cfg.GetClusterInfo().modify_etc_hosts:
384 master_node = self.cfg.GetMasterNode()
385 result = self.rpc.call_etc_hosts_modify(
386 master_node, constants.ETC_HOSTS_ADD, self.hostname.name,
387 self.hostname.ip)
388 result.Raise("Can't update hosts file with new host data")
389
390 if self.new_node.secondary_ip != self.new_node.primary_ip:
391 _CheckNodeHasSecondaryIP(self, self.new_node, self.new_node.secondary_ip,
392 False)
393
394 node_verifier_uuids = [self.cfg.GetMasterNode()]
395 node_verify_param = {
396 constants.NV_NODELIST: ([self.new_node.name], {}),
397 # TODO: do a node-net-test as well?
398 }
399
400 result = self.rpc.call_node_verify(
401 node_verifier_uuids, node_verify_param,
402 self.cfg.GetClusterName(),
403 self.cfg.GetClusterInfo().hvparams,
404 {self.new_node.name: self.cfg.LookupNodeGroup(self.node_group)},
405 self.cfg.GetAllNodeGroupsInfoDict()
406 )
407 for verifier in node_verifier_uuids:
408 result[verifier].Raise("Cannot communicate with node %s" % verifier)
409 nl_payload = result[verifier].payload[constants.NV_NODELIST]
410 if nl_payload:
411 for failed in nl_payload:
412 feedback_fn("ssh/hostname verification failed"
413 " (checking from %s): %s" %
414 (verifier, nl_payload[failed]))
415 raise errors.OpExecError("ssh/hostname verification failed")
416
417 self._InitOpenVSwitch()
418
419 if self.op.readd:
420 self.context.ReaddNode(self.new_node)
421 RedistributeAncillaryFiles(self)
422 # make sure we redistribute the config
423 self.cfg.Update(self.new_node, feedback_fn)
424 # and make sure the new node will not have old files around
425 if not self.new_node.master_candidate:
426 result = self.rpc.call_node_demote_from_mc(self.new_node.uuid)
427 result.Warn("Node failed to demote itself from master candidate status",
428 self.LogWarning)
429 else:
430 self.context.AddNode(self.cfg, self.new_node, self.proc.GetECId())
431 RedistributeAncillaryFiles(self)
432
433 # We create a new certificate even if the node is readded
434 digest = CreateNewClientCert(self, self.new_node.uuid)
435 if self.new_node.master_candidate:
436 self.cfg.AddNodeToCandidateCerts(self.new_node.uuid, digest)
437 else:
438 self.cfg.RemoveNodeFromCandidateCerts(self.new_node.uuid, warn_fn=None)
439
440 EnsureKvmdOnNodes(self, feedback_fn, nodes=[self.new_node.uuid])
441
442
443 class LUNodeSetParams(LogicalUnit):
444 """Modifies the parameters of a node.
445
446 @cvar _F2R: a dictionary from tuples of flags (mc, drained, offline)
447 to the node role (as _ROLE_*)
448 @cvar _R2F: a dictionary from node role to tuples of flags
449 @cvar _FLAGS: a list of attribute names corresponding to the flags
450
451 """
452 HPATH = "node-modify"
453 HTYPE = constants.HTYPE_NODE
454 REQ_BGL = False
455 (_ROLE_CANDIDATE, _ROLE_DRAINED, _ROLE_OFFLINE, _ROLE_REGULAR) = range(4)
456 _F2R = {
457 (True, False, False): _ROLE_CANDIDATE,
458 (False, True, False): _ROLE_DRAINED,
459 (False, False, True): _ROLE_OFFLINE,
460 (False, False, False): _ROLE_REGULAR,
461 }
462 _R2F = dict((v, k) for k, v in _F2R.items())
463 _FLAGS = ["master_candidate", "drained", "offline"]
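# The flag tuples used as keys of _F2R follow the same order as _FLAGS,
# i.e. (master_candidate, drained, offline); for example (True, False, False)
# describes a master candidate that is neither drained nor offline.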
464
465 def CheckArguments(self):
466 (self.op.node_uuid, self.op.node_name) = \
467 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
468 all_mods = [self.op.offline, self.op.master_candidate, self.op.drained,
469 self.op.master_capable, self.op.vm_capable,
470 self.op.secondary_ip, self.op.ndparams, self.op.hv_state,
471 self.op.disk_state]
472 if all_mods.count(None) == len(all_mods):
473 raise errors.OpPrereqError("Please pass at least one modification",
474 errors.ECODE_INVAL)
475 if all_mods.count(True) > 1:
476 raise errors.OpPrereqError("Can't set the node into more than one"
477 " state at the same time",
478 errors.ECODE_INVAL)
479
480 # Boolean value that tells us whether we might be demoting from MC
481 self.might_demote = (self.op.master_candidate is False or
482 self.op.offline is True or
483 self.op.drained is True or
484 self.op.master_capable is False)
485
486 if self.op.secondary_ip:
487 if not netutils.IP4Address.IsValid(self.op.secondary_ip):
488 raise errors.OpPrereqError("Secondary IP (%s) needs to be a valid IPv4"
489 " address" % self.op.secondary_ip,
490 errors.ECODE_INVAL)
491
492 self.lock_all = self.op.auto_promote and self.might_demote
493 self.lock_instances = self.op.secondary_ip is not None
494
495 def _InstanceFilter(self, instance):
496 """Filter for getting affected instances.
497
498 """
499 return (instance.disk_template in constants.DTS_INT_MIRROR and
500 self.op.node_uuid in self.cfg.GetInstanceNodes(instance.uuid))
501
502 def ExpandNames(self):
503 if self.lock_all:
504 self.needed_locks = {
505 locking.LEVEL_NODE: locking.ALL_SET,
506
507 # Block allocations when all nodes are locked
508 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
509 }
510 else:
511 self.needed_locks = {
512 locking.LEVEL_NODE: self.op.node_uuid,
513 }
514
515 # Since modifying a node can have severe effects on currently running
516 # operations the resource lock is at least acquired in shared mode
517 self.needed_locks[locking.LEVEL_NODE_RES] = \
518 self.needed_locks[locking.LEVEL_NODE]
519
520 # Get all locks except nodes in shared mode; they are not used for anything
521 # but read-only access
522 self.share_locks = ShareAll()
523 self.share_locks[locking.LEVEL_NODE] = 0
524 self.share_locks[locking.LEVEL_NODE_RES] = 0
525 self.share_locks[locking.LEVEL_NODE_ALLOC] = 0
526
527 if self.lock_instances:
528 self.needed_locks[locking.LEVEL_INSTANCE] = \
529 self.cfg.GetInstanceNames(
530 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter).keys())
531
532 def BuildHooksEnv(self):
533 """Build hooks env.
534
535 This runs on the master node.
536
537 """
538 return {
539 "OP_TARGET": self.op.node_name,
540 "MASTER_CANDIDATE": str(self.op.master_candidate),
541 "OFFLINE": str(self.op.offline),
542 "DRAINED": str(self.op.drained),
543 "MASTER_CAPABLE": str(self.op.master_capable),
544 "VM_CAPABLE": str(self.op.vm_capable),
545 }
546
547 def BuildHooksNodes(self):
548 """Build hooks nodes.
549
550 """
551 nl = [self.cfg.GetMasterNode(), self.op.node_uuid]
552 return (nl, nl)
553
554 def CheckPrereq(self):
555 """Check prerequisites.
556
557 This only checks the instance list against the existing names.
558
559 """
560 node = self.cfg.GetNodeInfo(self.op.node_uuid)
561 if self.lock_instances:
562 affected_instances = \
563 self.cfg.GetInstancesInfoByFilter(self._InstanceFilter)
564
565 # Verify instance locks
566 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
567 wanted_instance_names = frozenset([inst.name for inst in
568 affected_instances.values()])
569 if wanted_instance_names - owned_instance_names:
570 raise errors.OpPrereqError("Instances affected by changing node %s's"
571 " secondary IP address have changed since"
572 " locks were acquired, wanted '%s', have"
573 " '%s'; retry the operation" %
574 (node.name,
575 utils.CommaJoin(wanted_instance_names),
576 utils.CommaJoin(owned_instance_names)),
577 errors.ECODE_STATE)
578 else:
579 affected_instances = None
580
581 if (self.op.master_candidate is not None or
582 self.op.drained is not None or
583 self.op.offline is not None):
584 # we can't change the master's node flags
585 if node.uuid == self.cfg.GetMasterNode():
586 raise errors.OpPrereqError("The master role can be changed"
587 " only via master-failover",
588 errors.ECODE_INVAL)
589
590 if self.op.master_candidate and not node.master_capable:
591 raise errors.OpPrereqError("Node %s is not master capable, cannot make"
592 " it a master candidate" % node.name,
593 errors.ECODE_STATE)
594
595 if self.op.vm_capable is False:
596 (ipri, isec) = self.cfg.GetNodeInstances(node.uuid)
597 if ipri or isec:
598 raise errors.OpPrereqError("Node %s hosts instances, cannot unset"
599 " the vm_capable flag" % node.name,
600 errors.ECODE_STATE)
601
602 if node.master_candidate and self.might_demote and not self.lock_all:
603 assert not self.op.auto_promote, "auto_promote set but lock_all not"
604 # check if after removing the current node, we're missing master
605 # candidates
606 (mc_remaining, mc_should, _) = \
607 self.cfg.GetMasterCandidateStats(exceptions=[node.uuid])
608 if mc_remaining < mc_should:
609 raise errors.OpPrereqError("Not enough master candidates, please"
610 " pass auto promote option to allow"
611 " promotion (--auto-promote or RAPI"
612 " auto_promote=True)", errors.ECODE_STATE)
613
614 self.old_flags = old_flags = (node.master_candidate,
615 node.drained, node.offline)
616 assert old_flags in self._F2R, "Un-handled old flags %s" % str(old_flags)
617 self.old_role = old_role = self._F2R[old_flags]
618
619 # Check for ineffective changes
620 for attr in self._FLAGS:
621 if getattr(self.op, attr) is False and getattr(node, attr) is False:
622 self.LogInfo("Ignoring request to unset flag %s, already unset", attr)
623 setattr(self.op, attr, None)
624
625 # Past this point, any flag change to False means a transition
626 # away from the respective state, as only real changes are kept
627
628 # TODO: We might query the real power state if it supports OOB
629 if SupportsOob(self.cfg, node):
630 if self.op.offline is False and not (node.powered or
631 self.op.powered is True):
632 raise errors.OpPrereqError(("Node %s needs to be turned on before its"
633 " offline status can be reset") %
634 self.op.node_name, errors.ECODE_STATE)
635 elif self.op.powered is not None:
636 raise errors.OpPrereqError(("Unable to change powered state for node %s"
637 " as it does not support out-of-band"
638 " handling") % self.op.node_name,
639 errors.ECODE_STATE)
640
641 # If we're being deofflined/drained, we'll MC ourself if needed
642 if (self.op.drained is False or self.op.offline is False or
643 (self.op.master_capable and not node.master_capable)):
644 if _DecideSelfPromotion(self):
645 self.op.master_candidate = True
646 self.LogInfo("Auto-promoting node to master candidate")
647
648 # If we're no longer master capable, we'll demote ourselves from MC
649 if self.op.master_capable is False and node.master_candidate:
650 if self.op.node_uuid == self.cfg.GetMasterNode():
651 raise errors.OpPrereqError("Master must remain master capable",
652 errors.ECODE_STATE)
653 self.LogInfo("Demoting from master candidate")
654 self.op.master_candidate = False
655
656 # Compute new role
657 assert [getattr(self.op, attr) for attr in self._FLAGS].count(True) <= 1
658 if self.op.master_candidate:
659 new_role = self._ROLE_CANDIDATE
660 elif self.op.drained:
661 new_role = self._ROLE_DRAINED
662 elif self.op.offline:
663 new_role = self._ROLE_OFFLINE
664 elif False in [self.op.master_candidate, self.op.drained, self.op.offline]:
665 # False is still in new flags, which means we're un-setting (the
666 # only) True flag
667 new_role = self._ROLE_REGULAR
668 else: # no new flags, nothing, keep old role
669 new_role = old_role
670
671 self.new_role = new_role
672
673 if old_role == self._ROLE_OFFLINE and new_role != old_role:
674 # Trying to transition out of offline status
675 result = self.rpc.call_version([node.uuid])[node.uuid]
676 if result.fail_msg:
677 raise errors.OpPrereqError("Node %s is being de-offlined but fails"
678 " to report its version: %s" %
679 (node.name, result.fail_msg),
680 errors.ECODE_STATE)
681 else:
682 self.LogWarning("Transitioning node from offline to online state"
683 " without using re-add. Please make sure the node"
684 " is healthy!")
685
686 # When changing the secondary ip, verify if this is a single-homed to
687 # multi-homed transition or vice versa, and apply the relevant
688 # restrictions.
689 if self.op.secondary_ip:
690 # Ok even without locking, because this can't be changed by any LU
691 master = self.cfg.GetMasterNodeInfo()
692 master_singlehomed = master.secondary_ip == master.primary_ip
693 if master_singlehomed and self.op.secondary_ip != node.primary_ip:
694 if self.op.force and node.uuid == master.uuid:
695 self.LogWarning("Transitioning from single-homed to multi-homed"
696 " cluster; all nodes will require a secondary IP"
697 " address")
698 else:
699 raise errors.OpPrereqError("Changing the secondary ip on a"
700 " single-homed cluster requires the"
701 " --force option to be passed, and the"
702 " target node to be the master",
703 errors.ECODE_INVAL)
704 elif not master_singlehomed and self.op.secondary_ip == node.primary_ip:
705 if self.op.force and node.uuid == master.uuid:
706 self.LogWarning("Transitioning from multi-homed to single-homed"
707 " cluster; secondary IP addresses will have to be"
708 " removed")
709 else:
710 raise errors.OpPrereqError("Cannot set the secondary IP to be the"
711 " same as the primary IP on a multi-homed"
712 " cluster, unless the --force option is"
713 " passed, and the target node is the"
714 " master", errors.ECODE_INVAL)
715
716 assert not (set([inst.name for inst in affected_instances.values()]) -
717 self.owned_locks(locking.LEVEL_INSTANCE))
718
719 if node.offline:
720 if affected_instances:
721 msg = ("Cannot change secondary IP address: offline node has"
722 " instances (%s) configured to use it" %
723 utils.CommaJoin(
724 [inst.name for inst in affected_instances.values()]))
725 raise errors.OpPrereqError(msg, errors.ECODE_STATE)
726 else:
727 # On online nodes, check that no instances are running, and that
728 # the node has the new ip and we can reach it.
729 for instance in affected_instances.values():
730 CheckInstanceState(self, instance, INSTANCE_DOWN,
731 msg="cannot change secondary ip")
732
733 _CheckNodeHasSecondaryIP(self, node, self.op.secondary_ip, True)
734 if master.uuid != node.uuid:
735 # check reachability from master secondary ip to new secondary ip
736 if not netutils.TcpPing(self.op.secondary_ip,
737 constants.DEFAULT_NODED_PORT,
738 source=master.secondary_ip):
739 raise errors.OpPrereqError("Node secondary ip not reachable by TCP"
740 " based ping to node daemon port",
741 errors.ECODE_ENVIRON)
742
743 if self.op.ndparams:
744 new_ndparams = GetUpdatedParams(node.ndparams, self.op.ndparams)
745 utils.ForceDictType(new_ndparams, constants.NDS_PARAMETER_TYPES)
746 CheckParamsNotGlobal(self.op.ndparams, constants.NDC_GLOBALS, "node",
747 "node", "cluster or group")
748 self.new_ndparams = new_ndparams
749
750 if self.op.hv_state:
751 self.new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
752 node.hv_state_static)
753
754 if self.op.disk_state:
755 self.new_disk_state = \
756 MergeAndVerifyDiskState(self.op.disk_state, node.disk_state_static)
757
758 def Exec(self, feedback_fn):
759 """Modifies a node.
760
761 """
762 node = self.cfg.GetNodeInfo(self.op.node_uuid)
763 result = []
764
765 if self.op.ndparams:
766 node.ndparams = self.new_ndparams
767
768 if self.op.powered is not None:
769 node.powered = self.op.powered
770
771 if self.op.hv_state:
772 node.hv_state_static = self.new_hv_state
773
774 if self.op.disk_state:
775 node.disk_state_static = self.new_disk_state
776
777 for attr in ["master_capable", "vm_capable"]:
778 val = getattr(self.op, attr)
779 if val is not None:
780 setattr(node, attr, val)
781 result.append((attr, str(val)))
782
783 if self.op.secondary_ip:
784 node.secondary_ip = self.op.secondary_ip
785 result.append(("secondary_ip", self.op.secondary_ip))
786
787 # this will trigger configuration file update, if needed
788 self.cfg.Update(node, feedback_fn)
789
790 if self.new_role != self.old_role:
791 new_flags = self._R2F[self.new_role]
792 for of, nf, desc in zip(self.old_flags, new_flags, self._FLAGS):
793 if of != nf:
794 result.append((desc, str(nf)))
795 (node.master_candidate, node.drained, node.offline) = new_flags
796 self.cfg.Update(node, feedback_fn)
797
798 # Tell the node to demote itself, if no longer MC and not offline.
799 # This must be done only after the configuration is updated so that
800 # it's ensured the node won't receive any further configuration updates.
801 if self.old_role == self._ROLE_CANDIDATE and \
802 self.new_role != self._ROLE_OFFLINE:
803 msg = self.rpc.call_node_demote_from_mc(node.name).fail_msg
804 if msg:
805 self.LogWarning("Node failed to demote itself: %s", msg)
806
807 # we locked all nodes, we adjust the CP before updating this node
808 if self.lock_all:
809 AdjustCandidatePool(self, [node.uuid])
810
811 # if node gets promoted, grant RPC privileges
812 if self.new_role == self._ROLE_CANDIDATE:
813 AddNodeCertToCandidateCerts(self, self.cfg, node.uuid)
814 # if node is demoted, revoke RPC privileges
815 if self.old_role == self._ROLE_CANDIDATE:
816 RemoveNodeCertFromCandidateCerts(self.cfg, node.uuid)
817
818 # this will trigger job queue propagation or cleanup if the mc
819 # flag changed
820 if [self.old_role, self.new_role].count(self._ROLE_CANDIDATE) == 1:
821 self.context.ReaddNode(node)
822
823 EnsureKvmdOnNodes(self, feedback_fn, nodes=[node.uuid])
824
825 return result
826
827
828 class LUNodePowercycle(NoHooksLU):
829 """Powercycles a node.
830
831 """
832 REQ_BGL = False
833
834 def CheckArguments(self):
835 (self.op.node_uuid, self.op.node_name) = \
836 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
837
838 if self.op.node_uuid == self.cfg.GetMasterNode() and not self.op.force:
839 raise errors.OpPrereqError("The node is the master and the force"
840 " parameter was not set",
841 errors.ECODE_INVAL)
842
843 def ExpandNames(self):
844 """Locking for PowercycleNode.
845
846 This is a last-resort option and shouldn't block on other
847 jobs. Therefore, we grab no locks.
848
849 """
850 self.needed_locks = {}
851
852 def Exec(self, feedback_fn):
853 """Reboots a node.
854
855 """
856 default_hypervisor = self.cfg.GetHypervisorType()
857 hvparams = self.cfg.GetClusterInfo().hvparams[default_hypervisor]
858 result = self.rpc.call_node_powercycle(self.op.node_uuid,
859 default_hypervisor,
860 hvparams)
861 result.Raise("Failed to schedule the reboot")
862 return result.payload
863
864
865 def _GetNodeInstancesInner(cfg, fn):
866 return [i for i in cfg.GetAllInstancesInfo().values() if fn(i)]
867
868
869 def _GetNodePrimaryInstances(cfg, node_uuid):
870 """Returns primary instances on a node.
871
872 """
873 return _GetNodeInstancesInner(cfg,
874 lambda inst: node_uuid == inst.primary_node)
875
876
877 def _GetNodeSecondaryInstances(cfg, node_uuid):
878 """Returns secondary instances on a node.
879
880 """
881 return _GetNodeInstancesInner(cfg,
882 lambda inst: node_uuid in
883 cfg.GetInstanceSecondaryNodes(inst.uuid))
884
885
886 def _GetNodeInstances(cfg, node_uuid):
887 """Returns a list of all primary and secondary instances on a node.
888
889 """
890
891 return _GetNodeInstancesInner(cfg,
892 lambda inst: node_uuid in
893 cfg.GetInstanceNodes(inst.uuid))
894
895
896 class LUNodeEvacuate(NoHooksLU):
897 """Evacuates instances off a list of nodes.
898
899 """
900 REQ_BGL = False
901
902 def CheckArguments(self):
903 CheckIAllocatorOrNode(self, "iallocator", "remote_node")
904
905 def ExpandNames(self):
906 (self.op.node_uuid, self.op.node_name) = \
907 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
908
909 if self.op.remote_node is not None:
910 (self.op.remote_node_uuid, self.op.remote_node) = \
911 ExpandNodeUuidAndName(self.cfg, self.op.remote_node_uuid,
912 self.op.remote_node)
913 assert self.op.remote_node
914
915 if self.op.node_uuid == self.op.remote_node_uuid:
916 raise errors.OpPrereqError("Can not use evacuated node as a new"
917 " secondary node", errors.ECODE_INVAL)
918
919 if self.op.mode != constants.NODE_EVAC_SEC:
920 raise errors.OpPrereqError("Without the use of an iallocator only"
921 " secondary instances can be evacuated",
922 errors.ECODE_INVAL)
923
924 # Declare locks
925 self.share_locks = ShareAll()
926 self.needed_locks = {
927 locking.LEVEL_INSTANCE: [],
928 locking.LEVEL_NODEGROUP: [],
929 locking.LEVEL_NODE: [],
930 }
931
932 # Determine nodes (via group) optimistically, needs verification once locks
933 # have been acquired
934 self.lock_nodes = self._DetermineNodes()
935
936 def _DetermineNodes(self):
937 """Gets the list of node UUIDs to operate on.
938
939 """
940 if self.op.remote_node is None:
941 # Iallocator will choose any node(s) in the same group
942 group_nodes = self.cfg.GetNodeGroupMembersByNodes([self.op.node_uuid])
943 else:
944 group_nodes = frozenset([self.op.remote_node_uuid])
945
946 # Determine nodes to be locked
947 return set([self.op.node_uuid]) | group_nodes
948
949 def _DetermineInstances(self):
950 """Builds list of instances to operate on.
951
952 """
953 assert self.op.mode in constants.NODE_EVAC_MODES
954
955 if self.op.mode == constants.NODE_EVAC_PRI:
956 # Primary instances only
957 inst_fn = _GetNodePrimaryInstances
958 assert self.op.remote_node is None, \
959 "Evacuating primary instances requires iallocator"
960 elif self.op.mode == constants.NODE_EVAC_SEC:
961 # Secondary instances only
962 inst_fn = _GetNodeSecondaryInstances
963 else:
964 # All instances
965 assert self.op.mode == constants.NODE_EVAC_ALL
966 inst_fn = _GetNodeInstances
967 # TODO: In 2.6, change the iallocator interface to take an evacuation mode
968 # per instance
969 raise errors.OpPrereqError("Due to an issue with the iallocator"
970 " interface it is not possible to evacuate"
971 " all instances at once; specify explicitly"
972 " whether to evacuate primary or secondary"
973 " instances",
974 errors.ECODE_INVAL)
975
976 return inst_fn(self.cfg, self.op.node_uuid)
977
978 def DeclareLocks(self, level):
979 if level == locking.LEVEL_INSTANCE:
980 # Lock instances optimistically, needs verification once node and group
981 # locks have been acquired
982 self.needed_locks[locking.LEVEL_INSTANCE] = \
983 set(i.name for i in self._DetermineInstances())
984
985 elif level == locking.LEVEL_NODEGROUP:
986 # Lock node groups for all potential target nodes optimistically, needs
987 # verification once nodes have been acquired
988 self.needed_locks[locking.LEVEL_NODEGROUP] = \
989 self.cfg.GetNodeGroupsFromNodes(self.lock_nodes)
990
991 elif level == locking.LEVEL_NODE:
992 self.needed_locks[locking.LEVEL_NODE] = self.lock_nodes
993
994 def CheckPrereq(self):
995 # Verify locks
996 owned_instance_names = self.owned_locks(locking.LEVEL_INSTANCE)
997 owned_nodes = self.owned_locks(locking.LEVEL_NODE)
998 owned_groups = self.owned_locks(locking.LEVEL_NODEGROUP)
999
1000 need_nodes = self._DetermineNodes()
1001
1002 if not owned_nodes.issuperset(need_nodes):
1003 raise errors.OpPrereqError("Nodes in same group as '%s' changed since"
1004 " locks were acquired, current nodes are"
1005 " are '%s', used to be '%s'; retry the"
1006 " operation" %
1007 (self.op.node_name,
1008 utils.CommaJoin(need_nodes),
1009 utils.CommaJoin(owned_nodes)),
1010 errors.ECODE_STATE)
1011
1012 wanted_groups = self.cfg.GetNodeGroupsFromNodes(owned_nodes)
1013 if owned_groups != wanted_groups:
1014 raise errors.OpExecError("Node groups changed since locks were acquired,"
1015 " current groups are '%s', used to be '%s';"
1016 " retry the operation" %
1017 (utils.CommaJoin(wanted_groups),
1018 utils.CommaJoin(owned_groups)))
1019
1020 # Determine affected instances
1021 self.instances = self._DetermineInstances()
1022 self.instance_names = [i.name for i in self.instances]
1023
1024 if set(self.instance_names) != owned_instance_names:
1025 raise errors.OpExecError("Instances on node '%s' changed since locks"
1026 " were acquired, current instances are '%s',"
1027 " used to be '%s'; retry the operation" %
1028 (self.op.node_name,
1029 utils.CommaJoin(self.instance_names),
1030 utils.CommaJoin(owned_instance_names)))
1031
1032 if self.instance_names:
1033 self.LogInfo("Evacuating instances from node '%s': %s",
1034 self.op.node_name,
1035 utils.CommaJoin(utils.NiceSort(self.instance_names)))
1036 else:
1037 self.LogInfo("No instances to evacuate from node '%s'",
1038 self.op.node_name)
1039
1040 if self.op.remote_node is not None:
1041 for i in self.instances:
1042 if i.primary_node == self.op.remote_node_uuid:
1043 raise errors.OpPrereqError("Node %s is the primary node of"
1044 " instance %s, cannot use it as"
1045 " secondary" %
1046 (self.op.remote_node, i.name),
1047 errors.ECODE_INVAL)
1048
1049 def Exec(self, feedback_fn):
1050 assert (self.op.iallocator is not None) ^ (self.op.remote_node is not None)
1051
1052 if not self.instance_names:
1053 # No instances to evacuate
1054 jobs = []
1055
1056 elif self.op.iallocator is not None:
1057 # TODO: Implement relocation to other group
1058 req = iallocator.IAReqNodeEvac(evac_mode=self.op.mode,
1059 instances=list(self.instance_names))
1060 ial = iallocator.IAllocator(self.cfg, self.rpc, req)
1061
1062 ial.Run(self.op.iallocator)
1063
1064 if not ial.success:
1065 raise errors.OpPrereqError("Can't compute node evacuation using"
1066 " iallocator '%s': %s" %
1067 (self.op.iallocator, ial.info),
1068 errors.ECODE_NORES)
1069
1070 jobs = LoadNodeEvacResult(self, ial.result, self.op.early_release, True)
1071
1072 elif self.op.remote_node is not None:
1073 assert self.op.mode == constants.NODE_EVAC_SEC
1074 jobs = [
1075 [opcodes.OpInstanceReplaceDisks(instance_name=instance_name,
1076 remote_node=self.op.remote_node,
1077 disks=[],
1078 mode=constants.REPLACE_DISK_CHG,
1079 early_release=self.op.early_release)]
1080 for instance_name in self.instance_names]
1081
1082 else:
1083 raise errors.ProgrammerError("No iallocator or remote node")
1084
1085 return ResultWithJobs(jobs)
1086
1087
1088 class LUNodeMigrate(LogicalUnit):
1089 """Migrate all instances from a node.
1090
1091 """
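# Usually driven by the OpNodeMigrate opcode, e.g. "gnt-node migrate"; it
# queues up one OpInstanceMigrate job per primary instance of the node.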
1092 HPATH = "node-migrate"
1093 HTYPE = constants.HTYPE_NODE
1094 REQ_BGL = False
1095
1096 def CheckArguments(self):
1097 pass
1098
1099 def ExpandNames(self):
1100 (self.op.node_uuid, self.op.node_name) = \
1101 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1102
1103 self.share_locks = ShareAll()
1104 self.needed_locks = {
1105 locking.LEVEL_NODE: [self.op.node_uuid],
1106 }
1107
1108 def BuildHooksEnv(self):
1109 """Build hooks env.
1110
1111 This runs on the master, the primary and all the secondaries.
1112
1113 """
1114 return {
1115 "NODE_NAME": self.op.node_name,
1116 "ALLOW_RUNTIME_CHANGES": self.op.allow_runtime_changes,
1117 }
1118
1119 def BuildHooksNodes(self):
1120 """Build hooks nodes.
1121
1122 """
1123 nl = [self.cfg.GetMasterNode()]
1124 return (nl, nl)
1125
1126 def CheckPrereq(self):
1127 pass
1128
1129 def Exec(self, feedback_fn):
1130 # Prepare jobs for migrating instances
1131 jobs = [
1132 [opcodes.OpInstanceMigrate(
1133 instance_name=inst.name,
1134 mode=self.op.mode,
1135 live=self.op.live,
1136 iallocator=self.op.iallocator,
1137 target_node=self.op.target_node,
1138 allow_runtime_changes=self.op.allow_runtime_changes,
1139 ignore_ipolicy=self.op.ignore_ipolicy)]
1140 for inst in _GetNodePrimaryInstances(self.cfg, self.op.node_uuid)]
1141
1142 # TODO: Run iallocator in this opcode and pass correct placement options to
1143 # OpInstanceMigrate. Since other jobs can modify the cluster between
1144 # running the iallocator and the actual migration, a good consistency model
1145 # will have to be found.
1146
1147 assert (frozenset(self.owned_locks(locking.LEVEL_NODE)) ==
1148 frozenset([self.op.node_uuid]))
1149
1150 return ResultWithJobs(jobs)
1151
1152
1153 def _GetStorageTypeArgs(cfg, storage_type):
1154 """Returns the arguments for a storage type.
1155
1156 """
1157 # Special case for file storage
1158
1159 if storage_type == constants.ST_FILE:
1160 return [[cfg.GetFileStorageDir()]]
1161 elif storage_type == constants.ST_SHARED_FILE:
1162 dts = cfg.GetClusterInfo().enabled_disk_templates
1163 paths = []
1164 if constants.DT_SHARED_FILE in dts:
1165 paths.append(cfg.GetSharedFileStorageDir())
1166 if constants.DT_GLUSTER in dts:
1167 paths.append(cfg.GetGlusterStorageDir())
1168 return [paths]
1169 else:
1170 return []
1171
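# For reference, the return shapes above: ST_FILE returns [[<file storage
# dir>]], ST_SHARED_FILE returns one inner list with the enabled shared-file
# and/or Gluster directories, and any other storage type returns [].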
1172
1173 class LUNodeModifyStorage(NoHooksLU):
1174 """Logical unit for modifying a storage volume on a node.
1175
1176 """
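# Usually driven by the OpNodeModifyStorage opcode, e.g. "gnt-node
# modify-storage".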
1177 REQ_BGL = False
1178
1179 def CheckArguments(self):
1180 (self.op.node_uuid, self.op.node_name) = \
1181 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1182
1183 storage_type = self.op.storage_type
1184
1185 try:
1186 modifiable = constants.MODIFIABLE_STORAGE_FIELDS[storage_type]
1187 except KeyError:
1188 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1189 " modified" % storage_type,
1190 errors.ECODE_INVAL)
1191
1192 diff = set(self.op.changes.keys()) - modifiable
1193 if diff:
1194 raise errors.OpPrereqError("The following fields can not be modified for"
1195 " storage units of type '%s': %r" %
1196 (storage_type, list(diff)),
1197 errors.ECODE_INVAL)
1198
1199 def CheckPrereq(self):
1200 """Check prerequisites.
1201
1202 """
1203 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1204
1205 def ExpandNames(self):
1206 self.needed_locks = {
1207 locking.LEVEL_NODE: self.op.node_uuid,
1208 }
1209
1210 def Exec(self, feedback_fn):
1211 """Computes the list of nodes and their attributes.
1212
1213 """
1214 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1215 result = self.rpc.call_storage_modify(self.op.node_uuid,
1216 self.op.storage_type, st_args,
1217 self.op.name, self.op.changes)
1218 result.Raise("Failed to modify storage unit '%s' on %s" %
1219 (self.op.name, self.op.node_name))
1220
1221
1222 def _CheckOutputFields(fields, selected):
1223 """Checks whether all selected fields are valid according to fields.
1224
1225 @type fields: L{utils.FieldSet}
1226 @param fields: fields set
1227 @type selected: L{utils.FieldSet}
1228 @param selected: fields set
1229
1230 """
1231 delta = fields.NonMatching(selected)
1232 if delta:
1233 raise errors.OpPrereqError("Unknown output fields selected: %s"
1234 % ",".join(delta), errors.ECODE_INVAL)
1235
1236
1237 class LUNodeQueryvols(NoHooksLU):
1238 """Logical unit for getting volumes on node(s).
1239
1240 """
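# Usually driven by the OpNodeQueryvols opcode, e.g. "gnt-node volumes".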
1241 REQ_BGL = False
1242
1243 def CheckArguments(self):
1244 _CheckOutputFields(utils.FieldSet(constants.VF_NODE, constants.VF_PHYS,
1245 constants.VF_VG, constants.VF_NAME,
1246 constants.VF_SIZE, constants.VF_INSTANCE),
1247 self.op.output_fields)
1248
1249 def ExpandNames(self):
1250 self.share_locks = ShareAll()
1251
1252 if self.op.nodes:
1253 self.needed_locks = {
1254 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1255 }
1256 else:
1257 self.needed_locks = {
1258 locking.LEVEL_NODE: locking.ALL_SET,
1259 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1260 }
1261
1262 def Exec(self, feedback_fn):
1263 """Computes the list of nodes and their attributes.
1264
1265 """
1266 node_uuids = self.owned_locks(locking.LEVEL_NODE)
1267 volumes = self.rpc.call_node_volumes(node_uuids)
1268
1269 ilist = self.cfg.GetAllInstancesInfo()
1270 vol2inst = MapInstanceLvsToNodes(self.cfg, ilist.values())
1271
1272 output = []
1273 for node_uuid in node_uuids:
1274 nresult = volumes[node_uuid]
1275 if nresult.offline:
1276 continue
1277 msg = nresult.fail_msg
1278 if msg:
1279 self.LogWarning("Can't compute volume data on node %s: %s",
1280 self.cfg.GetNodeName(node_uuid), msg)
1281 continue
1282
1283 node_vols = sorted(nresult.payload,
1284 key=operator.itemgetter(constants.VF_DEV))
1285
1286 for vol in node_vols:
1287 node_output = []
1288 for field in self.op.output_fields:
1289 if field == constants.VF_NODE:
1290 val = self.cfg.GetNodeName(node_uuid)
1291 elif field == constants.VF_PHYS:
1292 val = vol[constants.VF_DEV]
1293 elif field == constants.VF_VG:
1294 val = vol[constants.VF_VG]
1295 elif field == constants.VF_NAME:
1296 val = vol[constants.VF_NAME]
1297 elif field == constants.VF_SIZE:
1298 val = int(float(vol[constants.VF_SIZE]))
1299 elif field == constants.VF_INSTANCE:
1300 inst = vol2inst.get((node_uuid, vol[constants.VF_VG] + "/" +
1301 vol[constants.VF_NAME]), None)
1302 if inst is not None:
1303 val = inst.name
1304 else:
1305 val = "-"
1306 else:
1307 raise errors.ParameterError(field)
1308 node_output.append(str(val))
1309
1310 output.append(node_output)
1311
1312 return output
1313
1314
1315 class LUNodeQueryStorage(NoHooksLU):
1316 """Logical unit for getting information on storage units on node(s).
1317
1318 """
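# Usually driven by the OpNodeQueryStorage opcode, e.g. "gnt-node
# list-storage".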
1319 REQ_BGL = False
1320
1321 def CheckArguments(self):
1322 _CheckOutputFields(utils.FieldSet(*constants.VALID_STORAGE_FIELDS),
1323 self.op.output_fields)
1324
1325 def ExpandNames(self):
1326 self.share_locks = ShareAll()
1327
1328 if self.op.nodes:
1329 self.needed_locks = {
1330 locking.LEVEL_NODE: GetWantedNodes(self, self.op.nodes)[0],
1331 }
1332 else:
1333 self.needed_locks = {
1334 locking.LEVEL_NODE: locking.ALL_SET,
1335 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1336 }
1337
1338 def _DetermineStorageType(self):
1339 """Determines the default storage type of the cluster.
1340
1341 """
1342 enabled_disk_templates = self.cfg.GetClusterInfo().enabled_disk_templates
1343 default_storage_type = \
1344 constants.MAP_DISK_TEMPLATE_STORAGE_TYPE[enabled_disk_templates[0]]
1345 return default_storage_type
1346
1347 def CheckPrereq(self):
1348 """Check prerequisites.
1349
1350 """
1351 if self.op.storage_type:
1352 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1353 self.storage_type = self.op.storage_type
1354 else:
1355 self.storage_type = self._DetermineStorageType()
1356 supported_storage_types = constants.STS_REPORT_NODE_STORAGE
1357 if self.storage_type not in supported_storage_types:
1358 raise errors.OpPrereqError(
1359 "Storage reporting for storage type '%s' is not supported. Please"
1360 " use the --storage-type option to specify one of the supported"
1361 " storage types (%s) or set the default disk template to one that"
1362 " supports storage reporting." %
1363 (self.storage_type, utils.CommaJoin(supported_storage_types)))
1364
1365 def Exec(self, feedback_fn):
1366 """Computes the list of nodes and their attributes.
1367
1368 """
1369 if self.op.storage_type:
1370 self.storage_type = self.op.storage_type
1371 else:
1372 self.storage_type = self._DetermineStorageType()
1373
1374 self.node_uuids = self.owned_locks(locking.LEVEL_NODE)
1375
1376 # Always get name to sort by
1377 if constants.SF_NAME in self.op.output_fields:
1378 fields = self.op.output_fields[:]
1379 else:
1380 fields = [constants.SF_NAME] + self.op.output_fields
1381
1382 # Never ask for node or type as it's only known to the LU
1383 for extra in [constants.SF_NODE, constants.SF_TYPE]:
1384 while extra in fields:
1385 fields.remove(extra)
1386
1387 field_idx = dict([(name, idx) for (idx, name) in enumerate(fields)])
1388 name_idx = field_idx[constants.SF_NAME]
1389
1390 st_args = _GetStorageTypeArgs(self.cfg, self.storage_type)
1391 data = self.rpc.call_storage_list(self.node_uuids,
1392 self.storage_type, st_args,
1393 self.op.name, fields)
1394
1395 result = []
1396
1397 for node_uuid in utils.NiceSort(self.node_uuids):
1398 node_name = self.cfg.GetNodeName(node_uuid)
1399 nresult = data[node_uuid]
1400 if nresult.offline:
1401 continue
1402
1403 msg = nresult.fail_msg
1404 if msg:
1405 self.LogWarning("Can't get storage data from node %s: %s",
1406 node_name, msg)
1407 continue
1408
1409 rows = dict([(row[name_idx], row) for row in nresult.payload])
1410
1411 for name in utils.NiceSort(rows.keys()):
1412 row = rows[name]
1413
1414 out = []
1415
1416 for field in self.op.output_fields:
1417 if field == constants.SF_NODE:
1418 val = node_name
1419 elif field == constants.SF_TYPE:
1420 val = self.storage_type
1421 elif field in field_idx:
1422 val = row[field_idx[field]]
1423 else:
1424 raise errors.ParameterError(field)
1425
1426 out.append(val)
1427
1428 result.append(out)
1429
1430 return result
1431
1432
1433 class LUNodeRemove(LogicalUnit):
1434 """Logical unit for removing a node.
1435
1436 """
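# Usually driven by the OpNodeRemove opcode, e.g. "gnt-node remove".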
1437 HPATH = "node-remove"
1438 HTYPE = constants.HTYPE_NODE
1439
1440 def BuildHooksEnv(self):
1441 """Build hooks env.
1442
1443 """
1444 return {
1445 "OP_TARGET": self.op.node_name,
1446 "NODE_NAME": self.op.node_name,
1447 }
1448
1449 def BuildHooksNodes(self):
1450 """Build hooks nodes.
1451
1452 This doesn't run on the target node in the pre phase as a failed
1453 node would then be impossible to remove.
1454
1455 """
1456 all_nodes = self.cfg.GetNodeList()
1457 try:
1458 all_nodes.remove(self.op.node_uuid)
1459 except ValueError:
1460 pass
1461 return (all_nodes, all_nodes)
1462
1463 def CheckPrereq(self):
1464 """Check prerequisites.
1465
1466 This checks:
1467 - the node exists in the configuration
1468 - it does not have primary or secondary instances
1469 - it's not the master
1470
1471 Any errors are signaled by raising errors.OpPrereqError.
1472
1473 """
1474 (self.op.node_uuid, self.op.node_name) = \
1475 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1476 node = self.cfg.GetNodeInfo(self.op.node_uuid)
1477 assert node is not None
1478
1479 masternode = self.cfg.GetMasterNode()
1480 if node.uuid == masternode:
1481 raise errors.OpPrereqError("Node is the master node, failover to another"
1482 " node is required", errors.ECODE_INVAL)
1483
1484 for _, instance in self.cfg.GetAllInstancesInfo().items():
1485 if node.uuid in self.cfg.GetInstanceNodes(instance.uuid):
1486 raise errors.OpPrereqError("Instance %s is still running on the node,"
1487 " please remove first" % instance.name,
1488 errors.ECODE_INVAL)
1489 self.op.node_name = node.name
1490 self.node = node
1491
1492 def Exec(self, feedback_fn):
1493 """Removes the node from the cluster.
1494
1495 """
1496 logging.info("Stopping the node daemon and removing configs from node %s",
1497 self.node.name)
1498
1499 modify_ssh_setup = self.cfg.GetClusterInfo().modify_ssh_setup
1500
1501 assert locking.BGL in self.owned_locks(locking.LEVEL_CLUSTER), \
1502 "Not owning BGL"
1503
1504 # Promote nodes to master candidate as needed
1505 AdjustCandidatePool(self, [self.node.uuid])
1506 self.context.RemoveNode(self.cfg, self.node)
1507
1508 # Run post hooks on the node before it's removed
1509 RunPostHook(self, self.node.name)
1510
1511 # we have to call this by name rather than by UUID, as the node is no longer
1512 # in the config
1513 result = self.rpc.call_node_leave_cluster(self.node.name, modify_ssh_setup)
1514 msg = result.fail_msg
1515 if msg:
1516 self.LogWarning("Errors encountered on the remote node while leaving"
1517 " the cluster: %s", msg)
1518
1519 cluster = self.cfg.GetClusterInfo()
1520
1521 # Remove node from candidate certificate list
1522 if self.node.master_candidate:
1523 self.cfg.RemoveNodeFromCandidateCerts(self.node.uuid)
1524
1525 # Remove node from our /etc/hosts
1526 if cluster.modify_etc_hosts:
1527 master_node_uuid = self.cfg.GetMasterNode()
1528 result = self.rpc.call_etc_hosts_modify(master_node_uuid,
1529 constants.ETC_HOSTS_REMOVE,
1530 self.node.name, None)
1531 result.Raise("Can't update hosts file with new host data")
1532 RedistributeAncillaryFiles(self)
1533
1534
1535 class LURepairNodeStorage(NoHooksLU):
1536 """Repairs the volume group on a node.
1537
1538 """
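# Usually driven by the OpRepairNodeStorage opcode, e.g. "gnt-node
# repair-storage".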
1539 REQ_BGL = False
1540
1541 def CheckArguments(self):
1542 (self.op.node_uuid, self.op.node_name) = \
1543 ExpandNodeUuidAndName(self.cfg, self.op.node_uuid, self.op.node_name)
1544
1545 storage_type = self.op.storage_type
1546
1547 if (constants.SO_FIX_CONSISTENCY not in
1548 constants.VALID_STORAGE_OPERATIONS.get(storage_type, [])):
1549 raise errors.OpPrereqError("Storage units of type '%s' can not be"
1550 " repaired" % storage_type,
1551 errors.ECODE_INVAL)
1552
1553 def ExpandNames(self):
1554 self.needed_locks = {
1555 locking.LEVEL_NODE: [self.op.node_uuid],
1556 }
1557
1558 def _CheckFaultyDisks(self, instance, node_uuid):
1559 """Ensure faulty disks abort the opcode or at least warn."""
1560 try:
1561 if FindFaultyInstanceDisks(self.cfg, self.rpc, instance,
1562 node_uuid, True):
1563 raise errors.OpPrereqError("Instance '%s' has faulty disks on"
1564 " node '%s'" %
1565 (instance.name,
1566 self.cfg.GetNodeName(node_uuid)),
1567 errors.ECODE_STATE)
1568 except errors.OpPrereqError as err:
1569 if self.op.ignore_consistency:
1570 self.LogWarning(str(err.args[0]))
1571 else:
1572 raise
1573
1574 def CheckPrereq(self):
1575 """Check prerequisites.
1576
1577 """
1578 CheckStorageTypeEnabled(self.cfg.GetClusterInfo(), self.op.storage_type)
1579
1580 # Check whether any instance on this node has faulty disks
1581 for inst in _GetNodeInstances(self.cfg, self.op.node_uuid):
1582 if not inst.disks_active:
1583 continue
1584 check_nodes = set(self.cfg.GetInstanceNodes(inst.uuid))
1585 check_nodes.discard(self.op.node_uuid)
1586 for inst_node_uuid in check_nodes:
1587 self._CheckFaultyDisks(inst, inst_node_uuid)
1588
1589 def Exec(self, feedback_fn):
1590 feedback_fn("Repairing storage unit '%s' on %s ..." %
1591 (self.op.name, self.op.node_name))
1592
1593 st_args = _GetStorageTypeArgs(self.cfg, self.op.storage_type)
1594 result = self.rpc.call_storage_execute(self.op.node_uuid,
1595 self.op.storage_type, st_args,
1596 self.op.name,
1597 constants.SO_FIX_CONSISTENCY)
1598 result.Raise("Failed to repair storage unit '%s' on %s" %
1599 (self.op.name, self.op.node_name))