Handle SSH key distribution on auto promotion
[ganeti-github.git] / lib / cmdlib / common.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Common functions used by multiple logical units."""
32
33 import copy
34 import math
35 import os
36 import urllib2
37
38 from ganeti import compat
39 from ganeti import constants
40 from ganeti import errors
41 from ganeti import hypervisor
42 from ganeti import locking
43 from ganeti import objects
44 from ganeti import opcodes
45 from ganeti import pathutils
46 import ganeti.rpc.node as rpc
47 from ganeti.serializer import Private
48 from ganeti import ssconf
49 from ganeti import utils
50
51
52 # States of instance
53 INSTANCE_DOWN = [constants.ADMINST_DOWN]
54 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
55 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
56
57
58 def _ExpandItemName(expand_fn, name, kind):
59 """Expand an item name.
60
61 @param expand_fn: the function to use for expansion
62 @param name: requested item name
63 @param kind: text description ('Node' or 'Instance')
64 @return: the result of the expand_fn, if successful
65 @raise errors.OpPrereqError: if the item is not found
66
67 """
68 (uuid, full_name) = expand_fn(name)
69 if uuid is None or full_name is None:
70 raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
71 errors.ECODE_NOENT)
72 return (uuid, full_name)
73
74
def ExpandInstanceUuidAndName(cfg, expected_uuid, name):
  """Wrapper over L{_ExpandItemName} for instance.

  Resolves a short instance name and, when an expected UUID is given,
  verifies the resolved instance is still the one the caller meant.

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
  if expected_uuid is None or uuid == expected_uuid:
    return (uuid, full_name)
  raise errors.OpPrereqError(
    "The instances UUID '%s' does not match the expected UUID '%s' for"
    " instance '%s'. Maybe the instance changed since you submitted this"
    " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
84
85
def ExpandNodeUuidAndName(cfg, expected_uuid, name):
  """Expand a short node name into the node UUID and full name.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type expected_uuid: string
  @param expected_uuid: expected UUID for the node (or None if there is no
        expectation). If it does not match, a L{errors.OpPrereqError} is
        raised.
  @type name: string
  @param name: the short node name

  """
  (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
  if expected_uuid is None or uuid == expected_uuid:
    return (uuid, full_name)
  raise errors.OpPrereqError(
    "The nodes UUID '%s' does not match the expected UUID '%s' for node"
    " '%s'. Maybe the node changed since you submitted this job." %
    (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
106
107
def ShareAll():
  """Returns a dict declaring all lock levels shared.

  """
  return dict((level, 1) for level in locking.LEVELS)
113
114
def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
  """Checks if the instances in a node group are still correct.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type group_uuid: string
  @param group_uuid: Node group UUID
  @type owned_instance_names: set or frozenset
  @param owned_instance_names: List of currently owned instances
  @return: the up-to-date set of instance names in the group
  @raise errors.OpPrereqError: if the group membership changed since the
      locks were acquired

  """
  expected_instances = frozenset(
    cfg.GetInstanceNames(cfg.GetNodeGroupInstances(group_uuid)))
  if owned_instance_names == expected_instances:
    return expected_instances
  raise errors.OpPrereqError("Instances in node group '%s' changed since"
                             " locks were acquired, wanted '%s', have '%s';"
                             " retry the operation" %
                             (group_uuid,
                              utils.CommaJoin(expected_instances),
                              utils.CommaJoin(owned_instance_names)),
                             errors.ECODE_STATE)
138
139
def GetWantedNodes(lu, short_node_names):
  """Returns list of checked and expanded node names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_node_names: list
  @param short_node_names: list of node names or None for all nodes
  @rtype: tuple of lists
  @return: tupe with (list of node UUIDs, list of node names)
  @raise errors.ProgrammerError: if the nodes parameter is wrong type

  """
  if short_node_names:
    node_uuids = []
    for short_name in short_node_names:
      (uuid, _) = ExpandNodeUuidAndName(lu.cfg, None, short_name)
      node_uuids.append(uuid)
  else:
    node_uuids = lu.cfg.GetNodeList()

  node_names = [lu.cfg.GetNodeName(uuid) for uuid in node_uuids]
  return (node_uuids, node_names)
159
160
def GetWantedInstances(lu, short_inst_names):
  """Returns list of checked and expanded instance names.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit on whose behalf we execute
  @type short_inst_names: list
  @param short_inst_names: list of instance names or None for all instances
  @rtype: tuple of lists
  @return: tuple of (instance UUIDs, instance names)
  @raise errors.OpPrereqError: if the instances parameter is wrong type
  @raise errors.OpPrereqError: if any of the passed instances is not found

  """
  if not short_inst_names:
    inst_uuids = lu.cfg.GetInstanceList()
  else:
    inst_uuids = []
    for short_name in short_inst_names:
      (uuid, _) = ExpandInstanceUuidAndName(lu.cfg, None, short_name)
      inst_uuids.append(uuid)
  inst_names = [lu.cfg.GetInstanceName(uuid) for uuid in inst_uuids]
  return (inst_uuids, inst_names)
180
181
def RunPostHook(lu, node_name):
  """Runs the post-hook for an opcode on a single node.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit whose post-hooks should run
  @type node_name: string
  @param node_name: the name of the node to run the hook on

  """
  hm = lu.proc.BuildHooksManager(lu)
  try:
    hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
  # 'as' form: valid on Python 2.6+ and required for Python 3, unlike the
  # old 'except Exception, err' spelling
  except Exception as err: # pylint: disable=W0703
    # Hook failures must never abort the main operation; only warn
    lu.LogWarning("Errors occurred running hooks on %s: %s",
                  node_name, err)
192
193
def RedistributeAncillaryFiles(lu):
  """Distribute additional files which are part of the cluster configuration.

  ConfigWriter takes care of distributing the config and ssconf files, but
  there are more files which should be distributed to all nodes. This function
  makes sure those are copied.

  """
  cluster = lu.cfg.GetClusterInfo()
  master_uuid = lu.cfg.GetMasterNodeInfo().uuid

  # Target nodes; the master node is never a distribution target
  online_node_uuids = [uuid for uuid in lu.cfg.GetOnlineNodeList()
                       if uuid != master_uuid]
  vm_node_uuids = list(frozenset(online_node_uuids).intersection(
    lu.cfg.GetVmCapableNodeList()))

  # Gather file lists
  (files_all, _, files_mc, files_vm) = \
    ComputeAncillaryFiles(cluster, True)

  # Never re-distribute configuration file from here
  assert not (pathutils.CLUSTER_CONF_FILE in files_all or
              pathutils.CLUSTER_CONF_FILE in files_vm)
  assert not files_mc, "Master candidates not handled in this function"

  # Upload each file category to its target node set
  for (node_uuids, files) in [(online_node_uuids, files_all),
                              (vm_node_uuids, files_vm)]:
    for fname in files:
      UploadHelper(lu, node_uuids, fname)
234
235
def ComputeAncillaryFiles(cluster, redist):
  """Compute files external to Ganeti which need to be consistent.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type redist: boolean
  @param redist: Whether to include files which need to be redistributed
  @rtype: tuple of four sets of strings
  @return: (files_all, files_opt, files_mc, files_vm) — file paths for all
      nodes, optional files, master-candidate-only files and files for
      VM-capable nodes only

  """
  # Compute files for all nodes
  files_all = set([
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    ])

  if redist:
    # we need to ship at least the RAPI certificate
    files_all.add(pathutils.RAPI_CERT_FILE)
  else:
    # at init time all certificates plus the full ssconf file set are wanted
    files_all.update(pathutils.ALL_CERT_FILES)
    files_all.update(ssconf.SimpleStore().GetFileList())

  if cluster.modify_etc_hosts:
    files_all.add(pathutils.ETC_HOSTS)

  if cluster.use_external_mip_script:
    files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

  # Files which are optional, these must:
  # - be present in one other category as well
  # - either exist or not exist on all nodes of that category (mc, vm all)
  files_opt = set([
    pathutils.RAPI_USERS_FILE,
    ])

  # Files which should only be on master candidates
  files_mc = set()

  if not redist:
    files_mc.add(pathutils.CLUSTER_CONF_FILE)

  # File storage
  if (not redist and (cluster.IsFileStorageEnabled() or
                      cluster.IsSharedFileStorageEnabled())):
    files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
    files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)

  # Files which should only be on VM-capable nodes
  # (element [0] of GetAncillaryFiles: the required hypervisor files)
  files_vm = set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])

  # element [1] of GetAncillaryFiles: the optional hypervisor files
  files_opt |= set(
    filename
    for hv_name in cluster.enabled_hypervisors
    for filename in
    hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])

  # Filenames in each category must be unique
  all_files_set = files_all | files_mc | files_vm
  assert (len(all_files_set) ==
          sum(map(len, [files_all, files_mc, files_vm]))), \
    "Found file listed in more than one file list"

  # Optional files must be present in one other category
  assert all_files_set.issuperset(files_opt), \
    "Optional file not in a different required list"

  # This one file should never ever be re-distributed via RPC
  assert not (redist and
              pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)

  return (files_all, files_opt, files_mc, files_vm)
