Extract instance related logical units from cmdlib
[ganeti-github.git] / lib / cmdlib / __init__.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Module implementing the master-side code."""
23
24 # pylint: disable=W0201,C0302
25
26 # W0201 since most LU attributes are defined in CheckPrereq or similar
27 # functions
28
29 # C0302: since we have waaaay too many lines in this module
30
31 import time
32 import logging
33 import OpenSSL
34
35 from ganeti import utils
36 from ganeti import errors
37 from ganeti import locking
38 from ganeti import constants
39 from ganeti import compat
40 from ganeti import masterd
41 from ganeti import query
42 from ganeti import qlang
43
44 from ganeti.cmdlib.base import ResultWithJobs, LogicalUnit, NoHooksLU, \
45 Tasklet, _QueryBase
46 from ganeti.cmdlib.common import INSTANCE_DOWN, INSTANCE_ONLINE, \
47 INSTANCE_NOT_RUNNING, CAN_CHANGE_INSTANCE_OFFLINE, \
48 _ExpandInstanceName, _ExpandItemName, \
49 _ExpandNodeName, _ShareAll, _CheckNodeGroupInstances, _GetWantedNodes, \
50 _GetWantedInstances, _RunPostHook, _RedistributeAncillaryFiles, \
51 _MergeAndVerifyHvState, _MergeAndVerifyDiskState, _GetUpdatedIPolicy, \
52 _ComputeNewInstanceViolations, _GetUpdatedParams, _CheckOSParams, \
53 _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
54 _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, _SupportsOob, \
55 _ComputeIPolicySpecViolation, _GetDefaultIAllocator, \
56 _CheckInstancesNodeGroups, _LoadNodeEvacResult, _MapInstanceDisksToNodes, \
57 _CheckInstanceNodeGroups, _CheckParamsNotGlobal, \
58 _IsExclusiveStorageEnabledNode, _CheckInstanceState, \
59 _CheckIAllocatorOrNode, _FindFaultyInstanceDisks, _CheckNodeOnline
60 from ganeti.cmdlib.instance_utils import _AssembleInstanceDisks, \
61 _BuildInstanceHookEnvByObject, _GetClusterDomainSecret, \
62 _CheckNodeNotDrained, _RemoveDisks, _ShutdownInstanceDisks, \
63 _StartInstanceDisks, _RemoveInstance
64
65 from ganeti.cmdlib.cluster import LUClusterActivateMasterIp, \
66 LUClusterDeactivateMasterIp, LUClusterConfigQuery, LUClusterDestroy, \
67 LUClusterPostInit, _ClusterQuery, LUClusterQuery, LUClusterRedistConf, \
68 LUClusterRename, LUClusterRepairDiskSizes, LUClusterSetParams, \
69 LUClusterVerify, LUClusterVerifyConfig, LUClusterVerifyGroup, \
70 LUClusterVerifyDisks
71 from ganeti.cmdlib.group import LUGroupAdd, LUGroupAssignNodes, \
72 _GroupQuery, LUGroupQuery, LUGroupSetParams, LUGroupRemove, \
73 LUGroupRename, LUGroupEvacuate, LUGroupVerifyDisks
74 from ganeti.cmdlib.node import LUNodeAdd, LUNodeSetParams, \
75 LUNodePowercycle, LUNodeEvacuate, LUNodeMigrate, LUNodeModifyStorage, \
76 _NodeQuery, LUNodeQuery, LUNodeQueryvols, LUNodeQueryStorage, \
77 LUNodeRemove, LURepairNodeStorage
78 from ganeti.cmdlib.instance import LUInstanceCreate, LUInstanceRename, \
79 LUInstanceRemove, LUInstanceMove, _InstanceQuery, LUInstanceQuery, \
80 LUInstanceQueryData, LUInstanceRecreateDisks, LUInstanceGrowDisk, \
81 LUInstanceReplaceDisks, LUInstanceActivateDisks, \
82 LUInstanceDeactivateDisks, LUInstanceStartup, LUInstanceShutdown, \
83 LUInstanceReinstall, LUInstanceReboot, LUInstanceConsole, \
84 LUInstanceFailover, LUInstanceMigrate, LUInstanceMultiAlloc, \
85 LUInstanceSetParams, LUInstanceChangeGroup
86 from ganeti.cmdlib.tags import LUTagsGet, LUTagsSearch, LUTagsSet, LUTagsDel
87 from ganeti.cmdlib.network import LUNetworkAdd, LUNetworkRemove, \
88 LUNetworkSetParams, _NetworkQuery, LUNetworkQuery, LUNetworkConnect, \
89 LUNetworkDisconnect
90 from ganeti.cmdlib.test import LUTestDelay, LUTestJqueue, LUTestAllocator
91
92
93 class LUOobCommand(NoHooksLU):
94 """Logical unit for OOB handling.
95
96 """
97 REQ_BGL = False
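# OOB commands that are not run against the master node by default: powering
# off or power-cycling the master would take down the node coordinating the
# operation itself (see the check in CheckPrereq below).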
98 _SKIP_MASTER = (constants.OOB_POWER_OFF, constants.OOB_POWER_CYCLE)
99
100 def ExpandNames(self):
101 """Gather locks we need.
102
103 """
104 if self.op.node_names:
105 self.op.node_names = _GetWantedNodes(self, self.op.node_names)
106 lock_names = self.op.node_names
107 else:
108 lock_names = locking.ALL_SET
109
110 self.needed_locks = {
111 locking.LEVEL_NODE: lock_names,
112 }
113
114 self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
115
116 if not self.op.node_names:
117 # Acquire node allocation lock only if all nodes are affected
118 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
119
120 def CheckPrereq(self):
121 """Check prerequisites.
122
123 This checks:
124 - the node exists in the configuration
125 - OOB is supported
126
127 Any errors are signaled by raising errors.OpPrereqError.
128
129 """
130 self.nodes = []
131 self.master_node = self.cfg.GetMasterNode()
132
133 assert self.op.power_delay >= 0.0
134
135 if self.op.node_names:
136 if (self.op.command in self._SKIP_MASTER and
137 self.master_node in self.op.node_names):
138 master_node_obj = self.cfg.GetNodeInfo(self.master_node)
139 master_oob_handler = _SupportsOob(self.cfg, master_node_obj)
140
141 if master_oob_handler:
142 additional_text = ("run '%s %s %s' if you want to operate on the"
143 " master regardless") % (master_oob_handler,
144 self.op.command,
145 self.master_node)
146 else:
147 additional_text = "it does not support out-of-band operations"
148
149 raise errors.OpPrereqError(("Operating on the master node %s is not"
150 " allowed for %s; %s") %
151 (self.master_node, self.op.command,
152 additional_text), errors.ECODE_INVAL)
153 else:
154 self.op.node_names = self.cfg.GetNodeList()
155 if self.op.command in self._SKIP_MASTER:
156 self.op.node_names.remove(self.master_node)
157
158 if self.op.command in self._SKIP_MASTER:
159 assert self.master_node not in self.op.node_names
160
161 for (node_name, node) in self.cfg.GetMultiNodeInfo(self.op.node_names):
162 if node is None:
163 raise errors.OpPrereqError("Node %s not found" % node_name,
164 errors.ECODE_NOENT)
165 else:
166 self.nodes.append(node)
167
168 if (not self.op.ignore_status and
169 (self.op.command == constants.OOB_POWER_OFF and not node.offline)):
170 raise errors.OpPrereqError(("Cannot power off node %s because it is"
171 " not marked offline") % node_name,
172 errors.ECODE_STATE)
173
174 def Exec(self, feedback_fn):
175 """Execute OOB and return result if we expect any.
176
177 """
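# The return value is a list with one entry per node; each entry is itself a
# list of (status, data) tuples. For a power-status query it might look
# roughly like this (illustrative values only):
#   [[(constants.RS_NORMAL, "node1.example.com"),
#     (constants.RS_NORMAL, {"powered": True})],
#    [(constants.RS_NORMAL, "node2.example.com"),
#     (constants.RS_UNAVAIL, None)]]   # node2 has no OOB support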
178 master_node = self.master_node
179 ret = []
180
181 for idx, node in enumerate(utils.NiceSort(self.nodes,
182 key=lambda node: node.name)):
183 node_entry = [(constants.RS_NORMAL, node.name)]
184 ret.append(node_entry)
185
186 oob_program = _SupportsOob(self.cfg, node)
187
188 if not oob_program:
189 node_entry.append((constants.RS_UNAVAIL, None))
190 continue
191
192 logging.info("Executing out-of-band command '%s' using '%s' on %s",
193 self.op.command, oob_program, node.name)
194 result = self.rpc.call_run_oob(master_node, oob_program,
195 self.op.command, node.name,
196 self.op.timeout)
197
198 if result.fail_msg:
199 self.LogWarning("Out-of-band RPC failed on node '%s': %s",
200 node.name, result.fail_msg)
201 node_entry.append((constants.RS_NODATA, None))
202 else:
203 try:
204 self._CheckPayload(result)
205 except errors.OpExecError, err:
206 self.LogWarning("Payload returned by node '%s' is not valid: %s",
207 node.name, err)
208 node_entry.append((constants.RS_NODATA, None))
209 else:
210 if self.op.command == constants.OOB_HEALTH:
211 # For health we should log important events
212 for item, status in result.payload:
213 if status in [constants.OOB_STATUS_WARNING,
214 constants.OOB_STATUS_CRITICAL]:
215 self.LogWarning("Item '%s' on node '%s' has status '%s'",
216 item, node.name, status)
217
218 if self.op.command == constants.OOB_POWER_ON:
219 node.powered = True
220 elif self.op.command == constants.OOB_POWER_OFF:
221 node.powered = False
222 elif self.op.command == constants.OOB_POWER_STATUS:
223 powered = result.payload[constants.OOB_POWER_STATUS_POWERED]
224 if powered != node.powered:
225 logging.warning(("Recorded power state (%s) of node '%s' does not"
226 " match actual power state (%s)"), node.powered,
227 node.name, powered)
228
229 # For configuration changing commands we should update the node
230 if self.op.command in (constants.OOB_POWER_ON,
231 constants.OOB_POWER_OFF):
232 self.cfg.Update(node, feedback_fn)
233
234 node_entry.append((constants.RS_NORMAL, result.payload))
235
236 if (self.op.command == constants.OOB_POWER_ON and
237 idx < len(self.nodes) - 1):
238 time.sleep(self.op.power_delay)
239
240 return ret
241
242 def _CheckPayload(self, result):
243 """Checks if the payload is valid.
244
245 @param result: RPC result
246 @raises errors.OpExecError: If payload is not valid
247
248 """
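# Payload shapes checked below (sketch; exact values depend on the OOB
# helper program):
#   health:             list of (item, status) pairs, status in
#                       constants.OOB_STATUSES
#   power-status:       dict containing constants.OOB_POWER_STATUS_POWERED
#   power-on/off/cycle: no payload expected (None)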
249 errs = []
250 if self.op.command == constants.OOB_HEALTH:
251 if not isinstance(result.payload, list):
252 errs.append("command 'health' is expected to return a list but got %s" %
253 type(result.payload))
254 else:
255 for item, status in result.payload:
256 if status not in constants.OOB_STATUSES:
257 errs.append("health item '%s' has invalid status '%s'" %
258 (item, status))
259
260 if self.op.command == constants.OOB_POWER_STATUS:
261 if not isinstance(result.payload, dict):
262 errs.append("power-status is expected to return a dict but got %s" %
263 type(result.payload))
264
265 if self.op.command in [
266 constants.OOB_POWER_ON,
267 constants.OOB_POWER_OFF,
268 constants.OOB_POWER_CYCLE,
269 ]:
270 if result.payload is not None:
271 errs.append("%s is expected to not return payload but got '%s'" %
272 (self.op.command, result.payload))
273
274 if errs:
275 raise errors.OpExecError("Check of out-of-band payload failed due to %s" %
276 utils.CommaJoin(errs))
277
278
279 class _OsQuery(_QueryBase):
280 FIELDS = query.OS_FIELDS
281
282 def ExpandNames(self, lu):
283 # Lock all nodes in shared mode
284 # Temporary removal of locks, should be reverted later
285 # TODO: reintroduce locks when they are lighter-weight
286 lu.needed_locks = {}
287 #self.share_locks[locking.LEVEL_NODE] = 1
288 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
289
290 # The following variables interact with _QueryBase._GetNames
291 if self.names:
292 self.wanted = self.names
293 else:
294 self.wanted = locking.ALL_SET
295
296 self.do_locking = self.use_locking
297
298 def DeclareLocks(self, lu, level):
299 pass
300
301 @staticmethod
302 def _DiagnoseByOS(rlist):
303 """Remaps a per-node return list into a per-OS, per-node dictionary
304
305 @param rlist: a map with node names as keys and OS objects as values
306
307 @rtype: dict
308 @return: a dictionary with osnames as keys and as value another
309 map, with nodes as keys and tuples of (path, status, diagnose,
310 variants, parameters, api_versions) as values, eg::
311
312 {"debian-etch": {"node1": [(/usr/lib/..., True, "", [], [], []),
313 (/srv/..., False, "invalid api", [], [], [])],
314 "node2": [(/srv/..., True, "", [], [], [])]}
315 }
316
317 """
318 all_os = {}
319 # we build here the list of nodes that didn't fail the RPC (at RPC
320 # level), so that nodes with a non-responding node daemon don't
321 # make all OSes invalid
322 good_nodes = [node_name for node_name in rlist
323 if not rlist[node_name].fail_msg]
324 for node_name, nr in rlist.items():
325 if nr.fail_msg or not nr.payload:
326 continue
327 for (name, path, status, diagnose, variants,
328 params, api_versions) in nr.payload:
329 if name not in all_os:
330 # build a list of nodes for this os containing empty lists
331 # for each node in node_list
332 all_os[name] = {}
333 for nname in good_nodes:
334 all_os[name][nname] = []
335 # convert params from [name, help] to (name, help)
336 params = [tuple(v) for v in params]
337 all_os[name][node_name].append((path, status, diagnose,
338 variants, params, api_versions))
339 return all_os
340
341 def _GetQueryData(self, lu):
342 """Computes the list of OSes and their attributes.
343
344 """
345 # Locking is not used
346 assert not (compat.any(lu.glm.is_owned(level)
347 for level in locking.LEVELS
348 if level != locking.LEVEL_CLUSTER) or
349 self.do_locking or self.use_locking)
350
351 valid_nodes = [node.name
352 for node in lu.cfg.GetAllNodesInfo().values()
353 if not node.offline and node.vm_capable]
354 pol = self._DiagnoseByOS(lu.rpc.call_os_diagnose(valid_nodes))
355 cluster = lu.cfg.GetClusterInfo()
356
357 data = {}
358
359 for (os_name, os_data) in pol.items():
360 info = query.OsInfo(name=os_name, valid=True, node_status=os_data,
361 hidden=(os_name in cluster.hidden_os),
362 blacklisted=(os_name in cluster.blacklisted_os))
363
364 variants = set()
365 parameters = set()
366 api_versions = set()
367
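# An OS is valid only if its first entry on every node reports success;
# variants, parameters and API versions are reduced to the values all nodes
# agree on. E.g. if node1 reports variants ["a", "b"] and node2 reports
# ["b", "c"], only ["b"] survives the intersection.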
368 for idx, osl in enumerate(os_data.values()):
369 info.valid = bool(info.valid and osl and osl[0][1])
370 if not info.valid:
371 break
372
373 (node_variants, node_params, node_api) = osl[0][3:6]
374 if idx == 0:
375 # First entry
376 variants.update(node_variants)
377 parameters.update(node_params)
378 api_versions.update(node_api)
379 else:
380 # Filter out inconsistent values
381 variants.intersection_update(node_variants)
382 parameters.intersection_update(node_params)
383 api_versions.intersection_update(node_api)
384
385 info.variants = list(variants)
386 info.parameters = list(parameters)
387 info.api_versions = list(api_versions)
388
389 data[os_name] = info
390
391 # Prepare data in requested order
392 return [data[name] for name in self._GetNames(lu, pol.keys(), None)
393 if name in data]
394
395
396 class LUOsDiagnose(NoHooksLU):
397 """Logical unit for OS diagnose/query.
398
399 """
400 REQ_BGL = False
401
402 @staticmethod
403 def _BuildFilter(fields, names):
404 """Builds a filter for querying OSes.
405
406 """
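# The filter is a nested qlang expression. Assuming no names are given and
# only "name" is requested, the result would look roughly like:
#   [qlang.OP_AND,
#    [qlang.OP_NOT, [qlang.OP_TRUE, "hidden"]],
#    [qlang.OP_NOT, [qlang.OP_TRUE, "blacklisted"]],
#    [qlang.OP_TRUE, "valid"]]
# i.e. hidden, blacklisted and invalid OSes are filtered out unless the
# corresponding field is explicitly requested.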
407 name_filter = qlang.MakeSimpleFilter("name", names)
408
409 # Legacy behaviour: Hide hidden, blacklisted or invalid OSes if the
410 # respective field is not requested
411 status_filter = [[qlang.OP_NOT, [qlang.OP_TRUE, fname]]
412 for fname in ["hidden", "blacklisted"]
413 if fname not in fields]
414 if "valid" not in fields:
415 status_filter.append([qlang.OP_TRUE, "valid"])
416
417 if status_filter:
418 status_filter.insert(0, qlang.OP_AND)
419 else:
420 status_filter = None
421
422 if name_filter and status_filter:
423 return [qlang.OP_AND, name_filter, status_filter]
424 elif name_filter:
425 return name_filter
426 else:
427 return status_filter
428
429 def CheckArguments(self):
430 self.oq = _OsQuery(self._BuildFilter(self.op.output_fields, self.op.names),
431 self.op.output_fields, False)
432
433 def ExpandNames(self):
434 self.oq.ExpandNames(self)
435
436 def Exec(self, feedback_fn):
437 return self.oq.OldStyleQuery(self)
438
439
440 class _ExtStorageQuery(_QueryBase):
441 FIELDS = query.EXTSTORAGE_FIELDS
442
443 def ExpandNames(self, lu):
444 # Lock all nodes in shared mode
445 # Temporary removal of locks, should be reverted later
446 # TODO: reintroduce locks when they are lighter-weight
447 lu.needed_locks = {}
448 #self.share_locks[locking.LEVEL_NODE] = 1
449 #self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
450
451 # The following variables interact with _QueryBase._GetNames
452 if self.names:
453 self.wanted = self.names
454 else:
455 self.wanted = locking.ALL_SET
456
457 self.do_locking = self.use_locking
458
459 def DeclareLocks(self, lu, level):
460 pass
461
462 @staticmethod
463 def _DiagnoseByProvider(rlist):
464 """Remaps a per-node return list into a per-provider, per-node dictionary
465
466 @param rlist: a map with node names as keys and ExtStorage objects as values
467
468 @rtype: dict
469 @return: a dictionary with extstorage providers as keys and as
470 value another map, with nodes as keys and tuples of
471 (path, status, diagnose, parameters) as values, eg::
472
473 {"provider1": {"node1": [(/usr/lib/..., True, "", [])],
474 "node2": [(/srv/..., False, "missing file", [])],
475 "node3": [(/srv/..., True, "", [])]}
476 }
477
478 """
479 all_es = {}
480 # we build here the list of nodes that didn't fail the RPC (at RPC
481 # level), so that nodes with a non-responding node daemon don't
482 # make all providers invalid
483 good_nodes = [node_name for node_name in rlist
484 if not rlist[node_name].fail_msg]
485 for node_name, nr in rlist.items():
486 if nr.fail_msg or not nr.payload:
487 continue
488 for (name, path, status, diagnose, params) in nr.payload:
489 if name not in all_es:
490 # build a list of nodes for this provider containing empty lists
491 # for each node in node_list
492 all_es[name] = {}
493 for nname in good_nodes:
494 all_es[name][nname] = []
495 # convert params from [name, help] to (name, help)
496 params = [tuple(v) for v in params]
497 all_es[name][node_name].append((path, status, diagnose, params))
498 return all_es
499
500 def _GetQueryData(self, lu):
501 """Computes the list of ExtStorage providers and their attributes.
502
503 """
504 # Locking is not used
505 assert not (compat.any(lu.glm.is_owned(level)
506 for level in locking.LEVELS
507 if level != locking.LEVEL_CLUSTER) or
508 self.do_locking or self.use_locking)
509
510 valid_nodes = [node.name
511 for node in lu.cfg.GetAllNodesInfo().values()
512 if not node.offline and node.vm_capable]
513 pol = self._DiagnoseByProvider(lu.rpc.call_extstorage_diagnose(valid_nodes))
514
515 data = {}
516
517 nodegroup_list = lu.cfg.GetNodeGroupList()
518
519 for (es_name, es_data) in pol.items():
520 # For every provider compute the nodegroup validity.
521 # To do this we need to check the validity of each node in es_data
522 # and then construct the corresponding nodegroup dict:
523 # { nodegroup1: status
524 # nodegroup2: status
525 # }
526 ndgrp_data = {}
527 for nodegroup in nodegroup_list:
528 ndgrp = lu.cfg.GetNodeGroup(nodegroup)
529
530 nodegroup_nodes = ndgrp.members
531 nodegroup_name = ndgrp.name
532 node_statuses = []
533
534 for node in nodegroup_nodes:
535 if node in valid_nodes:
536 if es_data[node] != []:
537 node_status = es_data[node][0][1]
538 node_statuses.append(node_status)
539 else:
540 node_statuses.append(False)
541
542 if False in node_statuses:
543 ndgrp_data[nodegroup_name] = False
544 else:
545 ndgrp_data[nodegroup_name] = True
546
547 # Compute the provider's parameters
548 parameters = set()
549 for idx, esl in enumerate(es_data.values()):
550 valid = bool(esl and esl[0][1])
551 if not valid:
552 break
553
554 node_params = esl[0][3]
555 if idx == 0:
556 # First entry
557 parameters.update(node_params)
558 else:
559 # Filter out inconsistent values
560 parameters.intersection_update(node_params)
561
562 params = list(parameters)
563
564 # Now fill all the info for this provider
565 info = query.ExtStorageInfo(name=es_name, node_status=es_data,
566 nodegroup_status=ndgrp_data,
567 parameters=params)
568
569 data[es_name] = info
570
571 # Prepare data in requested order
572 return [data[name] for name in self._GetNames(lu, pol.keys(), None)
573 if name in data]
574
575
576 class LUExtStorageDiagnose(NoHooksLU):
577 """Logical unit for ExtStorage diagnose/query.
578
579 """
580 REQ_BGL = False
581
582 def CheckArguments(self):
583 self.eq = _ExtStorageQuery(qlang.MakeSimpleFilter("name", self.op.names),
584 self.op.output_fields, False)
585
586 def ExpandNames(self):
587 self.eq.ExpandNames(self)
588
589 def Exec(self, feedback_fn):
590 return self.eq.OldStyleQuery(self)
591
592
593 class LUQuery(NoHooksLU):
594 """Query for resources/items of a certain kind.
595
596 """
597 # pylint: disable=W0142
598 REQ_BGL = False
599
600 def CheckArguments(self):
601 qcls = _GetQueryImplementation(self.op.what)
602
603 self.impl = qcls(self.op.qfilter, self.op.fields, self.op.use_locking)
604
605 def ExpandNames(self):
606 self.impl.ExpandNames(self)
607
608 def DeclareLocks(self, level):
609 self.impl.DeclareLocks(self, level)
610
611 def Exec(self, feedback_fn):
612 return self.impl.NewStyleQuery(self)
613
614
615 class LUQueryFields(NoHooksLU):
616 """Query for resources/items of a certain kind.
617
618 """
619 # pylint: disable=W0142
620 REQ_BGL = False
621
622 def CheckArguments(self):
623 self.qcls = _GetQueryImplementation(self.op.what)
624
625 def ExpandNames(self):
626 self.needed_locks = {}
627
628 def Exec(self, feedback_fn):
629 return query.QueryFields(self.qcls.FIELDS, self.op.fields)
630
631
632 class LUBackupQuery(NoHooksLU):
633 """Query the exports list
634
635 """
636 REQ_BGL = False
637
638 def CheckArguments(self):
639 self.expq = _ExportQuery(qlang.MakeSimpleFilter("node", self.op.nodes),
640 ["node", "export"], self.op.use_locking)
641
642 def ExpandNames(self):
643 self.expq.ExpandNames(self)
644
645 def DeclareLocks(self, level):
646 self.expq.DeclareLocks(self, level)
647
648 def Exec(self, feedback_fn):
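# Returns a dict mapping each node name to either False (querying that node
# failed) or the list of export names found on it, e.g. (hypothetical names):
#   {"node1.example.com": ["inst1.example.com"], "node2.example.com": False}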
649 result = {}
650
651 for (node, expname) in self.expq.OldStyleQuery(self):
652 if expname is None:
653 result[node] = False
654 else:
655 result.setdefault(node, []).append(expname)
656
657 return result
658
659
660 class _ExportQuery(_QueryBase):
661 FIELDS = query.EXPORT_FIELDS
662
663 #: The node name is not a unique key for this query
664 SORT_FIELD = "node"
665
666 def ExpandNames(self, lu):
667 lu.needed_locks = {}
668
669 # The following variables interact with _QueryBase._GetNames
670 if self.names:
671 self.wanted = _GetWantedNodes(lu, self.names)
672 else:
673 self.wanted = locking.ALL_SET
674
675 self.do_locking = self.use_locking
676
677 if self.do_locking:
678 lu.share_locks = _ShareAll()
679 lu.needed_locks = {
680 locking.LEVEL_NODE: self.wanted,
681 }
682
683 if not self.names:
684 lu.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
685
686 def DeclareLocks(self, lu, level):
687 pass
688
689 def _GetQueryData(self, lu):
690 """Computes the list of exports and their attributes.
691
692 """
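# The query data is a flat list of (node, export_name) pairs; a node whose
# export list could not be retrieved contributes a single (node, None) entry
# instead.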
693 # Locking is not used
694 # TODO
695 assert not (compat.any(lu.glm.is_owned(level)
696 for level in locking.LEVELS
697 if level != locking.LEVEL_CLUSTER) or
698 self.do_locking or self.use_locking)
699
700 nodes = self._GetNames(lu, lu.cfg.GetNodeList(), locking.LEVEL_NODE)
701
702 result = []
703
704 for (node, nres) in lu.rpc.call_export_list(nodes).items():
705 if nres.fail_msg:
706 result.append((node, None))
707 else:
708 result.extend((node, expname) for expname in nres.payload)
709
710 return result
711
712
713 class LUBackupPrepare(NoHooksLU):
714 """Prepares an instance for an export and returns useful information.
715
716 """
717 REQ_BGL = False
718
719 def ExpandNames(self):
720 self._ExpandAndLockInstance()
721
722 def CheckPrereq(self):
723 """Check prerequisites.
724
725 """
726 instance_name = self.op.instance_name
727
728 self.instance = self.cfg.GetInstanceInfo(instance_name)
729 assert self.instance is not None, \
730 "Cannot retrieve locked instance %s" % self.op.instance_name
731 _CheckNodeOnline(self, self.instance.primary_node)
732
733 self._cds = _GetClusterDomainSecret()
734
735 def Exec(self, feedback_fn):
736 """Prepares an instance for an export.
737
738 """
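# For remote exports this returns what the destination cluster needs to
# verify the transfer: the cluster-domain-secret handshake, an HMAC-signed
# X509 key name and the signed temporary CA. Local exports need no
# preparation, hence the None at the end.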
739 instance = self.instance
740
741 if self.op.mode == constants.EXPORT_MODE_REMOTE:
742 salt = utils.GenerateSecret(8)
743
744 feedback_fn("Generating X509 certificate on %s" % instance.primary_node)
745 result = self.rpc.call_x509_cert_create(instance.primary_node,
746 constants.RIE_CERT_VALIDITY)
747 result.Raise("Can't create X509 key and certificate on %s" % result.node)
748
749 (name, cert_pem) = result.payload
750
751 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
752 cert_pem)
753
754 return {
755 "handshake": masterd.instance.ComputeRemoteExportHandshake(self._cds),
756 "x509_key_name": (name, utils.Sha1Hmac(self._cds, name, salt=salt),
757 salt),
758 "x509_ca": utils.SignX509Certificate(cert, self._cds, salt),
759 }
760
761 return None
762
763
764 class LUBackupExport(LogicalUnit):
765 """Export an instance to an image in the cluster.
766
767 """
768 HPATH = "instance-export"
769 HTYPE = constants.HTYPE_INSTANCE
770 REQ_BGL = False
771
772 def CheckArguments(self):
773 """Check the arguments.
774
775 """
776 self.x509_key_name = self.op.x509_key_name
777 self.dest_x509_ca_pem = self.op.destination_x509_ca
778
779 if self.op.mode == constants.EXPORT_MODE_REMOTE:
780 if not self.x509_key_name:
781 raise errors.OpPrereqError("Missing X509 key name for encryption",
782 errors.ECODE_INVAL)
783
784 if not self.dest_x509_ca_pem:
785 raise errors.OpPrereqError("Missing destination X509 CA",
786 errors.ECODE_INVAL)
787
788 def ExpandNames(self):
789 self._ExpandAndLockInstance()
790
791 # Lock all nodes for local exports
792 if self.op.mode == constants.EXPORT_MODE_LOCAL:
793 # FIXME: lock only instance primary and destination node
794 #
795 # Sad but true, for now we have to lock all nodes, as we don't know where
796 # the previous export might be, and in this LU we search for it and
797 # remove it from its current node. In the future we could fix this by:
798 # - making a tasklet to search (share-lock all), then create the
799 # new one, then another to remove the old one afterwards
800 # - removing the removal operation altogether
801 self.needed_locks[locking.LEVEL_NODE] = locking.ALL_SET
802
803 # Allocations should be stopped while this LU runs with node locks, but
804 # it doesn't have to be exclusive
805 self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
806 self.needed_locks[locking.LEVEL_NODE_ALLOC] = locking.ALL_SET
807
808 def DeclareLocks(self, level):
809 """Last minute lock declaration."""
810 # All nodes are locked anyway, so nothing to do here.
811
812 def BuildHooksEnv(self):
813 """Build hooks env.
814
815 This will run on the master, primary node and target node.
816
817 """
818 env = {
819 "EXPORT_MODE": self.op.mode,
820 "EXPORT_NODE": self.op.target_node,
821 "EXPORT_DO_SHUTDOWN": self.op.shutdown,
822 "SHUTDOWN_TIMEOUT": self.op.shutdown_timeout,
823 # TODO: Generic function for boolean env variables
824 "REMOVE_INSTANCE": str(bool(self.op.remove_instance)),
825 }
826
827 env.update(_BuildInstanceHookEnvByObject(self, self.instance))
828
829 return env
830
831 def BuildHooksNodes(self):
832 """Build hooks nodes.
833
834 """
835 nl = [self.cfg.GetMasterNode(), self.instance.primary_node]
836
837 if self.op.mode == constants.EXPORT_MODE_LOCAL:
838 nl.append(self.op.target_node)
839
840 return (nl, nl)
841
842 def CheckPrereq(self):
843 """Check prerequisites.
844
845 This checks that the instance and node names are valid.
846
847 """
848 instance_name = self.op.instance_name
849
850 self.instance = self.cfg.GetInstanceInfo(instance_name)
851 assert self.instance is not None, \
852 "Cannot retrieve locked instance %s" % self.op.instance_name
853 _CheckNodeOnline(self, self.instance.primary_node)
854
855 if (self.op.remove_instance and
856 self.instance.admin_state == constants.ADMINST_UP and
857 not self.op.shutdown):
858 raise errors.OpPrereqError("Cannot remove instance without shutting it"
859 " down first", errors.ECODE_STATE)
860
861 if self.op.mode == constants.EXPORT_MODE_LOCAL:
862 self.op.target_node = _ExpandNodeName(self.cfg, self.op.target_node)
863 self.dst_node = self.cfg.GetNodeInfo(self.op.target_node)
864 assert self.dst_node is not None
865
866 _CheckNodeOnline(self, self.dst_node.name)
867 _CheckNodeNotDrained(self, self.dst_node.name)
868
869 self._cds = None
870 self.dest_disk_info = None
871 self.dest_x509_ca = None
872
873 elif self.op.mode == constants.EXPORT_MODE_REMOTE:
874 self.dst_node = None
875
876 if len(self.op.target_node) != len(self.instance.disks):
877 raise errors.OpPrereqError(("Received destination information for %s"
878 " disks, but instance %s has %s disks") %
879 (len(self.op.target_node), instance_name,
880 len(self.instance.disks)),
881 errors.ECODE_INVAL)
882
883 cds = _GetClusterDomainSecret()
884
885 # Check X509 key name
886 try:
887 (key_name, hmac_digest, hmac_salt) = self.x509_key_name
888 except (TypeError, ValueError), err:
889 raise errors.OpPrereqError("Invalid data for X509 key name: %s" % err,
890 errors.ECODE_INVAL)
891
892 if not utils.VerifySha1Hmac(cds, key_name, hmac_digest, salt=hmac_salt):
893 raise errors.OpPrereqError("HMAC for X509 key name is wrong",
894 errors.ECODE_INVAL)
895
896 # Load and verify CA
897 try:
898 (cert, _) = utils.LoadSignedX509Certificate(self.dest_x509_ca_pem, cds)
899 except OpenSSL.crypto.Error, err:
900 raise errors.OpPrereqError("Unable to load destination X509 CA (%s)" %
901 (err, ), errors.ECODE_INVAL)
902
903 (errcode, msg) = utils.VerifyX509Certificate(cert, None, None)
904 if errcode is not None:
905 raise errors.OpPrereqError("Invalid destination X509 CA (%s)" %
906 (msg, ), errors.ECODE_INVAL)
907
908 self.dest_x509_ca = cert
909
910 # Verify target information
911 disk_info = []
912 for idx, disk_data in enumerate(self.op.target_node):
913 try:
914 (host, port, magic) = \
915 masterd.instance.CheckRemoteExportDiskInfo(cds, idx, disk_data)
916 except errors.GenericError, err:
917 raise errors.OpPrereqError("Target info for disk %s: %s" %
918 (idx, err), errors.ECODE_INVAL)
919
920 disk_info.append((host, port, magic))
921
922 assert len(disk_info) == len(self.op.target_node)
923 self.dest_disk_info = disk_info
924
925 else:
926 raise errors.ProgrammerError("Unhandled export mode %r" %
927 self.op.mode)
928
929 # instance disk type verification
930 # TODO: Implement export support for file-based disks
931 for disk in self.instance.disks:
932 if disk.dev_type == constants.LD_FILE:
933 raise errors.OpPrereqError("Export not supported for instances with"
934 " file-based disks", errors.ECODE_INVAL)
935
936 def _CleanupExports(self, feedback_fn):
937 """Removes exports of current instance from all other nodes.
938
939 If an instance in a cluster with nodes A..D was exported to node C, its
940 exports will be removed from the nodes A, B and D.
941
942 """
943 assert self.op.mode != constants.EXPORT_MODE_REMOTE
944
945 nodelist = self.cfg.GetNodeList()
946 nodelist.remove(self.dst_node.name)
947
948 # on one-node clusters nodelist will be empty after the removal;
949 # if we proceeded, the backup would be removed because OpBackupQuery
950 # substitutes an empty list with the full cluster node list.
951 iname = self.instance.name
952 if nodelist:
953 feedback_fn("Removing old exports for instance %s" % iname)
954 exportlist = self.rpc.call_export_list(nodelist)
955 for node in exportlist:
956 if exportlist[node].fail_msg:
957 continue
958 if iname in exportlist[node].payload:
959 msg = self.rpc.call_export_remove(node, iname).fail_msg
960 if msg:
961 self.LogWarning("Could not remove older export for instance %s"
962 " on node %s: %s", iname, node, msg)
963
964 def Exec(self, feedback_fn):
965 """Export an instance to an image in the cluster.
966
967 """
968 assert self.op.mode in constants.EXPORT_MODES
969
970 instance = self.instance
971 src_node = instance.primary_node
972
973 if self.op.shutdown:
974 # shutdown the instance, but not the disks
975 feedback_fn("Shutting down instance %s" % instance.name)
976 result = self.rpc.call_instance_shutdown(src_node, instance,
977 self.op.shutdown_timeout,
978 self.op.reason)
979 # TODO: Maybe ignore failures if ignore_remove_failures is set
980 result.Raise("Could not shutdown instance %s on"
981 " node %s" % (instance.name, src_node))
982
983 # set the disk IDs correctly since call_instance_start needs the
984 # correct DRBD minor to create the symlinks
985 for disk in instance.disks:
986 self.cfg.SetDiskID(disk, src_node)
987
988 activate_disks = (instance.admin_state != constants.ADMINST_UP)
989
990 if activate_disks:
991 # Activate the instance disks if we're exporting a stopped instance
992 feedback_fn("Activating disks for %s" % instance.name)
993 _StartInstanceDisks(self, instance, None)
994
995 try:
996 helper = masterd.instance.ExportInstanceHelper(self, feedback_fn,
997 instance)
998
999 helper.CreateSnapshots()
1000 try:
1001 if (self.op.shutdown and
1002 instance.admin_state == constants.ADMINST_UP and
1003 not self.op.remove_instance):
1004 assert not activate_disks
1005 feedback_fn("Starting instance %s" % instance.name)
1006 result = self.rpc.call_instance_start(src_node,
1007 (instance, None, None), False,
1008 self.op.reason)
1009 msg = result.fail_msg
1010 if msg:
1011 feedback_fn("Failed to start instance: %s" % msg)
1012 _ShutdownInstanceDisks(self, instance)
1013 raise errors.OpExecError("Could not start instance: %s" % msg)
1014
1015 if self.op.mode == constants.EXPORT_MODE_LOCAL:
1016 (fin_resu, dresults) = helper.LocalExport(self.dst_node)
1017 elif self.op.mode == constants.EXPORT_MODE_REMOTE:
1018 connect_timeout = constants.RIE_CONNECT_TIMEOUT
1019 timeouts = masterd.instance.ImportExportTimeouts(connect_timeout)
1020
1021 (key_name, _, _) = self.x509_key_name
1022
1023 dest_ca_pem = \
1024 OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
1025 self.dest_x509_ca)
1026
1027 (fin_resu, dresults) = helper.RemoteExport(self.dest_disk_info,
1028 key_name, dest_ca_pem,
1029 timeouts)
1030 finally:
1031 helper.Cleanup()
1032
1033 # Check for backwards compatibility
1034 assert len(dresults) == len(instance.disks)
1035 assert compat.all(isinstance(i, bool) for i in dresults), \
1036 "Not all results are boolean: %r" % dresults
1037
1038 finally:
1039 if activate_disks:
1040 feedback_fn("Deactivating disks for %s" % instance.name)
1041 _ShutdownInstanceDisks(self, instance)
1042
1043 if not (compat.all(dresults) and fin_resu):
1044 failures = []
1045 if not fin_resu:
1046 failures.append("export finalization")
1047 if not compat.all(dresults):
1048 fdsk = utils.CommaJoin(idx for (idx, dsk) in enumerate(dresults)
1049 if not dsk)
1050 failures.append("disk export: disk(s) %s" % fdsk)
1051
1052 raise errors.OpExecError("Export failed, errors in %s" %
1053 utils.CommaJoin(failures))
1054
1055 # At this point the export was successful; we can clean up and finish
1056
1057 # Remove instance if requested
1058 if self.op.remove_instance:
1059 feedback_fn("Removing instance %s" % instance.name)
1060 _RemoveInstance(self, feedback_fn, instance,
1061 self.op.ignore_remove_failures)
1062
1063 if self.op.mode == constants.EXPORT_MODE_LOCAL:
1064 self._CleanupExports(feedback_fn)
1065
1066 return fin_resu, dresults
1067
1068
1069 class LUBackupRemove(NoHooksLU):
1070 """Remove exports related to the named instance.
1071
1072 """
1073 REQ_BGL = False
1074
1075 def ExpandNames(self):
1076 self.needed_locks = {
1077 # We need all nodes to be locked in order for RemoveExport to work, but
1078 # we don't need to lock the instance itself, as nothing will happen to it
1079 # (and we can remove exports also for a removed instance)
1080 locking.LEVEL_NODE: locking.ALL_SET,
1081
1082 # Removing backups is quick, so blocking allocations is justified
1083 locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1084 }
1085
1086 # Allocations should be stopped while this LU runs with node locks, but it
1087 # doesn't have to be exclusive
1088 self.share_locks[locking.LEVEL_NODE_ALLOC] = 1
1089
1090 def Exec(self, feedback_fn):
1091 """Remove any export.
1092
1093 """
1094 instance_name = self.cfg.ExpandInstanceName(self.op.instance_name)
1095 # If the instance was not found we'll try with the name that was passed in.
1096 # This will only work if it was an FQDN, though.
1097 fqdn_warn = False
1098 if not instance_name:
1099 fqdn_warn = True
1100 instance_name = self.op.instance_name
1101
1102 locked_nodes = self.owned_locks(locking.LEVEL_NODE)
1103 exportlist = self.rpc.call_export_list(locked_nodes)
1104 found = False
1105 for node in exportlist:
1106 msg = exportlist[node].fail_msg
1107 if msg:
1108 self.LogWarning("Failed to query node %s (continuing): %s", node, msg)
1109 continue
1110 if instance_name in exportlist[node].payload:
1111 found = True
1112 result = self.rpc.call_export_remove(node, instance_name)
1113 msg = result.fail_msg
1114 if msg:
1115 logging.error("Could not remove export for instance %s"
1116 " on node %s: %s", instance_name, node, msg)
1117
1118 if fqdn_warn and not found:
1119 feedback_fn("Export not found. If trying to remove an export belonging"
1120 " to a deleted instance please use its Fully Qualified"
1121 " Domain Name.")
1122
1123
1124 class LURestrictedCommand(NoHooksLU):
1125 """Logical unit for executing restricted commands.
1126
1127 """
1128 REQ_BGL = False
1129
1130 def ExpandNames(self):
1131 if self.op.nodes:
1132 self.op.nodes = _GetWantedNodes(self, self.op.nodes)
1133
1134 self.needed_locks = {
1135 locking.LEVEL_NODE: self.op.nodes,
1136 }
1137 self.share_locks = {
1138 locking.LEVEL_NODE: not self.op.use_locking,
1139 }
1140
1141 def CheckPrereq(self):
1142 """Check prerequisites.
1143
1144 """
1145
1146 def Exec(self, feedback_fn):
1147 """Execute restricted command and return output.
1148
1149 """
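# The result contains one (success, data) tuple per requested node, e.g.
#   [(True, "command output"),
#    (False, "Command 'foo' on node 'bar' failed: ...")]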
1150 owned_nodes = frozenset(self.owned_locks(locking.LEVEL_NODE))
1151
1152 # Check if correct locks are held
1153 assert set(self.op.nodes).issubset(owned_nodes)
1154
1155 rpcres = self.rpc.call_restricted_command(self.op.nodes, self.op.command)
1156
1157 result = []
1158
1159 for node_name in self.op.nodes:
1160 nres = rpcres[node_name]
1161 if nres.fail_msg:
1162 msg = ("Command '%s' on node '%s' failed: %s" %
1163 (self.op.command, node_name, nres.fail_msg))
1164 result.append((False, msg))
1165 else:
1166 result.append((True, nres.payload))
1167
1168 return result
1169
1170
1171 #: Query type implementations
1172 _QUERY_IMPL = {
1173 constants.QR_CLUSTER: _ClusterQuery,
1174 constants.QR_INSTANCE: _InstanceQuery,
1175 constants.QR_NODE: _NodeQuery,
1176 constants.QR_GROUP: _GroupQuery,
1177 constants.QR_NETWORK: _NetworkQuery,
1178 constants.QR_OS: _OsQuery,
1179 constants.QR_EXTSTORAGE: _ExtStorageQuery,
1180 constants.QR_EXPORT: _ExportQuery,
1181 }
1182
1183 assert set(_QUERY_IMPL.keys()) == constants.QR_VIA_OP
1184
1185
1186 def _GetQueryImplementation(name):
1187 """Returns the implementation for a query type.
1188
1189 @param name: Query type, must be one of L{constants.QR_VIA_OP}
1190
1191 """
1192 try:
1193 return _QUERY_IMPL[name]
1194 except KeyError:
1195 raise errors.OpPrereqError("Unknown query resource '%s'" % name,
1196 errors.ECODE_INVAL)
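# Illustrative use: LUQuery and LUQueryFields dispatch through _QUERY_IMPL,
# e.g. _GetQueryImplementation(constants.QR_OS) returns _OsQuery, while an
# unknown resource name raises OpPrereqError with errors.ECODE_INVAL.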