313
314
def UploadHelper(lu, node_uuids, fname):
  """Helper for uploading a file and showing warnings.

  Silently does nothing if the file does not exist locally; per-node
  upload failures are reported as warnings, not errors.

  """
  if not os.path.exists(fname):
    return
  result = lu.rpc.call_upload_file(node_uuids, fname)
  for (node_uuid, node_result) in result.items():
    fail_msg = node_result.fail_msg
    if fail_msg:
      lu.LogWarning("Copy of file %s to node %s failed: %s" %
                    (fname, lu.cfg.GetNodeName(node_uuid), fail_msg))
327
328
def MergeAndVerifyHvState(op_input, obj_input):
  """Combines the hv state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict, or None if the opcode carried
      no hv state

  """
  if not op_input:
    return None
  invalid_hvs = set(op_input) - constants.HYPER_TYPES
  if invalid_hvs:
    raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
                               " %s" % utils.CommaJoin(invalid_hvs),
                               errors.ECODE_INVAL)
  if obj_input is None:
    obj_input = {}
  return _UpdateAndVerifySubDict(obj_input, op_input,
                                 constants.HVSTS_PARAMETER_TYPES)
349
350
def MergeAndVerifyDiskState(op_input, obj_input):
  """Combines the disk state from an opcode with the one of the object

  @param op_input: The input dict from the opcode
  @param obj_input: The input dict from the objects
  @return: The verified and updated dict, or None if the opcode carried
      no disk state

  """
  if not op_input:
    return None
  invalid_dst = set(op_input) - constants.DS_VALID_TYPES
  if invalid_dst:
    raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
                               utils.CommaJoin(invalid_dst),
                               errors.ECODE_INVAL)
  if obj_input is None:
    obj_input = {}
  merged = {}
  for key, value in op_input.items():
    merged[key] = _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
                                          constants.DSS_PARAMETER_TYPES)
  return merged
372
373
def CheckOSParams(lu, required, node_uuids, osname, osparams, force_variant):
  """OS parameters validation.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type required: boolean
  @param required: whether the validation should fail if the OS is not
      found
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type osname: string
  @param osname: the name of the OS we should use
  @type osparams: dict
  @param osparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)

  # Last chance to unwrap private elements.
  for key, value in osparams.items():
    if isinstance(value, Private):
      osparams[key] = value.Get()

  if not osname:
    return
  result = lu.rpc.call_os_validate(node_uuids, required, osname,
                                   [constants.OS_VALIDATE_PARAMETERS],
                                   osparams, force_variant)
  for node_uuid, nres in result.items():
    # we don't check for offline cases since this should be run only
    # against the master node and/or an instance's nodes
    nres.Raise("OS Parameters validation failed on node %s" %
               lu.cfg.GetNodeName(node_uuid))
    if not nres.payload:
      lu.LogInfo("OS %s not found on node %s, validation skipped",
                 osname, lu.cfg.GetNodeName(node_uuid))
410
411
def CheckImageValidity(image, error_message):
  """Checks if a given image description is either a valid file path or a URL.

  @type image: string
  @param image: An absolute path or URL, the assumed location of a disk image.
  @type error_message: string
  @param error_message: The error message to show if the image is not valid.

  @raise errors.OpPrereqError: If the validation fails.

  """
  if image is not None and not (utils.IsUrl(image) or os.path.isabs(image)):
    # ECODE_INVAL added for consistency with the other OpPrereqError raises
    # in this module (the original raise carried no error code)
    raise errors.OpPrereqError(error_message, errors.ECODE_INVAL)
425
426
def CheckOSImage(op):
  """Checks if the OS image in the OS parameters of an opcode is
  valid.

  This function can also be used in LUs as they carry an opcode.

  @type op: L{opcodes.OpCode}
  @param op: opcode containing the OS params

  @rtype: string or NoneType
  @return:
    None if the OS parameters in the opcode do not contain the OS
    image, otherwise the OS image value contained in the OS parameters
  @raise errors.OpPrereqError: if OS image is not a URL or an absolute path

  """
  image = objects.GetOSImage(op.osparams)
  CheckImageValidity(image, "OS image must be an absolute path or a URL")
  return image
446
447
def CheckHVParams(lu, node_uuids, hvname, hvparams):
  """Hypervisor parameter validation.

  This function abstracts the hypervisor parameter validation to be
  used in both instance create and instance modify.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @type hvname: string
  @param hvname: the name of the hypervisor we should use
  @type hvparams: dict
  @param hvparams: the parameters which we need to check
  @raise errors.OpPrereqError: if the parameters are not valid

  """
  node_uuids = _FilterVmNodes(lu, node_uuids)

  # Fill the opcode parameters on top of the cluster-level defaults
  cluster = lu.cfg.GetClusterInfo()
  filled_params = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)

  hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname,
                                                  filled_params)
  for node_uuid in node_uuids:
    info = hvinfo[node_uuid]
    if not info.offline:
      info.Raise("Hypervisor parameter validation failed on node %s" %
                 lu.cfg.GetNodeName(node_uuid))
477
478
def AddMasterCandidateSshKey(
    lu, master_node, node, potential_master_candidates, feedback_fn):
  """Distribute a newly promoted master candidate's SSH key.

  Issues the 'node_ssh_key_add' RPC on the master node only; failures on
  the master are fatal, failures on other nodes are reported as warnings.

  @param lu: the calling logical unit
  @param master_node: name of the master node (used to index the RPC result)
  @param node: the node object (with C{uuid} and C{name}) being promoted
  @param potential_master_candidates: names of potential master candidates —
      assumed list of node names; TODO confirm against callers
  @param feedback_fn: function emitting user-visible output
  @raise errors.OpExecError: if the RPC to the master node failed

  """
  ssh_result = lu.rpc.call_node_ssh_key_add(
    [master_node], node.uuid, node.name,
    potential_master_candidates,
    True, # add node's key to all node's 'authorized_keys'
    True, # all nodes are potential master candidates
    False) # do not update the node's public keys
  ssh_result[master_node].Raise(
    "Could not update the SSH setup of node '%s' after promotion"
    " (UUID: %s)." % (node.name, node.uuid))
  WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)
491
492
def AdjustCandidatePool(
    lu, exceptions, master_node, potential_master_candidates, feedback_fn,
    modify_ssh_setup):
  """Adjust the candidate pool after node operations.

  @type master_node: string
  @param master_node: name of the master node
  @type potential_master_candidates: list of string
  @param potential_master_candidates: list of node names of potential master
      candidates
  @type feedback_fn: function
  @param feedback_fn: function emitting user-visible output
  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether or not the ssh setup can be modified.

  """
  promoted = lu.cfg.MaintainCandidatePool(exceptions)
  if promoted:
    lu.LogInfo("Promoted nodes to master candidate role: %s",
               utils.CommaJoin(node.name for node in promoted))
    for node in promoted:
      lu.context.ReaddNode(node)
      AddNodeCertToCandidateCerts(lu, lu.cfg, node.uuid)
      if modify_ssh_setup:
        AddMasterCandidateSshKey(
          lu, master_node, node, potential_master_candidates, feedback_fn)

  (mc_now, mc_max, _) = lu.cfg.GetMasterCandidateStats(exceptions)
  if mc_now > mc_max:
    lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
               (mc_now, mc_max))
524
525
def CheckNodePVs(nresult, exclusive_storage):
  """Check node PVs.

  @param nresult: the node's verify RPC result dictionary
  @param exclusive_storage: whether exclusive-storage checks should run
  @return: tuple of (error message list, exclusive-storage PV info or None)

  """
  pvlist_dict = nresult.get(constants.NV_PVLIST, None)
  if pvlist_dict is None:
    return (["Can't get PV list from node"], None)
  pvlist = [objects.LvmPvInfo.FromDict(d) for d in pvlist_dict]

  # check that ':' is not present in PV names, since it's a
  # special character for lvcreate (denotes the range of PEs to
  # use on the PV)
  errlist = ["Invalid character ':' in PV '%s' of VG '%s'" %
             (pv.name, pv.vg_name)
             for pv in pvlist if ":" in pv.name]

  es_pvinfo = None
  if exclusive_storage:
    (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
    errlist.extend(errmsgs)
    shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
    if shared_pvs:
      for (pvname, lvlist) in shared_pvs:
        # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
        errlist.append("PV %s is shared among unrelated LVs (%s)" %
                       (pvname, utils.CommaJoin(lvlist)))
  return (errlist, es_pvinfo)
553
554
def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
  """Computes if value is in the desired range.

  @param name: name of the parameter for which we perform the check
  @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
      not just 'disk')
  @param ispecs: dictionary containing min and max values
  @param value: actual value that we want to use
  @return: None or an error string

  """
  if value in [None, constants.VALUE_AUTO]:
    return None
  # Missing bounds default to the value itself, i.e. always pass
  max_v = ispecs[constants.ISPECS_MAX].get(name, value)
  min_v = ispecs[constants.ISPECS_MIN].get(name, value)
  if min_v <= value <= max_v:
    return None
  fqn = "%s/%s" % (name, qualifier) if qualifier else name
  return ("%s value %s is not in range [%s, %s]" %
          (fqn, value, min_v, max_v))
578
579
def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
                                nic_count, disk_sizes, spindle_use,
                                disk_types, _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided specs.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type mem_size: int
  @param mem_size: The memory size
  @type cpu_count: int
  @param cpu_count: Used cpu cores
  @type disk_count: int
  @param disk_count: Number of disks used
  @type nic_count: int
  @param nic_count: Number of nics used
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type spindle_use: int
  @param spindle_use: The number of spindles this instance uses
  @type disk_types: list of strings
  @param disk_types: The disk template of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list of no violations are found

  """
  assert disk_count == len(disk_sizes)
  assert isinstance(disk_types, list)
  assert disk_count == len(disk_types)

  test_settings = [
    (constants.ISPEC_MEM_SIZE, "", mem_size),
    (constants.ISPEC_CPU_COUNT, "", cpu_count),
    (constants.ISPEC_NIC_COUNT, "", nic_count),
    (constants.ISPEC_SPINDLE_USE, "", spindle_use),
    ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
         for idx, d in enumerate(disk_sizes)]

  allowed_dts = set(ipolicy[constants.IPOLICY_DTS])
  ret = []
  if disk_count != 0:
    # This check doesn't make sense for diskless instances
    test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
  elif constants.DT_DISKLESS not in allowed_dts:
    ret.append("Disk template %s is not allowed (allowed templates %s)" %
               (constants.DT_DISKLESS, utils.CommaJoin(allowed_dts)))

  forbidden_dts = set(disk_types) - allowed_dts
  if forbidden_dts:
    ret.append("Disk template %s is not allowed (allowed templates: %s)" %
               (utils.CommaJoin(forbidden_dts), utils.CommaJoin(allowed_dts)))

  # Keep the min/max group with the fewest violations
  min_errs = None
  for minmax in ipolicy[constants.ISPECS_MINMAX]:
    # list comprehension instead of filter(): on Python 3 filter() returns
    # a lazy iterator, which would break the len() comparisons below
    errs = [err for err in
            (_compute_fn(name, qualifier, minmax, value)
             for (name, qualifier, value) in test_settings)
            if err]
    if min_errs is None or len(errs) < len(min_errs):
      min_errs = errs
  assert min_errs is not None
  return ret + min_errs
640
641
def ComputeIPolicyDiskSizesViolation(ipolicy, disk_sizes, disks,
                                     _compute_fn=_ComputeMinMaxSpec):
  """Verifies ipolicy against provided disk sizes.

  No other specs except the disk sizes, the number of disks and the disk
  template are checked.

  @type ipolicy: dict
  @param ipolicy: The ipolicy
  @type disk_sizes: list of ints
  @param disk_sizes: Disk sizes of used disk (len must match C{disk_count})
  @type disks: list of L{Disk}
  @param disks: The Disk objects of the instance
  @param _compute_fn: The compute function (unittest only)
  @return: A list of violations, or an empty list of no violations are found

  """
  if len(disk_sizes) != len(disks):
    return [constants.ISPEC_DISK_COUNT]
  return ComputeIPolicySpecViolation(
    ipolicy,
    None, None, len(disk_sizes), # mem_size, cpu_count, disk_count
    None, disk_sizes,            # nic_count, disk_sizes
    None,                        # spindle_use
    [d.dev_type for d in disks],
    _compute_fn=_compute_fn)
669
670
def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
                                    _compute_fn=ComputeIPolicySpecViolation):
  """Compute if instance meets the specs of ipolicy.

  @type ipolicy: dict
  @param ipolicy: The ipolicy to verify against
  @type instance: L{objects.Instance}
  @param instance: The instance to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @param _compute_fn: The function to verify ipolicy (unittest only)
  @see: L{ComputeIPolicySpecViolation}

  """
  ret = []
  be_full = cfg.GetClusterInfo().FillBE(instance)
  inst_nodes = cfg.GetInstanceNodes(instance.uuid)
  es_flags = rpc.GetExclusiveStorageForNodes(cfg, inst_nodes)
  disks = cfg.GetInstanceDisks(instance.uuid)

  if any(es_flags.values()):
    # With exclusive storage use the actual spindles
    try:
      spindle_use = sum(disk.spindles for disk in disks)
    except TypeError:
      ret.append("Number of spindles not configured for disks of instance %s"
                 " while exclusive storage is enabled, try running gnt-cluster"
                 " repair-disk-sizes" % instance.name)
      # _ComputeMinMaxSpec ignores 'None's
      spindle_use = None
  else:
    spindle_use = be_full[constants.BE_SPINDLE_USE]

  violations = _compute_fn(ipolicy,
                           be_full[constants.BE_MAXMEM],
                           be_full[constants.BE_VCPUS],
                           len(disks),
                           len(instance.nics),
                           [disk.size for disk in disks],
                           spindle_use,
                           [disk.dev_type for disk in disks])
  return ret + violations
711
712
713 def _ComputeViolatingInstances(ipolicy, instances, cfg):
714 """Computes a set of instances who violates given ipolicy.
715
716 @param ipolicy: The ipolicy to verify
717 @type instances: L{objects.Instance}
718 @param instances: List of instances to verify
719 @type cfg: L{config.ConfigWriter}
720 @param cfg: Cluster configuration
721 @return: A frozenset of instance names violating the ipolicy
722
723 """
724 return frozenset([inst.name for inst in instances
725 if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
726
727
def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
  """Computes a set of any instances that would violate the new ipolicy.

  @param old_ipolicy: The current (still in-place) ipolicy
  @param new_ipolicy: The new (to become) ipolicy
  @param instances: List of instances to verify
  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @return: A list of instances which violates the new ipolicy but
      did not before

  """
  new_violations = _ComputeViolatingInstances(new_ipolicy, instances, cfg)
  old_violations = _ComputeViolatingInstances(old_ipolicy, instances, cfg)
  return new_violations - old_violations
742
743
def GetUpdatedParams(old_params, update_dict,
                     use_default=True, use_none=False):
  """Return the new version of a parameter dictionary.

  @type old_params: dict
  @param old_params: old parameters
  @type update_dict: dict
  @param update_dict: dict containing new parameter values, or
      constants.VALUE_DEFAULT to reset the parameter to its default
      value
  @type use_default: boolean
  @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
      values as 'to be deleted' values
  @type use_none: boolean
  @param use_none: whether to recognise C{None} values as 'to be
      deleted' values
  @rtype: dict
  @return: the new parameter dictionary

  """
  params_copy = copy.deepcopy(old_params)
  # items() instead of Py2-only iteritems(): identical behavior here and
  # keeps the function portable to Python 3
  for key, val in update_dict.items():
    if ((use_default and val == constants.VALUE_DEFAULT) or
        (use_none and val is None)):
      # Deletion marker: drop the key if present, ignore if absent
      try:
        del params_copy[key]
      except KeyError:
        pass
    else:
      params_copy[key] = val
  return params_copy
775
776
def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
  """Return the new version of an instance policy.

  @param old_ipolicy: the existing policy to start from
  @param new_ipolicy: dict of policy overrides to apply on top
  @param group_policy: whether this policy applies to a group and thus
    we should support removal of policy entries
  @rtype: dict
  @return: the updated and syntax-checked instance policy
  @raise errors.OpPrereqError: on invalid keys, values or resulting policy

  """
  ipolicy = copy.deepcopy(old_ipolicy)
  for key, value in new_ipolicy.items():
    if key not in constants.IPOLICY_ALL_KEYS:
      raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
                                 errors.ECODE_INVAL)
    if (not value or value == [constants.VALUE_DEFAULT] or
        value == constants.VALUE_DEFAULT):
      # An empty/default value means "reset", which is only allowed when
      # a cluster-level policy exists to fall back to (i.e. group policies)
      if group_policy:
        if key in ipolicy:
          del ipolicy[key]
      else:
        raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
                                   " on the cluster'" % key,
                                   errors.ECODE_INVAL)
    else:
      if key in constants.IPOLICY_PARAMETERS:
        # FIXME: we assume all such values are float
        try:
          ipolicy[key] = float(value)
        # 'as' form: valid on Python 2.6+ and required for Python 3,
        # unlike the old 'except (...), err' spelling
        except (TypeError, ValueError) as err:
          raise errors.OpPrereqError("Invalid value for attribute"
                                     " '%s': '%s', error: %s" %
                                     (key, value, err), errors.ECODE_INVAL)
      elif key == constants.ISPECS_MINMAX:
        for minmax in value:
          for k in minmax.keys():
            utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
        ipolicy[key] = value
      elif key == constants.ISPECS_STD:
        if group_policy:
          msg = "%s cannot appear in group instance specs" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
                                        use_none=False, use_default=False)
        utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
      else:
        # FIXME: we assume all others are lists; this should be redone
        # in a nicer way
        ipolicy[key] = list(value)
  try:
    objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
  except errors.ConfigurationError as err:
    raise errors.OpPrereqError("Invalid instance policy: %s" % err,
                               errors.ECODE_INVAL)
  return ipolicy
829
830
def AnnotateDiskParams(instance, devs, cfg):
  """Little helper wrapper to the rpc annotation method.

  @param instance: The instance object
  @type devs: List of L{objects.Disk}
  @param devs: The root devices (not any of its children!)
  @param cfg: The config object
  @returns The annotated disk copies
  @see L{ganeti.rpc.node.AnnotateDiskParams}

  """
  disk_params = cfg.GetInstanceDiskParams(instance)
  return rpc.AnnotateDiskParams(devs, disk_params)
843
844
def SupportsOob(cfg, node):
  """Tells if node supports OOB.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @return: The OOB script if supported or an empty string otherwise

  """
  ndparams = cfg.GetNdParams(node)
  return ndparams[constants.ND_OOB_PROGRAM]
856
857
858 def _UpdateAndVerifySubDict(base, updates, type_check):
859 """Updates and verifies a dict with sub dicts of the same type.
860
861 @param base: The dict with the old data
862 @param updates: The dict with the new data
863 @param type_check: Dict suitable to ForceDictType to verify correct types
864 @returns: A new dict with updated and verified values
865
866 """
867 def fn(old, value):
868 new = GetUpdatedParams(old, value)
869 utils.ForceDictType(new, type_check)
870 return new
871
872 ret = copy.deepcopy(base)
873 ret.update(dict((key, fn(base.get(key, {}), value))
874 for key, value in updates.items()))
875 return ret
876
877
def _FilterVmNodes(lu, node_uuids):
  """Filters out non-vm_capable nodes from a list.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit for which we check
  @type node_uuids: list
  @param node_uuids: the list of nodes on which we should check
  @rtype: list
  @return: the list of vm-capable nodes

  """
  # Note: the config call returns the NON-vm-capable nodes, which we exclude
  non_vm_capable = frozenset(lu.cfg.GetNonVmCapableNodeList())
  return [uuid for uuid in node_uuids if uuid not in non_vm_capable]
891
892
def GetDefaultIAllocator(cfg, ialloc):
  """Decides on which iallocator to use.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration object
  @type ialloc: string or None
  @param ialloc: Iallocator specified in opcode
  @rtype: string
  @return: Iallocator name

  """
  # Fall back to the cluster-wide default when the opcode named none
  ialloc = ialloc or cfg.GetDefaultIAllocator()
  if not ialloc:
    raise errors.OpPrereqError("No iallocator was specified, neither in the"
                               " opcode nor as a cluster-wide default",
                               errors.ECODE_INVAL)
  return ialloc
914
915
def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
                             cur_group_uuid):
  """Checks that the node groups of the given locked instances are unchanged.

  @type cfg: L{config.ConfigWriter}
  @param cfg: Cluster configuration
  @type instances: dict; string as key, L{objects.Instance} as value
  @param instances: Dictionary, instance UUID as key, instance object as value
  @type owned_groups: iterable of string
  @param owned_groups: List of owned groups
  @type owned_node_uuids: iterable of string
  @param owned_node_uuids: List of owned nodes
  @type cur_group_uuid: string or None
  @param cur_group_uuid: Optional group UUID to check against instance's groups

  """
  for (inst_uuid, inst) in instances.items():
    inst_nodes = cfg.GetInstanceNodes(inst.uuid)

    # All of the instance's nodes must still be covered by our node locks
    assert owned_node_uuids.issuperset(inst_nodes), \
      "Instance %s's nodes changed while we kept the lock" % inst.name

    inst_groups = CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups)

    # If a specific group was requested, the instance must live in it
    assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
      "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)
941
942
def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
  """Checks if the owned node groups are still correct for an instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type inst_uuid: string
  @param inst_uuid: Instance UUID
  @type owned_groups: set or frozenset
  @param owned_groups: List of currently owned node groups
  @type primary_only: boolean
  @param primary_only: Whether to check node groups for only the primary node
  @return: the instance's node groups
  @raise errors.OpPrereqError: if the instance is in a group we do not own

  """
  inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only)

  if not owned_groups.issuperset(inst_groups):
    # Fixed: the message used to read "...current groups are are '%s'"
    raise errors.OpPrereqError("Instance %s's node groups changed since"
                               " locks were acquired, current groups"
                               " are '%s', owning groups '%s'; retry the"
                               " operation" %
                               (cfg.GetInstanceName(inst_uuid),
                                utils.CommaJoin(inst_groups),
                                utils.CommaJoin(owned_groups)),
                               errors.ECODE_STATE)

  return inst_groups
969
970
def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
  """Unpacks the result of change-group and node-evacuate iallocator requests.

  Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
  L{constants.IALLOCATOR_MODE_CHG_GROUP}.

  @type lu: L{LogicalUnit}
  @param lu: Logical unit instance
  @type alloc_result: tuple/list
  @param alloc_result: Result from iallocator
  @type early_release: bool
  @param early_release: Whether to release locks early if possible
  @type use_nodes: bool
  @param use_nodes: Whether to display node names instead of groups

  """
  (moved, failed, jobs) = alloc_result

  if failed:
    # Abort if the iallocator could not place every instance
    failreason = utils.CommaJoin("%s (%s)" % (name, reason)
                                 for (name, reason) in failed)
    lu.LogWarning("Unable to evacuate instances %s", failreason)
    raise errors.OpExecError("Unable to evacuate instances %s" % failreason)

  if moved:
    descriptions = ["%s (to %s)" %
                    (name, _NodeEvacDest(use_nodes, group, node_names))
                    for (name, group, node_names) in moved]
    lu.LogInfo("Instances to be moved: %s", utils.CommaJoin(descriptions))

  # Deserialize each job's opcodes and propagate the early-release flag
  return [[_SetOpEarlyRelease(early_release, opcodes.OpCode.LoadOpCode(op))
           for op in ops]
          for ops in jobs]
1005
1006
1007 def _NodeEvacDest(use_nodes, group, node_names):
1008 """Returns group or nodes depending on caller's choice.
1009
1010 """
1011 if use_nodes:
1012 return utils.CommaJoin(node_names)
1013 else:
1014 return group
1015
1016
1017 def _SetOpEarlyRelease(early_release, op):
1018 """Sets C{early_release} flag on opcodes if available.
1019
1020 """
1021 try:
1022 op.early_release = early_release
1023 except AttributeError:
1024 assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
1025
1026 return op
1027
1028
def MapInstanceLvsToNodes(cfg, instances):
  """Creates a map from (node, volume) to the owning instance.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type instances: list of L{objects.Instance}
  @param instances: the instances whose logical volumes should be mapped
  @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance}
          object as value

  """
  lvs_to_instances = {}
  for inst in instances:
    for (node_uuid, vols) in cfg.GetInstanceLVsByNode(inst.uuid).items():
      for vol in vols:
        lvs_to_instances[(node_uuid, vol)] = inst
  return lvs_to_instances
1044
1045
def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
  """Make sure that none of the given parameters is global.

  If a global parameter is found, an L{errors.OpPrereqError} exception is
  raised. This is used to avoid setting global parameters for individual nodes.

  @type params: dictionary
  @param params: Parameters to check
  @type glob_pars: dictionary
  @param glob_pars: Forbidden parameters
  @type kind: string
  @param kind: Kind of parameters (e.g. "node")
  @type bad_levels: string
  @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
    "instance")
  @type good_levels: strings
  @param good_levels: Level(s) at which the parameters are allowed (e.g.
    "cluster or group")

  """
  forbidden = glob_pars.intersection(params)
  if forbidden:
    raise errors.OpPrereqError(
      "The following %s parameters are global and cannot"
      " be customized at %s level, please modify them at"
      " %s level: %s" %
      (kind, bad_levels, good_levels, utils.CommaJoin(forbidden)),
      errors.ECODE_INVAL)
1073
1074
def IsExclusiveStorageEnabledNode(cfg, node):
  """Whether exclusive_storage is in effect for the given node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type node: L{objects.Node}
  @param node: The node
  @rtype: bool
  @return: The effective value of exclusive_storage

  """
  node_params = cfg.GetNdParams(node)
  return node_params[constants.ND_EXCLUSIVE_STORAGE]
1087
1088
def IsInstanceRunning(lu, instance, prereq=True):
  """Given an instance object, checks if the instance is running.

  This function asks the backend whether the instance is running and
  user shutdown instances are considered not to be running.

  @type lu: L{LogicalUnit}
  @param lu: LU on behalf of which we make the check

  @type instance: L{objects.Instance}
  @param instance: instance to check whether it is running

  @rtype: bool
  @return: 'True' if the instance is running, 'False' otherwise

  """
  hvparams = lu.cfg.GetClusterInfo().FillHV(instance)
  info = lu.rpc.call_instance_info(instance.primary_node, instance.name,
                                   instance.hypervisor, hvparams)
  # TODO: Passing 'prereq=True' is a problem if this function is called
  # within the 'Exec' method of a LU.
  info.Raise("Can't retrieve instance information for instance '%s'" %
             instance.name, prereq=prereq, ecode=errors.ECODE_ENVIRON)

  # A user-shutdown instance reports state SHUTDOWN and counts as not running
  payload = info.payload
  return payload and \
      "state" in payload and \
      (payload["state"] != hypervisor.hv_base.HvInstanceState.SHUTDOWN)
1116
1117
def CheckInstanceState(lu, instance, req_states, msg=None):
  """Ensure that an instance is in one of the required states.

  @param lu: the LU on behalf of which we make the check
  @param instance: the instance to check
  @param req_states: the admin states the instance is allowed to be in
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the instance is not in the required state

  """
  if msg is None:
    msg = ("can't use instance from outside %s states" %
           utils.CommaJoin(req_states))

  if instance.admin_state not in req_states:
    raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
                               (instance.name, instance.admin_state, msg),
                               errors.ECODE_STATE)

  if constants.ADMINST_UP in req_states:
    return

  # "up" is not acceptable, so the instance must really be down; this
  # replicates the offline check done elsewhere
  pnode_uuid = instance.primary_node
  if lu.cfg.GetNodeInfo(pnode_uuid).offline:
    lu.LogWarning("Primary node offline, ignoring check that instance"
                  " is down")
  elif IsInstanceRunning(lu, instance):
    raise errors.OpPrereqError("Instance %s is running, %s" %
                               (instance.name, msg), errors.ECODE_STATE)
1145
1146
def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
  """Check the sanity of iallocator and node arguments and use the
  cluster-wide iallocator if appropriate.

  Check that at most one of (iallocator, node) is specified. If none is
  specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
  then the LU's opcode's iallocator slot is filled with the cluster-wide
  default iallocator.

  @type iallocator_slot: string
  @param iallocator_slot: the name of the opcode iallocator slot
  @type node_slot: string
  @param node_slot: the name of the opcode target node slot

  """
  node = getattr(lu.op, node_slot, None)
  ialloc = getattr(lu.op, iallocator_slot, None)
  # An empty node list counts the same as no node at all
  if node == []:
    node = None

  if node is not None and ialloc is not None:
    raise errors.OpPrereqError("Do not specify both, iallocator and node",
                               errors.ECODE_INVAL)

  if ((node is None and ialloc is None) or
      ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
    default_iallocator = lu.cfg.GetDefaultIAllocator()
    if not default_iallocator:
      raise errors.OpPrereqError("No iallocator or node given and no"
                                 " cluster-wide default iallocator found;"
                                 " please specify either an iallocator or a"
                                 " node, or set a cluster-wide default"
                                 " iallocator", errors.ECODE_INVAL)
    setattr(lu.op, iallocator_slot, default_iallocator)
1181
1182
def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
  """Returns the indices of an instance's disks reported faulty by a node.

  @type cfg: L{config.ConfigWriter}
  @param cfg: the cluster configuration
  @param rpc_runner: RPC runner used to query the node for mirror status
  @type instance: L{objects.Instance}
  @param instance: the instance whose disks are checked
  @type node_uuid: string
  @param node_uuid: UUID of the node to query
  @type prereq: boolean
  @param prereq: whether an RPC failure should be raised as a prerequisite
    error instead of an execution error
  @rtype: list of int
  @return: indices (within the instance's disk list) of disks whose local
    disk status is L{constants.LDS_FAULTY}

  """
  faulty = []

  disks = cfg.GetInstanceDisks(instance.uuid)
  result = rpc_runner.call_blockdev_getmirrorstatus(
    node_uuid, (disks, instance))
  result.Raise("Failed to get disk status from node %s" %
               cfg.GetNodeName(node_uuid),
               prereq=prereq, ecode=errors.ECODE_ENVIRON)

  # A missing/None per-disk status is not treated as faulty; only an
  # explicit LDS_FAULTY marks the disk
  for idx, bdev_status in enumerate(result.payload):
    if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
      faulty.append(idx)

  return faulty
1198
1199
def CheckNodeOnline(lu, node_uuid, msg=None):
  """Ensure that a given node is online.

  @param lu: the LU on behalf of which we make the check
  @param node_uuid: the node to check
  @param msg: if passed, should be a message to replace the default one
  @raise errors.OpPrereqError: if the node is offline

  """
  msg = "Can't use offline node" if msg is None else msg
  node_info = lu.cfg.GetNodeInfo(node_uuid)
  if node_info.offline:
    raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
                               errors.ECODE_STATE)
1214
1215
def CheckDiskTemplateEnabled(cluster, disk_template):
  """Helper function to check if a disk template is enabled.

  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type disk_template: str
  @param disk_template: the disk template to be checked
  @raise errors.OpPrereqError: if the template is invalid or not enabled

  """
  assert disk_template is not None
  if disk_template not in constants.DISK_TEMPLATES:
    raise errors.OpPrereqError("'%s' is not a valid disk template."
                               " Valid disk templates are: %s" %
                               (disk_template,
                                ",".join(constants.DISK_TEMPLATES)))
  # Idiom fix: 'not x in y' replaced with 'x not in y'
  if disk_template not in cluster.enabled_disk_templates:
    raise errors.OpPrereqError("Disk template '%s' is not enabled in cluster."
                               " Enabled disk templates are: %s" %
                               (disk_template,
                                ",".join(cluster.enabled_disk_templates)))
1236
1237
def CheckStorageTypeEnabled(cluster, storage_type):
  """Helper function to check if a storage type is enabled.

  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type storage_type: str
  @param storage_type: the storage type to be checked

  """
  assert storage_type is not None
  assert storage_type in constants.STORAGE_TYPES
  # special case for lvm-pv, because it cannot be enabled
  # via disk templates
  if storage_type == constants.ST_LVM_PV:
    CheckStorageTypeEnabled(cluster, constants.ST_LVM_VG)
    return

  possible_disk_templates = \
    utils.storage.GetDiskTemplatesOfStorageTypes(storage_type)
  if any(disk_template in cluster.enabled_disk_templates
         for disk_template in possible_disk_templates):
    return
  raise errors.OpPrereqError("No disk template of storage type '%s' is"
                             " enabled in this cluster. Enabled disk"
                             " templates are: %s" % (storage_type,
                             ",".join(cluster.enabled_disk_templates)))
1263
1264
def CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
  """Checks ipolicy disk templates against enabled disk templates.

  @type ipolicy: dict
  @param ipolicy: the new ipolicy
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of enabled disk templates on the
    cluster
  @raises errors.OpPrereqError: if there is at least one allowed disk
    template that is not also enabled.

  """
  assert constants.IPOLICY_DTS in ipolicy
  allowed = set(ipolicy[constants.IPOLICY_DTS])
  not_enabled = allowed - set(enabled_disk_templates)
  if not_enabled:
    raise errors.OpPrereqError("The following disk templates are allowed"
                               " by the ipolicy, but not enabled on the"
                               " cluster: %s" % utils.CommaJoin(not_enabled),
                               errors.ECODE_INVAL)
1285
1286
def CheckDiskAccessModeValidity(parameters):
  """Checks if the access parameter is legal.

  @see: L{CheckDiskAccessModeConsistency} for cluster consistency checks.
  @raise errors.OpPrereqError: if the check fails.

  """
  for disk_template, params in parameters.items():
    access = params.get(constants.LDP_ACCESS, constants.DISK_KERNELSPACE)
    if access in constants.DISK_VALID_ACCESS_MODES:
      continue
    valid_vals_str = utils.CommaJoin(constants.DISK_VALID_ACCESS_MODES)
    raise errors.OpPrereqError("Invalid value of '{d}:{a}': '{v}' (expected"
                               " one of {o})".format(d=disk_template,
                                                     a=constants.LDP_ACCESS,
                                                     v=access,
                                                     o=valid_vals_str))
1304
1305
def CheckDiskAccessModeConsistency(parameters, cfg, group=None):
  """Checks if the access param is consistent with the cluster configuration.

  @note: requires a configuration lock to run.
  @param parameters: the parameters to validate
  @param cfg: the cfg object of the cluster
  @param group: if set, only check for consistency within this group.
  @raise errors.OpPrereqError: if the LU attempts to change the access parameter
                               to an invalid value, such as "pink bunny".
  @raise errors.OpPrereqError: if the LU attempts to change the access parameter
                               to an inconsistent value, such as asking for RBD
                               userspace access to the chroot hypervisor.

  """
  CheckDiskAccessModeValidity(parameters)

  for disk_template in parameters:
    access = parameters[disk_template].get(constants.LDP_ACCESS,
                                           constants.DISK_KERNELSPACE)

    if disk_template not in constants.DTS_HAVE_ACCESS:
      continue

    # Verify that the combination of instance hypervisor, disk template
    # and access protocol is sane for every affected instance.
    if group:
      inst_uuids = cfg.GetNodeGroupInstances(group)
    else:
      inst_uuids = cfg.GetInstanceList()

    for inst_uuid in inst_uuids:
      inst = cfg.GetInstanceInfo(inst_uuid)
      disks = cfg.GetInstanceDisks(inst_uuid)
      for disk in disks:
        if disk.dev_type != disk_template:
          continue

        hv = inst.hypervisor
        if not IsValidDiskAccessModeCombination(hv, disk.dev_type, access):
          raise errors.OpPrereqError("Instance {i}: cannot use '{a}' access"
                                     " setting with {h} hypervisor and {d} disk"
                                     " type.".format(i=inst.name,
                                                     a=access,
                                                     h=hv,
                                                     d=disk.dev_type))
1351
1352
def IsValidDiskAccessModeCombination(hv, disk_template, mode):
  """Checks if an hypervisor can read a disk template with given mode.

  @param hv: the hypervisor that will access the data
  @param disk_template: the disk template the data is stored as
  @param mode: how the hypervisor should access the data
  @return: True if the hypervisor can read a given read disk_template
    in the specified mode.

  """
  # Kernelspace access is always possible
  if mode == constants.DISK_KERNELSPACE:
    return True

  # Userspace access is only supported by KVM on suitable templates;
  # everything else is rejected
  return (hv == constants.HT_KVM and
          disk_template in constants.DTS_HAVE_ACCESS and
          mode == constants.DISK_USERSPACE)
1373
1374
def AddNodeCertToCandidateCerts(lu, cfg, node_uuid):
  """Add the node's client SSL certificate digest to the candidate certs.

  @type lu: L{LogicalUnit}
  @param lu: the logical unit
  @type cfg: L{ConfigWriter}
  @param cfg: the configuration client to use
  @type node_uuid: string
  @param node_uuid: the node's UUID

  """
  # Fetch the digest of the node's current client certificate via RPC
  request = [(constants.CRYPTO_TYPE_SSL_DIGEST, constants.CRYPTO_ACTION_GET,
              None)]
  result = lu.rpc.call_node_crypto_tokens(node_uuid, request)
  result.Raise("Could not retrieve the node's (uuid %s) SSL digest."
               % node_uuid)
  ((crypto_type, digest), ) = result.payload
  assert crypto_type == constants.CRYPTO_TYPE_SSL_DIGEST

  cfg.AddNodeToCandidateCerts(node_uuid, digest)
1396
1397
def RemoveNodeCertFromCandidateCerts(cfg, node_uuid):
  """Removes the node's certificate from the candidate certificates list.

  @type cfg: C{config.ConfigWriter}
  @param cfg: the cluster's configuration
  @type node_uuid: string
  @param node_uuid: the node's UUID

  """
  # Thin wrapper: the configuration performs the actual removal
  cfg.RemoveNodeFromCandidateCerts(node_uuid)
1408
1409
def GetClientCertDigest(lu, node_uuid, filename=None):
  """Get the client SSL certificate digest for the node.

  @type node_uuid: string
  @param node_uuid: the node's UUID
  @type filename: string
  @param filename: the certificate's filename
  @rtype: string
  @return: the digest of the newly created certificate

  """
  # Only pass a certificate filename when the caller provided one
  if filename:
    options = {constants.CRYPTO_OPTION_CERT_FILE: filename}
  else:
    options = {}
  request = [(constants.CRYPTO_TYPE_SSL_DIGEST,
              constants.CRYPTO_ACTION_GET,
              options)]
  result = lu.rpc.call_node_crypto_tokens(node_uuid, request)
  result.Raise("Could not fetch the node's (uuid %s) SSL client"
               " certificate." % node_uuid)
  ((crypto_type, new_digest), ) = result.payload
  assert crypto_type == constants.CRYPTO_TYPE_SSL_DIGEST
  return new_digest
1434
1435
def AddInstanceCommunicationNetworkOp(network):
  """Create an OpCode that adds the instance communication network.

  This OpCode contains the configuration necessary for the instance
  communication network.

  @type network: string
  @param network: name or UUID of the instance communication network

  @rtype: L{ganeti.opcodes.OpCode}
  @return: OpCode that creates the instance communication network

  """
  # Both IPv4 and IPv6 ranges come from the cluster-wide constants;
  # no gateways or reserved IPs are configured for this network.
  return opcodes.OpNetworkAdd(
    network_name=network,
    network=constants.INSTANCE_COMMUNICATION_NETWORK4,
    network6=constants.INSTANCE_COMMUNICATION_NETWORK6,
    gateway=None,
    gateway6=None,
    mac_prefix=constants.INSTANCE_COMMUNICATION_MAC_PREFIX,
    add_reserved_ips=None,
    conflicts_check=True,
    tags=[])
1459
1460
def ConnectInstanceCommunicationNetworkOp(group_uuid, network):
  """Create an OpCode that connects a group to the instance
  communication network.

  This OpCode contains the configuration necessary for the instance
  communication network.

  @type group_uuid: string
  @param group_uuid: UUID of the group to connect

  @type network: string
  @param network: name or UUID of the network to connect to, i.e., the
    instance communication network

  @rtype: L{ganeti.opcodes.OpCode}
  @return: OpCode that connects the group to the instance
    communication network

  """
  # Mode and link are fixed cluster-wide for the instance communication net
  return opcodes.OpNetworkConnect(
    group_name=group_uuid,
    network_name=network,
    network_mode=constants.INSTANCE_COMMUNICATION_NETWORK_MODE,
    network_link=constants.INSTANCE_COMMUNICATION_NETWORK_LINK,
    conflicts_check=True)
1486
1487
def DetermineImageSize(lu, image, node_uuid):
  """Determines the size of the specified image.

  @type image: string
  @param image: absolute filepath or URL of the image

  @type node_uuid: string
  @param node_uuid: if L{image} is a filepath, this is the UUID of the
    node where the image is located

  @rtype: int
  @return: size of the image in MB, rounded up
  @raise OpExecError: if the image does not exist

  """
  # Check if we are dealing with a URL first
  class _HeadRequest(urllib2.Request):
    def get_method(self):
      return "HEAD"

  if utils.IsUrl(image):
    try:
      response = urllib2.urlopen(_HeadRequest(image))
    except urllib2.URLError:
      raise errors.OpExecError("Could not retrieve image from given url '%s'" %
                               image)

    content_length_str = response.info().getheader('content-length')

    if not content_length_str:
      raise errors.OpExecError("Could not determine image size from given url"
                               " '%s'" % image)

    byte_size = int(content_length_str)
  else:
    # We end up here if a file path is used
    result = lu.rpc.call_get_file_info(node_uuid, image)
    result.Raise("Could not determine size of file '%s'" % image)

    success, attributes = result.payload
    if not success:
      raise errors.OpExecError("Could not open file '%s'" % image)
    byte_size = attributes[constants.STAT_SIZE]

  # math.ceil returns a float in Python 2, but the documented return
  # type is int -- convert explicitly so callers get what is promised
  return int(math.ceil(byte_size / 1024. / 1024.))
1534
1535
def EnsureKvmdOnNodes(lu, feedback_fn, nodes=None):
  """Ensure KVM daemon is running on nodes with KVM instances.

  If user shutdown is enabled in the cluster:
    - The KVM daemon will be started on VM capable nodes containing
      KVM instances.
    - The KVM daemon will be stopped on non VM capable nodes.

  If user shutdown is disabled in the cluster:
    - The KVM daemon will be stopped on all nodes

  Issues a warning for each failed RPC call.

  @type lu: L{LogicalUnit}
  @param lu: logical unit on whose behalf we execute

  @type feedback_fn: callable
  @param feedback_fn: feedback function

  @type nodes: list of string
  @param nodes: if supplied, it overrides the node uuids to start/stop;
    this is used mainly for optimization

  """
  cluster = lu.cfg.GetClusterInfo()

  # Either use the passed nodes or consider all cluster nodes
  if nodes is None:
    node_uuids = lu.cfg.GetNodeList()
  else:
    node_uuids = set(nodes)

  # The daemon is only wanted when KVM is enabled together with user shutdown
  kvmd_wanted = (constants.HT_KVM in cluster.enabled_hypervisors and
                 cluster.enabled_user_shutdown)
  if kvmd_wanted:
    start_nodes = []
    stop_nodes = []
    for node_uuid in node_uuids:
      # Start on vm-capable nodes, stop everywhere else
      target = (start_nodes if lu.cfg.GetNodeInfo(node_uuid).vm_capable
                else stop_nodes)
      target.append(node_uuid)
  else:
    start_nodes = []
    stop_nodes = node_uuids

  # Start KVM where necessary
  if start_nodes:
    results = lu.rpc.call_node_ensure_daemon(start_nodes, constants.KVMD, True)
    for node_uuid in start_nodes:
      results[node_uuid].Warn("Failed to start KVM daemon in node '%s'" %
                              node_uuid, feedback_fn)

  # Stop KVM where necessary
  if stop_nodes:
    results = lu.rpc.call_node_ensure_daemon(stop_nodes, constants.KVMD, False)
    for node_uuid in stop_nodes:
      results[node_uuid].Warn("Failed to stop KVM daemon in node '%s'" %
                              node_uuid, feedback_fn)
1596
1597
def WarnAboutFailedSshUpdates(result, master_uuid, feedback_fn):
  """Reports per-node SSH key update failures collected by the master.

  @param result: RPC result dictionary from an SSH key update call; the
    master node's payload is expected to be a list of
    (node name, error message) pairs
  @param master_uuid: UUID of the master node whose payload is inspected
  @type feedback_fn: callable
  @param feedback_fn: function used to report the failures to the user

  """
  node_errors = result[master_uuid].payload
  if node_errors:
    feedback_fn("Some nodes' SSH key files could not be updated:")
    for node_name, error_msg in node_errors:
      feedback_fn("%s: %s" % (node_name, error_msg))