1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Common functions used by multiple logical units."""
32
33 import copy
34 import math
35 import os
36 import urllib2
37
38 from ganeti import constants
39 from ganeti import errors
40 from ganeti import hypervisor
41 from ganeti import locking
42 from ganeti import objects
43 from ganeti import opcodes
44 from ganeti import pathutils
45 import ganeti.rpc.node as rpc
46 from ganeti.serializer import Private
47 from ganeti import ssconf
48 from ganeti import utils
49
50
51 # States of instance
52 INSTANCE_DOWN = [constants.ADMINST_DOWN]
53 INSTANCE_ONLINE = [constants.ADMINST_DOWN, constants.ADMINST_UP]
54 INSTANCE_NOT_RUNNING = [constants.ADMINST_DOWN, constants.ADMINST_OFFLINE]
55
56
57 def _ExpandItemName(expand_fn, name, kind):
58 """Expand an item name.
59
60 @param expand_fn: the function to use for expansion
61 @param name: requested item name
62 @param kind: text description ('Node' or 'Instance')
63 @return: the result of the expand_fn, if successful
64 @raise errors.OpPrereqError: if the item is not found
65
66 """
67 (uuid, full_name) = expand_fn(name)
68 if uuid is None or full_name is None:
69 raise errors.OpPrereqError("%s '%s' not known" % (kind, name),
70 errors.ECODE_NOENT)
71 return (uuid, full_name)
72
73
74 def ExpandInstanceUuidAndName(cfg, expected_uuid, name):
75 """Wrapper over L{_ExpandItemName} for instance."""
76 (uuid, full_name) = _ExpandItemName(cfg.ExpandInstanceName, name, "Instance")
77 if expected_uuid is not None and uuid != expected_uuid:
78 raise errors.OpPrereqError(
79 "The instances UUID '%s' does not match the expected UUID '%s' for"
80 " instance '%s'. Maybe the instance changed since you submitted this"
81 " job." % (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
82 return (uuid, full_name)
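
# Illustrative usage (a sketch only; ``lu`` stands for any LogicalUnit with an
# initialised config, and the names/UUIDs below are made up):
#
#   (uuid, full_name) = ExpandInstanceUuidAndName(lu.cfg, None, "web1")
#   # -> e.g. ("9f6c0a...", "web1.example.com"); raises OpPrereqError with
#   # ECODE_NOENT for an unknown name, or ECODE_NOTUNIQUE when a non-None
#   # expected_uuid does not match the resolved UUID.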
83
84
85 def ExpandNodeUuidAndName(cfg, expected_uuid, name):
86 """Expand a short node name into the node UUID and full name.
87
88 @type cfg: L{config.ConfigWriter}
89 @param cfg: The cluster configuration
90 @type expected_uuid: string
91 @param expected_uuid: expected UUID for the node (or None if there is no
92 expectation). If it does not match, a L{errors.OpPrereqError} is
93 raised.
94 @type name: string
95 @param name: the short node name
96
97 """
98 (uuid, full_name) = _ExpandItemName(cfg.ExpandNodeName, name, "Node")
99 if expected_uuid is not None and uuid != expected_uuid:
100 raise errors.OpPrereqError(
101 "The nodes UUID '%s' does not match the expected UUID '%s' for node"
102 " '%s'. Maybe the node changed since you submitted this job." %
103 (uuid, expected_uuid, full_name), errors.ECODE_NOTUNIQUE)
104 return (uuid, full_name)
105
106
107 def ShareAll():
108 """Returns a dict declaring all lock levels shared.
109
110 """
111 return dict.fromkeys(locking.LEVELS, 1)
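
# For reference, the result is simply a mapping of every lock level to 1
# (shared), e.g. {locking.LEVEL_CLUSTER: 1, locking.LEVEL_NODE: 1, ...},
# with the exact keys taken from locking.LEVELS.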
112
113
114 def CheckNodeGroupInstances(cfg, group_uuid, owned_instance_names):
115 """Checks if the instances in a node group are still correct.
116
117 @type cfg: L{config.ConfigWriter}
118 @param cfg: The cluster configuration
119 @type group_uuid: string
120 @param group_uuid: Node group UUID
121 @type owned_instance_names: set or frozenset
122 @param owned_instance_names: List of currently owned instances
123
124 """
125 wanted_instances = frozenset(cfg.GetInstanceNames(
126 cfg.GetNodeGroupInstances(group_uuid)))
127 if owned_instance_names != wanted_instances:
128 group_name = cfg.GetNodeGroup(group_uuid).name
129 raise errors.OpPrereqError("Instances in node group '%s' changed since"
130 " locks were acquired, wanted '%s', have '%s';"
131 " retry the operation" %
132 (group_name,
133 utils.CommaJoin(wanted_instances),
134 utils.CommaJoin(owned_instance_names)),
135 errors.ECODE_STATE)
136
137 return wanted_instances
138
139
140 def GetWantedNodes(lu, short_node_names):
141 """Returns list of checked and expanded node names.
142
143 @type lu: L{LogicalUnit}
144 @param lu: the logical unit on whose behalf we execute
145 @type short_node_names: list
146 @param short_node_names: list of node names or None for all nodes
147 @rtype: tuple of lists
148 @return: tuple with (list of node UUIDs, list of node names)
149 @raise errors.ProgrammerError: if the nodes parameter is wrong type
150
151 """
152 if short_node_names:
153 node_uuids = [ExpandNodeUuidAndName(lu.cfg, None, name)[0]
154 for name in short_node_names]
155 else:
156 node_uuids = lu.cfg.GetNodeList()
157
158 return (node_uuids, [lu.cfg.GetNodeName(uuid) for uuid in node_uuids])
159
160
161 def GetWantedInstances(lu, short_inst_names):
162 """Returns list of checked and expanded instance names.
163
164 @type lu: L{LogicalUnit}
165 @param lu: the logical unit on whose behalf we execute
166 @type short_inst_names: list
167 @param short_inst_names: list of instance names or None for all instances
168 @rtype: tuple of lists
169 @return: tuple of (instance UUIDs, instance names)
170 @raise errors.OpPrereqError: if the instances parameter is wrong type
171 @raise errors.OpPrereqError: if any of the passed instances is not found
172
173 """
174 if short_inst_names:
175 inst_uuids = [ExpandInstanceUuidAndName(lu.cfg, None, name)[0]
176 for name in short_inst_names]
177 else:
178 inst_uuids = lu.cfg.GetInstanceList()
179 return (inst_uuids, [lu.cfg.GetInstanceName(uuid) for uuid in inst_uuids])
180
181
182 def RunPostHook(lu, node_name):
183 """Runs the post-hook for an opcode on a single node.
184
185 """
186 hm = lu.proc.BuildHooksManager(lu)
187 try:
188 hm.RunPhase(constants.HOOKS_PHASE_POST, node_names=[node_name])
189 except Exception, err: # pylint: disable=W0703
190 lu.LogWarning("Errors occurred running hooks on %s: %s",
191 node_name, err)
192
193
194 def RedistributeAncillaryFiles(lu):
195 """Distribute additional files which are part of the cluster configuration.
196
197 ConfigWriter takes care of distributing the config and ssconf files, but
198 there are more files which should be distributed to all nodes. This function
199 makes sure those are copied.
200
201 """
202 # Gather target nodes
203 cluster = lu.cfg.GetClusterInfo()
204 master_info = lu.cfg.GetMasterNodeInfo()
205
206 online_node_uuids = lu.cfg.GetOnlineNodeList()
207 online_node_uuid_set = frozenset(online_node_uuids)
208 vm_node_uuids = list(online_node_uuid_set.intersection(
209 lu.cfg.GetVmCapableNodeList()))
210
211 # Never distribute to master node
212 for node_uuids in [online_node_uuids, vm_node_uuids]:
213 if master_info.uuid in node_uuids:
214 node_uuids.remove(master_info.uuid)
215
216 # Gather file lists
217 (files_all, _, files_mc, files_vm) = \
218 ComputeAncillaryFiles(cluster, True)
219
220 # Never re-distribute configuration file from here
221 assert not (pathutils.CLUSTER_CONF_FILE in files_all or
222 pathutils.CLUSTER_CONF_FILE in files_vm)
223 assert not files_mc, "Master candidates not handled in this function"
224
225 filemap = [
226 (online_node_uuids, files_all),
227 (vm_node_uuids, files_vm),
228 ]
229
230 # Upload the files
231 for (node_uuids, files) in filemap:
232 for fname in files:
233 UploadHelper(lu, node_uuids, fname)
234
235
236 def ComputeAncillaryFiles(cluster, redist):
237 """Compute files external to Ganeti which need to be consistent.
238
239 @type redist: boolean
240 @param redist: Whether to include files which need to be redistributed
241
242 """
243 # Compute files for all nodes
244 files_all = set([
245 pathutils.SSH_KNOWN_HOSTS_FILE,
246 pathutils.CONFD_HMAC_KEY,
247 pathutils.CLUSTER_DOMAIN_SECRET_FILE,
248 pathutils.SPICE_CERT_FILE,
249 pathutils.SPICE_CACERT_FILE,
250 pathutils.RAPI_USERS_FILE,
251 ])
252
253 if redist:
254 # we need to ship at least the RAPI certificate
255 files_all.add(pathutils.RAPI_CERT_FILE)
256 else:
257 files_all.update(pathutils.ALL_CERT_FILES)
258 files_all.update(ssconf.SimpleStore().GetFileList())
259
260 if cluster.modify_etc_hosts:
261 files_all.add(pathutils.ETC_HOSTS)
262
263 if cluster.use_external_mip_script:
264 files_all.add(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
265
266 # Files which are optional, these must:
267 # - be present in one other category as well
268 # - either exist or not exist on all nodes of that category (mc, vm all)
269 files_opt = set([
270 pathutils.RAPI_USERS_FILE,
271 ])
272
273 # Files which should only be on master candidates
274 files_mc = set()
275
276 if not redist:
277 files_mc.add(pathutils.CLUSTER_CONF_FILE)
278
279 # File storage
280 if (not redist and (cluster.IsFileStorageEnabled() or
281 cluster.IsSharedFileStorageEnabled())):
282 files_all.add(pathutils.FILE_STORAGE_PATHS_FILE)
283 files_opt.add(pathutils.FILE_STORAGE_PATHS_FILE)
284
285 # Files which should only be on VM-capable nodes
286 files_vm = set(
287 filename
288 for hv_name in cluster.enabled_hypervisors
289 for filename in
290 hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[0])
291
292 files_opt |= set(
293 filename
294 for hv_name in cluster.enabled_hypervisors
295 for filename in
296 hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles()[1])
297
298 # Filenames in each category must be unique
299 all_files_set = files_all | files_mc | files_vm
300 assert (len(all_files_set) ==
301 sum(map(len, [files_all, files_mc, files_vm]))), \
302 "Found file listed in more than one file list"
303
304 # Optional files must be present in one other category
305 assert all_files_set.issuperset(files_opt), \
306 "Optional file not in a different required list"
307
308 # This one file should never ever be re-distributed via RPC
309 assert not (redist and
310 pathutils.FILE_STORAGE_PATHS_FILE in all_files_set)
311
312 return (files_all, files_opt, files_mc, files_vm)
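
# For reference, the returned 4-tuple is (files_all, files_opt, files_mc,
# files_vm): files for all nodes, optional files, master-candidate-only files
# and files for VM-capable nodes, respectively. Callers such as
# RedistributeAncillaryFiles above rely on this ordering.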
313
314
315 def UploadHelper(lu, node_uuids, fname):
316 """Helper for uploading a file and showing warnings.
317
318 """
319 if os.path.exists(fname):
320 result = lu.rpc.call_upload_file(node_uuids, fname)
321 for to_node_uuid, to_result in result.items():
322 msg = to_result.fail_msg
323 if msg:
324 msg = ("Copy of file %s to node %s failed: %s" %
325 (fname, lu.cfg.GetNodeName(to_node_uuid), msg))
326 lu.LogWarning(msg)
327
328
329 def MergeAndVerifyHvState(op_input, obj_input):
330 """Combines the hv state from an opcode with the one of the object
331
332 @param op_input: The input dict from the opcode
333 @param obj_input: The input dict from the objects
334 @return: The verified and updated dict
335
336 """
337 if op_input:
338 invalid_hvs = set(op_input) - constants.HYPER_TYPES
339 if invalid_hvs:
340 raise errors.OpPrereqError("Invalid hypervisor(s) in hypervisor state:"
341 " %s" % utils.CommaJoin(invalid_hvs),
342 errors.ECODE_INVAL)
343 if obj_input is None:
344 obj_input = {}
345 type_check = constants.HVSTS_PARAMETER_TYPES
346 return _UpdateAndVerifySubDict(obj_input, op_input, type_check)
347
348 return None
349
350
351 def MergeAndVerifyDiskState(op_input, obj_input):
352 """Combines the disk state from an opcode with the one of the object
353
354 @param op_input: The input dict from the opcode
355 @param obj_input: The input dict from the objects
356 @return: The verified and updated dict
357 """
358 if op_input:
359 invalid_dst = set(op_input) - constants.DS_VALID_TYPES
360 if invalid_dst:
361 raise errors.OpPrereqError("Invalid storage type(s) in disk state: %s" %
362 utils.CommaJoin(invalid_dst),
363 errors.ECODE_INVAL)
364 type_check = constants.DSS_PARAMETER_TYPES
365 if obj_input is None:
366 obj_input = {}
367 return dict((key, _UpdateAndVerifySubDict(obj_input.get(key, {}), value,
368 type_check))
369 for key, value in op_input.items())
370
371 return None
372
373
374 def CheckOSParams(lu, required, node_uuids, osname, osparams, force_variant):
375 """OS parameters validation.
376
377 @type lu: L{LogicalUnit}
378 @param lu: the logical unit for which we check
379 @type required: boolean
380 @param required: whether the validation should fail if the OS is not
381 found
382 @type node_uuids: list
383 @param node_uuids: the list of nodes on which we should check
384 @type osname: string
385 @param osname: the name of the OS we should use
386 @type osparams: dict
387 @param osparams: the parameters which we need to check
388 @raise errors.OpPrereqError: if the parameters are not valid
389
390 """
391 node_uuids = _FilterVmNodes(lu, node_uuids)
392
393 # Last chance to unwrap private elements.
394 for key in osparams:
395 if isinstance(osparams[key], Private):
396 osparams[key] = osparams[key].Get()
397
398 if osname:
399 result = lu.rpc.call_os_validate(node_uuids, required, osname,
400 [constants.OS_VALIDATE_PARAMETERS],
401 osparams, force_variant)
402 for node_uuid, nres in result.items():
403 # we don't check for offline cases since this should be run only
404 # against the master node and/or an instance's nodes
405 nres.Raise("OS Parameters validation failed on node %s" %
406 lu.cfg.GetNodeName(node_uuid))
407 if not nres.payload:
408 lu.LogInfo("OS %s not found on node %s, validation skipped",
409 osname, lu.cfg.GetNodeName(node_uuid))
410
411
412 def CheckImageValidity(image, error_message):
413 """Checks if a given image description is either a valid file path or a URL.
414
415 @type image: string
416 @param image: An absolute path or URL, the assumed location of a disk image.
417 @type error_message: string
418 @param error_message: The error message to show if the image is not valid.
419
420 @raise errors.OpPrereqError: If the validation fails.
421
422 """
423 if image is not None and not (utils.IsUrl(image) or os.path.isabs(image)):
424 raise errors.OpPrereqError(error_message)
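
# Illustrative calls (paths and URLs below are made up):
#
#   CheckImageValidity("/srv/images/disk.img", "invalid image")        # passes
#   CheckImageValidity("http://example.com/disk.img", "invalid image")  # passes
#   CheckImageValidity("images/disk.img", "invalid image")
#   # -> raises OpPrereqError("invalid image"), as a relative path is neither
#   #    a URL nor an absolute path.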
425
426
427 def CheckOSImage(op):
428 """Checks if the OS image in the OS parameters of an opcode is
429 valid.
430
431 This function can also be used in LUs as they carry an opcode.
432
433 @type op: L{opcodes.OpCode}
434 @param op: opcode containing the OS params
435
436 @rtype: string or NoneType
437 @return:
438 None if the OS parameters in the opcode do not contain the OS
439 image, otherwise the OS image value contained in the OS parameters
440 @raise errors.OpPrereqError: if OS image is not a URL or an absolute path
441
442 """
443 os_image = objects.GetOSImage(op.osparams)
444 CheckImageValidity(os_image, "OS image must be an absolute path or a URL")
445 return os_image
446
447
448 def CheckHVParams(lu, node_uuids, hvname, hvparams):
449 """Hypervisor parameter validation.
450
451 This function abstracts the hypervisor parameter validation to be
452 used in both instance create and instance modify.
453
454 @type lu: L{LogicalUnit}
455 @param lu: the logical unit for which we check
456 @type node_uuids: list
457 @param node_uuids: the list of nodes on which we should check
458 @type hvname: string
459 @param hvname: the name of the hypervisor we should use
460 @type hvparams: dict
461 @param hvparams: the parameters which we need to check
462 @raise errors.OpPrereqError: if the parameters are not valid
463
464 """
465 node_uuids = _FilterVmNodes(lu, node_uuids)
466
467 cluster = lu.cfg.GetClusterInfo()
468 hvfull = objects.FillDict(cluster.hvparams.get(hvname, {}), hvparams)
469
470 hvinfo = lu.rpc.call_hypervisor_validate_params(node_uuids, hvname, hvfull)
471 for node_uuid in node_uuids:
472 info = hvinfo[node_uuid]
473 if info.offline:
474 continue
475 info.Raise("Hypervisor parameter validation failed on node %s" %
476 lu.cfg.GetNodeName(node_uuid))
477
478
479 def AddMasterCandidateSshKey(
480 lu, master_node, node, potential_master_candidates, feedback_fn):
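"""Propagates a new master candidate's SSH key to the cluster.

Calls the node-ssh-key-add RPC on the master node so that the given node's
key is added to all nodes' 'authorized_keys', and warns about any partial
failures via L{WarnAboutFailedSshUpdates}.

"""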
481 ssh_result = lu.rpc.call_node_ssh_key_add(
482 [master_node], node.uuid, node.name,
483 potential_master_candidates,
484 True, # add node's key to all nodes' 'authorized_keys'
485 True, # all nodes are potential master candidates
486 False, # do not update the node's public keys
487 lu.op.debug,
488 lu.op.verbose)
489 ssh_result[master_node].Raise(
490 "Could not update the SSH setup of node '%s' after promotion"
491 " (UUID: %s)." % (node.name, node.uuid))
492 WarnAboutFailedSshUpdates(ssh_result, master_node, feedback_fn)
493
494
495 def AdjustCandidatePool(
496 lu, exceptions, master_node, potential_master_candidates, feedback_fn,
497 modify_ssh_setup):
498 """Adjust the candidate pool after node operations.
499
500 @type master_node: string
501 @param master_node: name of the master node
502 @type potential_master_candidates: list of string
503 @param potential_master_candidates: list of node names of potential master
504 candidates
505 @type feedback_fn: function
506 @param feedback_fn: function emitting user-visible output
507 @type modify_ssh_setup: boolean
508 @param modify_ssh_setup: whether or not the ssh setup can be modified.
509
510 """
511 mod_list = lu.cfg.MaintainCandidatePool(exceptions)
512 if mod_list:
513 lu.LogInfo("Promoted nodes to master candidate role: %s",
514 utils.CommaJoin(node.name for node in mod_list))
515 for node in mod_list:
516 AddNodeCertToCandidateCerts(lu, lu.cfg, node.uuid)
517 if modify_ssh_setup:
518 AddMasterCandidateSshKey(
519 lu, master_node, node, potential_master_candidates, feedback_fn)
520
521 mc_now, mc_max, _ = lu.cfg.GetMasterCandidateStats(exceptions)
522 if mc_now > mc_max:
523 lu.LogInfo("Note: more nodes are candidates (%d) than desired (%d)" %
524 (mc_now, mc_max))
525
526
527 def CheckNodePVs(nresult, exclusive_storage):
528 """Check node PVs.
529
530 """
531 pvlist_dict = nresult.get(constants.NV_PVLIST, None)
532 if pvlist_dict is None:
533 return (["Can't get PV list from node"], None)
534 pvlist = map(objects.LvmPvInfo.FromDict, pvlist_dict)
535 errlist = []
536 # check that ':' is not present in PV names, since it's a
537 # special character for lvcreate (denotes the range of PEs to
538 # use on the PV)
539 for pv in pvlist:
540 if ":" in pv.name:
541 errlist.append("Invalid character ':' in PV '%s' of VG '%s'" %
542 (pv.name, pv.vg_name))
543 es_pvinfo = None
544 if exclusive_storage:
545 (errmsgs, es_pvinfo) = utils.LvmExclusiveCheckNodePvs(pvlist)
546 errlist.extend(errmsgs)
547 shared_pvs = nresult.get(constants.NV_EXCLUSIVEPVS, None)
548 if shared_pvs:
549 for (pvname, lvlist) in shared_pvs:
550 # TODO: Check that LVs are really unrelated (snapshots, DRBD meta...)
551 errlist.append("PV %s is shared among unrelated LVs (%s)" %
552 (pvname, utils.CommaJoin(lvlist)))
553 return (errlist, es_pvinfo)
554
555
556 def _ComputeMinMaxSpec(name, qualifier, ispecs, value):
557 """Computes if value is in the desired range.
558
559 @param name: name of the parameter for which we perform the check
560 @param qualifier: a qualifier used in the error message (e.g. 'disk/1',
561 not just 'disk')
562 @param ispecs: dictionary containing min and max values
563 @param value: actual value that we want to use
564 @return: None or an error string
565
566 """
567 if value in [None, constants.VALUE_AUTO]:
568 return None
569 max_v = ispecs[constants.ISPECS_MAX].get(name, value)
570 min_v = ispecs[constants.ISPECS_MIN].get(name, value)
571 if value > max_v or min_v > value:
572 if qualifier:
573 fqn = "%s/%s" % (name, qualifier)
574 else:
575 fqn = name
576 return ("%s value %s is not in range [%s, %s]" %
577 (fqn, value, min_v, max_v))
578 return None
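
# Illustrative example of the range check (ipolicy values are hypothetical):
#
#   ispecs = {constants.ISPECS_MIN: {constants.ISPEC_MEM_SIZE: 128},
#             constants.ISPECS_MAX: {constants.ISPEC_MEM_SIZE: 4096}}
#   _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, "", ispecs, 8192)
#   # -> an error string such as "memory-size value 8192 is not in
#   #    range [128, 4096]"
#   _ComputeMinMaxSpec(constants.ISPEC_MEM_SIZE, "", ispecs, 512)
#   # -> None (the value is within the allowed range)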
579
580
581 def ComputeIPolicySpecViolation(ipolicy, mem_size, cpu_count, disk_count,
582 nic_count, disk_sizes, spindle_use,
583 disk_types, _compute_fn=_ComputeMinMaxSpec):
584 """Verifies ipolicy against provided specs.
585
586 @type ipolicy: dict
587 @param ipolicy: The ipolicy
588 @type mem_size: int
589 @param mem_size: The memory size
590 @type cpu_count: int
591 @param cpu_count: Used cpu cores
592 @type disk_count: int
593 @param disk_count: Number of disks used
594 @type nic_count: int
595 @param nic_count: Number of nics used
596 @type disk_sizes: list of ints
597 @param disk_sizes: Disk sizes of used disks (len must match C{disk_count})
598 @type spindle_use: int
599 @param spindle_use: The number of spindles this instance uses
600 @type disk_types: list of strings
601 @param disk_types: The disk types of the instance's disks
602 @param _compute_fn: The compute function (unittest only)
603 @return: A list of violations, or an empty list if no violations are found
604
605 """
606 assert disk_count == len(disk_sizes)
607 assert isinstance(disk_types, list)
608 assert disk_count == len(disk_types)
609
610 test_settings = [
611 (constants.ISPEC_MEM_SIZE, "", mem_size),
612 (constants.ISPEC_CPU_COUNT, "", cpu_count),
613 (constants.ISPEC_NIC_COUNT, "", nic_count),
614 (constants.ISPEC_SPINDLE_USE, "", spindle_use),
615 ] + [(constants.ISPEC_DISK_SIZE, str(idx), d)
616 for idx, d in enumerate(disk_sizes)]
617
618 allowed_dts = set(ipolicy[constants.IPOLICY_DTS])
619 ret = []
620 if disk_count != 0:
621 # This check doesn't make sense for diskless instances
622 test_settings.append((constants.ISPEC_DISK_COUNT, "", disk_count))
623 elif constants.DT_DISKLESS not in allowed_dts:
624 ret.append("Disk template %s is not allowed (allowed templates %s)" %
625 (constants.DT_DISKLESS, utils.CommaJoin(allowed_dts)))
626
627 forbidden_dts = set(disk_types) - allowed_dts
628 if forbidden_dts:
629 ret.append("Disk template %s is not allowed (allowed templates: %s)" %
630 (utils.CommaJoin(forbidden_dts), utils.CommaJoin(allowed_dts)))
631
632 min_errs = None
633 for minmax in ipolicy[constants.ISPECS_MINMAX]:
634 errs = filter(None,
635 (_compute_fn(name, qualifier, minmax, value)
636 for (name, qualifier, value) in test_settings))
637 if min_errs is None or len(errs) < len(min_errs):
638 min_errs = errs
639 assert min_errs is not None
640 return ret + min_errs
641
642
643 def ComputeIPolicyDiskSizesViolation(ipolicy, disk_sizes, disks,
644 _compute_fn=_ComputeMinMaxSpec):
645 """Verifies ipolicy against provided disk sizes.
646
647 No other specs except the disk sizes, the number of disks and the disk
648 template are checked.
649
650 @type ipolicy: dict
651 @param ipolicy: The ipolicy
652 @type disk_sizes: list of ints
653 @param disk_sizes: Disk sizes of used disks (len must match that of C{disks})
654 @type disks: list of L{objects.Disk}
655 @param disks: The Disk objects of the instance
656 @param _compute_fn: The compute function (unittest only)
657 @return: A list of violations, or an empty list if no violations are found
658
659 """
660 if len(disk_sizes) != len(disks):
661 return [constants.ISPEC_DISK_COUNT]
662 dev_types = [d.dev_type for d in disks]
663 return ComputeIPolicySpecViolation(ipolicy,
664 # mem_size, cpu_count, disk_count
665 None, None, len(disk_sizes),
666 None, disk_sizes, # nic_count, disk_sizes
667 None, # spindle_use
668 dev_types,
669 _compute_fn=_compute_fn)
670
671
672 def ComputeIPolicyInstanceViolation(ipolicy, instance, cfg,
673 _compute_fn=ComputeIPolicySpecViolation):
674 """Compute if instance meets the specs of ipolicy.
675
676 @type ipolicy: dict
677 @param ipolicy: The ipolicy to verify against
678 @type instance: L{objects.Instance}
679 @param instance: The instance to verify
680 @type cfg: L{config.ConfigWriter}
681 @param cfg: Cluster configuration
682 @param _compute_fn: The function to verify ipolicy (unittest only)
683 @see: L{ComputeIPolicySpecViolation}
684
685 """
686 ret = []
687 be_full = cfg.GetClusterInfo().FillBE(instance)
688 mem_size = be_full[constants.BE_MAXMEM]
689 cpu_count = be_full[constants.BE_VCPUS]
690 inst_nodes = cfg.GetInstanceNodes(instance.uuid)
691 es_flags = rpc.GetExclusiveStorageForNodes(cfg, inst_nodes)
692 disks = cfg.GetInstanceDisks(instance.uuid)
693 if any(es_flags.values()):
694 # With exclusive storage use the actual spindles
695 try:
696 spindle_use = sum([disk.spindles for disk in disks])
697 except TypeError:
698 ret.append("Number of spindles not configured for disks of instance %s"
699 " while exclusive storage is enabled, try running gnt-cluster"
700 " repair-disk-sizes" % instance.name)
701 # _ComputeMinMaxSpec ignores 'None's
702 spindle_use = None
703 else:
704 spindle_use = be_full[constants.BE_SPINDLE_USE]
705 disk_count = len(disks)
706 disk_sizes = [disk.size for disk in disks]
707 nic_count = len(instance.nics)
708 disk_types = [d.dev_type for d in disks]
709
710 return ret + _compute_fn(ipolicy, mem_size, cpu_count, disk_count, nic_count,
711 disk_sizes, spindle_use, disk_types)
712
713
714 def _ComputeViolatingInstances(ipolicy, instances, cfg):
715 """Computes a set of instances who violates given ipolicy.
716
717 @param ipolicy: The ipolicy to verify
718 @type instances: list of L{objects.Instance}
719 @param instances: List of instances to verify
720 @type cfg: L{config.ConfigWriter}
721 @param cfg: Cluster configuration
722 @return: A frozenset of instance names violating the ipolicy
723
724 """
725 return frozenset([inst.name for inst in instances
726 if ComputeIPolicyInstanceViolation(ipolicy, inst, cfg)])
727
728
729 def ComputeNewInstanceViolations(old_ipolicy, new_ipolicy, instances, cfg):
730 """Computes a set of any instances that would violate the new ipolicy.
731
732 @param old_ipolicy: The current (still in-place) ipolicy
733 @param new_ipolicy: The new (to become) ipolicy
734 @param instances: List of instances to verify
735 @type cfg: L{config.ConfigWriter}
736 @param cfg: Cluster configuration
737 @return: A frozenset of instance names which violate the new ipolicy but
738 did not violate the old one
739
740 """
741 return (_ComputeViolatingInstances(new_ipolicy, instances, cfg) -
742 _ComputeViolatingInstances(old_ipolicy, instances, cfg))
743
744
745 def GetUpdatedParams(old_params, update_dict,
746 use_default=True, use_none=False):
747 """Return the new version of a parameter dictionary.
748
749 @type old_params: dict
750 @param old_params: old parameters
751 @type update_dict: dict
752 @param update_dict: dict containing new parameter values, or
753 constants.VALUE_DEFAULT to reset the parameter to its default
754 value
755 @type use_default: boolean
756 @param use_default: whether to recognise L{constants.VALUE_DEFAULT}
757 values as 'to be deleted' values
758 @type use_none: boolean
759 @param use_none: whether to recognise C{None} values as 'to be
760 deleted' values
761 @rtype: dict
762 @return: the new parameter dictionary
763
764 """
765 params_copy = copy.deepcopy(old_params)
766 for key, val in update_dict.iteritems():
767 if ((use_default and val == constants.VALUE_DEFAULT) or
768 (use_none and val is None)):
769 try:
770 del params_copy[key]
771 except KeyError:
772 pass
773 else:
774 params_copy[key] = val
775 return params_copy
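
# Illustrative example of the update semantics (parameter names are made up):
#
#   GetUpdatedParams({"a": 1, "b": 2}, {"b": constants.VALUE_DEFAULT, "c": 3})
#   # -> {"a": 1, "c": 3}: with use_default=True, "b" is reset to its default
#   # (i.e. dropped from the dict), while "c" is added unchanged.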
776
777
778 def GetUpdatedIPolicy(old_ipolicy, new_ipolicy, group_policy=False):
779 """Return the new version of an instance policy.
780
781 @param group_policy: whether this policy applies to a group and thus
782 we should support removal of policy entries
783
784 """
785 ipolicy = copy.deepcopy(old_ipolicy)
786 for key, value in new_ipolicy.items():
787 if key not in constants.IPOLICY_ALL_KEYS:
788 raise errors.OpPrereqError("Invalid key in new ipolicy: %s" % key,
789 errors.ECODE_INVAL)
790 if (not value or value == [constants.VALUE_DEFAULT] or
791 value == constants.VALUE_DEFAULT):
792 if group_policy:
793 if key in ipolicy:
794 del ipolicy[key]
795 else:
796 raise errors.OpPrereqError("Can't unset ipolicy attribute '%s'"
797 " on the cluster'" % key,
798 errors.ECODE_INVAL)
799 else:
800 if key in constants.IPOLICY_PARAMETERS:
801 # FIXME: we assume all such values are float
802 try:
803 ipolicy[key] = float(value)
804 except (TypeError, ValueError), err:
805 raise errors.OpPrereqError("Invalid value for attribute"
806 " '%s': '%s', error: %s" %
807 (key, value, err), errors.ECODE_INVAL)
808 elif key == constants.ISPECS_MINMAX:
809 for minmax in value:
810 for k in minmax.keys():
811 utils.ForceDictType(minmax[k], constants.ISPECS_PARAMETER_TYPES)
812 ipolicy[key] = value
813 elif key == constants.ISPECS_STD:
814 if group_policy:
815 msg = "%s cannot appear in group instance specs" % key
816 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
817 ipolicy[key] = GetUpdatedParams(old_ipolicy.get(key, {}), value,
818 use_none=False, use_default=False)
819 utils.ForceDictType(ipolicy[key], constants.ISPECS_PARAMETER_TYPES)
820 else:
821 # FIXME: we assume all others are lists; this should be redone
822 # in a nicer way
823 ipolicy[key] = list(value)
824 try:
825 objects.InstancePolicy.CheckParameterSyntax(ipolicy, not group_policy)
826 except errors.ConfigurationError, err:
827 raise errors.OpPrereqError("Invalid instance policy: %s" % err,
828 errors.ECODE_INVAL)
829 return ipolicy
830
831
832 def AnnotateDiskParams(instance, devs, cfg):
833 """Little helper wrapper to the rpc annotation method.
834
835 @param instance: The instance object
836 @type devs: List of L{objects.Disk}
837 @param devs: The root devices (not any of their children!)
838 @param cfg: The config object
839 @return: The annotated disk copies
840 @see: L{ganeti.rpc.node.AnnotateDiskParams}
841
842 """
843 return rpc.AnnotateDiskParams(devs, cfg.GetInstanceDiskParams(instance))
844
845
846 def SupportsOob(cfg, node):
847 """Tells if node supports OOB.
848
849 @type cfg: L{config.ConfigWriter}
850 @param cfg: The cluster configuration
851 @type node: L{objects.Node}
852 @param node: The node
853 @return: The OOB script if supported or an empty string otherwise
854
855 """
856 return cfg.GetNdParams(node)[constants.ND_OOB_PROGRAM]
857
858
859 def _UpdateAndVerifySubDict(base, updates, type_check):
860 """Updates and verifies a dict with sub dicts of the same type.
861
862 @param base: The dict with the old data
863 @param updates: The dict with the new data
864 @param type_check: Dict suitable to ForceDictType to verify correct types
865 @returns: A new dict with updated and verified values
866
867 """
868 def fn(old, value):
869 new = GetUpdatedParams(old, value)
870 utils.ForceDictType(new, type_check)
871 return new
872
873 ret = copy.deepcopy(base)
874 ret.update(dict((key, fn(base.get(key, {}), value))
875 for key, value in updates.items()))
876 return ret
877
878
879 def _FilterVmNodes(lu, node_uuids):
880 """Filters out non-vm_capable nodes from a list.
881
882 @type lu: L{LogicalUnit}
883 @param lu: the logical unit for which we check
884 @type node_uuids: list
885 @param node_uuids: the list of nodes on which we should check
886 @rtype: list
887 @return: the list of vm-capable nodes
888
889 """
890 non_vm_nodes = frozenset(lu.cfg.GetNonVmCapableNodeList())
891 return [uuid for uuid in node_uuids if uuid not in non_vm_nodes]
892
893
894 def GetDefaultIAllocator(cfg, ialloc):
895 """Decides on which iallocator to use.
896
897 @type cfg: L{config.ConfigWriter}
898 @param cfg: Cluster configuration object
899 @type ialloc: string or None
900 @param ialloc: Iallocator specified in opcode
901 @rtype: string
902 @return: Iallocator name
903
904 """
905 if not ialloc:
906 # Use default iallocator
907 ialloc = cfg.GetDefaultIAllocator()
908
909 if not ialloc:
910 raise errors.OpPrereqError("No iallocator was specified, neither in the"
911 " opcode nor as a cluster-wide default",
912 errors.ECODE_INVAL)
913
914 return ialloc
915
916
917 def CheckInstancesNodeGroups(cfg, instances, owned_groups, owned_node_uuids,
918 cur_group_uuid):
919 """Checks if node groups for locked instances are still correct.
920
921 @type cfg: L{config.ConfigWriter}
922 @param cfg: Cluster configuration
923 @type instances: dict; string as key, L{objects.Instance} as value
924 @param instances: Dictionary, instance UUID as key, instance object as value
925 @type owned_groups: iterable of string
926 @param owned_groups: List of owned groups
927 @type owned_node_uuids: iterable of string
928 @param owned_node_uuids: List of owned nodes
929 @type cur_group_uuid: string or None
930 @param cur_group_uuid: Optional group UUID to check against instance's groups
931
932 """
933 for (uuid, inst) in instances.items():
934 inst_nodes = cfg.GetInstanceNodes(inst.uuid)
935 assert owned_node_uuids.issuperset(inst_nodes), \
936 "Instance %s's nodes changed while we kept the lock" % inst.name
937
938 inst_groups = CheckInstanceNodeGroups(cfg, uuid, owned_groups)
939
940 assert cur_group_uuid is None or cur_group_uuid in inst_groups, \
941 "Instance %s has no node in group %s" % (inst.name, cur_group_uuid)
942
943
944 def CheckInstanceNodeGroups(cfg, inst_uuid, owned_groups, primary_only=False):
945 """Checks if the owned node groups are still correct for an instance.
946
947 @type cfg: L{config.ConfigWriter}
948 @param cfg: The cluster configuration
949 @type inst_uuid: string
950 @param inst_uuid: Instance UUID
951 @type owned_groups: set or frozenset
952 @param owned_groups: List of currently owned node groups
953 @type primary_only: boolean
954 @param primary_only: Whether to check node groups for only the primary node
955
956 """
957 inst_groups = cfg.GetInstanceNodeGroups(inst_uuid, primary_only)
958
959 if not owned_groups.issuperset(inst_groups):
960 raise errors.OpPrereqError("Instance %s's node groups changed since"
961 " locks were acquired, current groups are"
962 " are '%s', owning groups '%s'; retry the"
963 " operation" %
964 (cfg.GetInstanceName(inst_uuid),
965 utils.CommaJoin(inst_groups),
966 utils.CommaJoin(owned_groups)),
967 errors.ECODE_STATE)
968
969 return inst_groups
970
971
972 def LoadNodeEvacResult(lu, alloc_result, early_release, use_nodes):
973 """Unpacks the result of change-group and node-evacuate iallocator requests.
974
975 Iallocator modes L{constants.IALLOCATOR_MODE_NODE_EVAC} and
976 L{constants.IALLOCATOR_MODE_CHG_GROUP}.
977
978 @type lu: L{LogicalUnit}
979 @param lu: Logical unit instance
980 @type alloc_result: tuple/list
981 @param alloc_result: Result from iallocator
982 @type early_release: bool
983 @param early_release: Whether to release locks early if possible
984 @type use_nodes: bool
985 @param use_nodes: Whether to display node names instead of groups
986
987 """
988 (moved, failed, jobs) = alloc_result
989
990 if failed:
991 failreason = utils.CommaJoin("%s (%s)" % (name, reason)
992 for (name, reason) in failed)
993 lu.LogWarning("Unable to evacuate instances %s", failreason)
994 raise errors.OpExecError("Unable to evacuate instances %s" % failreason)
995
996 if moved:
997 lu.LogInfo("Instances to be moved: %s",
998 utils.CommaJoin(
999 "%s (to %s)" %
1000 (name, _NodeEvacDest(use_nodes, group, node_names))
1001 for (name, group, node_names) in moved))
1002
1003 return [
1004 [
1005 _SetOpEarlyRelease(early_release, opcodes.OpCode.LoadOpCode(o))
1006 for o in ops
1007 ]
1008 for ops in jobs
1009 ]
1010
1011
1012 def _NodeEvacDest(use_nodes, group, node_names):
1013 """Returns group or nodes depending on caller's choice.
1014
1015 """
1016 if use_nodes:
1017 return utils.CommaJoin(node_names)
1018 else:
1019 return group
1020
1021
1022 def _SetOpEarlyRelease(early_release, op):
1023 """Sets C{early_release} flag on opcodes if available.
1024
1025 """
1026 try:
1027 op.early_release = early_release
1028 except AttributeError:
1029 assert not isinstance(op, opcodes.OpInstanceReplaceDisks)
1030
1031 return op
1032
1033
1034 def MapInstanceLvsToNodes(cfg, instances):
1035 """Creates a map from (node, volume) to instance name.
1036
1037 @type cfg: L{config.ConfigWriter}
1038 @param cfg: The cluster configuration
1039 @type instances: list of L{objects.Instance}
1040 @rtype: dict; tuple of (node uuid, volume name) as key, L{objects.Instance}
1041 object as value
1042
1043 """
1044 return dict(
1045 ((node_uuid, vol), inst)
1046 for inst in instances
1047 for (node_uuid, vols) in cfg.GetInstanceLVsByNode(inst.uuid).items()
1048 for vol in vols)
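
# The resulting mapping has the shape (node uuid, LV name) -> instance object,
# e.g. {("node-uuid-1", "xenvg/abc.disk0_data"): <Instance web1>, ...}
# (the keys and values shown here are purely illustrative).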
1049
1050
1051 def CheckParamsNotGlobal(params, glob_pars, kind, bad_levels, good_levels):
1052 """Make sure that none of the given paramters is global.
1053
1054 If a global parameter is found, an L{errors.OpPrereqError} exception is
1055 raised. This is used to avoid setting global parameters for individual nodes.
1056
1057 @type params: dictionary
1058 @param params: Parameters to check
1059 @type glob_pars: dictionary
1060 @param glob_pars: Forbidden parameters
1061 @type kind: string
1062 @param kind: Kind of parameters (e.g. "node")
1063 @type bad_levels: string
1064 @param bad_levels: Level(s) at which the parameters are forbidden (e.g.
1065 "instance")
1066 @type good_levels: strings
1067 @param good_levels: Level(s) at which the parameters are allowed (e.g.
1068 "cluster or group")
1069
1070 """
1071 used_globals = glob_pars.intersection(params)
1072 if used_globals:
1073 msg = ("The following %s parameters are global and cannot"
1074 " be customized at %s level, please modify them at"
1075 " %s level: %s" %
1076 (kind, bad_levels, good_levels, utils.CommaJoin(used_globals)))
1077 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1078
1079
1080 def IsExclusiveStorageEnabledNode(cfg, node):
1081 """Whether exclusive_storage is in effect for the given node.
1082
1083 @type cfg: L{config.ConfigWriter}
1084 @param cfg: The cluster configuration
1085 @type node: L{objects.Node}
1086 @param node: The node
1087 @rtype: bool
1088 @return: The effective value of exclusive_storage
1089
1090 """
1091 return cfg.GetNdParams(node)[constants.ND_EXCLUSIVE_STORAGE]
1092
1093
1094 def IsInstanceRunning(lu, instance, prereq=True):
1095 """Given an instance object, checks if the instance is running.
1096
1097 This function asks the backend whether the instance is running;
1098 user-shutdown instances are considered not to be running.
1099
1100 @type lu: L{LogicalUnit}
1101 @param lu: LU on behalf of which we make the check
1102
1103 @type instance: L{objects.Instance}
1104 @param instance: instance to check whether it is running
1105
1106 @rtype: bool
1107 @return: 'True' if the instance is running, 'False' otherwise
1108
1109 """
1110 hvparams = lu.cfg.GetClusterInfo().FillHV(instance)
1111 result = lu.rpc.call_instance_info(instance.primary_node, instance.name,
1112 instance.hypervisor, hvparams)
1113 # TODO: This 'prereq=True' is a problem if this function is called
1114 # within the 'Exec' method of a LU.
1115 result.Raise("Can't retrieve instance information for instance '%s'" %
1116 instance.name, prereq=prereq, ecode=errors.ECODE_ENVIRON)
1117
1118 return result.payload and \
1119 "state" in result.payload and \
1120 (result.payload["state"] != hypervisor.hv_base.HvInstanceState.SHUTDOWN)
1121
1122
1123 def CheckInstanceState(lu, instance, req_states, msg=None):
1124 """Ensure that an instance is in one of the required states.
1125
1126 @param lu: the LU on behalf of which we make the check
1127 @param instance: the instance to check
1128 @param msg: if passed, should be a message to replace the default one
1129 @raise errors.OpPrereqError: if the instance is not in the required state
1130
1131 """
1132 if msg is None:
1133 msg = ("can't use instance from outside %s states" %
1134 utils.CommaJoin(req_states))
1135 if instance.admin_state not in req_states:
1136 raise errors.OpPrereqError("Instance '%s' is marked to be %s, %s" %
1137 (instance.name, instance.admin_state, msg),
1138 errors.ECODE_STATE)
1139
1140 if constants.ADMINST_UP not in req_states:
1141 pnode_uuid = instance.primary_node
1142 # Replicating the offline check
1143 if not lu.cfg.GetNodeInfo(pnode_uuid).offline:
1144 if IsInstanceRunning(lu, instance):
1145 raise errors.OpPrereqError("Instance %s is running, %s" %
1146 (instance.name, msg), errors.ECODE_STATE)
1147 else:
1148 lu.LogWarning("Primary node offline, ignoring check that instance"
1149 " is down")
1150
1151
1152 def CheckIAllocatorOrNode(lu, iallocator_slot, node_slot):
1153 """Check the sanity of iallocator and node arguments and use the
1154 cluster-wide iallocator if appropriate.
1155
1156 Check that at most one of (iallocator, node) is specified. If none is
1157 specified, or the iallocator is L{constants.DEFAULT_IALLOCATOR_SHORTCUT},
1158 then the LU's opcode's iallocator slot is filled with the cluster-wide
1159 default iallocator.
1160
1161 @type iallocator_slot: string
1162 @param iallocator_slot: the name of the opcode iallocator slot
1163 @type node_slot: string
1164 @param node_slot: the name of the opcode target node slot
1165
1166 """
1167 node = getattr(lu.op, node_slot, None)
1168 ialloc = getattr(lu.op, iallocator_slot, None)
1169 if node == []:
1170 node = None
1171
1172 if node is not None and ialloc is not None:
1173 raise errors.OpPrereqError("Do not specify both, iallocator and node",
1174 errors.ECODE_INVAL)
1175 elif ((node is None and ialloc is None) or
1176 ialloc == constants.DEFAULT_IALLOCATOR_SHORTCUT):
1177 default_iallocator = lu.cfg.GetDefaultIAllocator()
1178 if default_iallocator:
1179 setattr(lu.op, iallocator_slot, default_iallocator)
1180 else:
1181 raise errors.OpPrereqError("No iallocator or node given and no"
1182 " cluster-wide default iallocator found;"
1183 " please specify either an iallocator or a"
1184 " node, or set a cluster-wide default"
1185 " iallocator", errors.ECODE_INVAL)
1186
1187
1188 def FindFaultyInstanceDisks(cfg, rpc_runner, instance, node_uuid, prereq):
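"""Returns the indices of an instance's disks that are faulty on a node.

Queries the mirror status of the instance's disks on the given node and
collects the index of every disk whose ldisk status is
L{constants.LDS_FAULTY}.

"""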
1189 faulty = []
1190
1191 disks = cfg.GetInstanceDisks(instance.uuid)
1192 result = rpc_runner.call_blockdev_getmirrorstatus(
1193 node_uuid, (disks, instance))
1194 result.Raise("Failed to get disk status from node %s" %
1195 cfg.GetNodeName(node_uuid),
1196 prereq=prereq, ecode=errors.ECODE_ENVIRON)
1197
1198 for idx, bdev_status in enumerate(result.payload):
1199 if bdev_status and bdev_status.ldisk_status == constants.LDS_FAULTY:
1200 faulty.append(idx)
1201
1202 return faulty
1203
1204
1205 def CheckNodeOnline(lu, node_uuid, msg=None):
1206 """Ensure that a given node is online.
1207
1208 @param lu: the LU on behalf of which we make the check
1209 @param node_uuid: the node to check
1210 @param msg: if passed, should be a message to replace the default one
1211 @raise errors.OpPrereqError: if the node is offline
1212
1213 """
1214 if msg is None:
1215 msg = "Can't use offline node"
1216 if lu.cfg.GetNodeInfo(node_uuid).offline:
1217 raise errors.OpPrereqError("%s: %s" % (msg, lu.cfg.GetNodeName(node_uuid)),
1218 errors.ECODE_STATE)
1219
1220
1221 def CheckDiskTemplateEnabled(cluster, disk_template):
1222 """Helper function to check if a disk template is enabled.
1223
1224 @type cluster: C{objects.Cluster}
1225 @param cluster: the cluster's configuration
1226 @type disk_template: str
1227 @param disk_template: the disk template to be checked
1228
1229 """
1230 assert disk_template is not None
1231 if disk_template not in constants.DISK_TEMPLATES:
1232 raise errors.OpPrereqError("'%s' is not a valid disk template."
1233 " Valid disk templates are: %s" %
1234 (disk_template,
1235 ",".join(constants.DISK_TEMPLATES)))
1236 if not disk_template in cluster.enabled_disk_templates:
1237 raise errors.OpPrereqError("Disk template '%s' is not enabled in cluster."
1238 " Enabled disk templates are: %s" %
1239 (disk_template,
1240 ",".join(cluster.enabled_disk_templates)))
1241
1242
1243 def CheckStorageTypeEnabled(cluster, storage_type):
1244 """Helper function to check if a storage type is enabled.
1245
1246 @type cluster: C{objects.Cluster}
1247 @param cluster: the cluster's configuration
1248 @type storage_type: str
1249 @param storage_type: the storage type to be checked
1250
1251 """
1252 assert storage_type is not None
1253 assert storage_type in constants.STORAGE_TYPES
1254 # special case for lvm-pv, because it cannot be enabled
1255 # via disk templates
1256 if storage_type == constants.ST_LVM_PV:
1257 CheckStorageTypeEnabled(cluster, constants.ST_LVM_VG)
1258 else:
1259 possible_disk_templates = \
1260 utils.storage.GetDiskTemplatesOfStorageTypes(storage_type)
1261 for disk_template in possible_disk_templates:
1262 if disk_template in cluster.enabled_disk_templates:
1263 return
1264 raise errors.OpPrereqError("No disk template of storage type '%s' is"
1265 " enabled in this cluster. Enabled disk"
1266 " templates are: %s" % (storage_type,
1267 ",".join(cluster.enabled_disk_templates)))
1268
1269
1270 def CheckIpolicyVsDiskTemplates(ipolicy, enabled_disk_templates):
1271 """Checks ipolicy disk templates against enabled disk tempaltes.
1272
1273 @type ipolicy: dict
1274 @param ipolicy: the new ipolicy
1275 @type enabled_disk_templates: list of string
1276 @param enabled_disk_templates: list of enabled disk templates on the
1277 cluster
1278 @raise errors.OpPrereqError: if there is at least one allowed disk
1279 template that is not also enabled.
1280
1281 """
1282 assert constants.IPOLICY_DTS in ipolicy
1283 allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
1284 not_enabled = set(allowed_disk_templates) - set(enabled_disk_templates)
1285 if not_enabled:
1286 raise errors.OpPrereqError("The following disk templates are allowed"
1287 " by the ipolicy, but not enabled on the"
1288 " cluster: %s" % utils.CommaJoin(not_enabled),
1289 errors.ECODE_INVAL)
1290
1291
1292 def CheckDiskAccessModeValidity(parameters):
1293 """Checks if the access parameter is legal.
1294
1295 @see: L{CheckDiskAccessModeConsistency} for cluster consistency checks.
1296 @raise errors.OpPrereqError: if the check fails.
1297
1298 """
1299 for disk_template in parameters:
1300 access = parameters[disk_template].get(constants.LDP_ACCESS,
1301 constants.DISK_KERNELSPACE)
1302 if access not in constants.DISK_VALID_ACCESS_MODES:
1303 valid_vals_str = utils.CommaJoin(constants.DISK_VALID_ACCESS_MODES)
1304 raise errors.OpPrereqError("Invalid value of '{d}:{a}': '{v}' (expected"
1305 " one of {o})".format(d=disk_template,
1306 a=constants.LDP_ACCESS,
1307 v=access,
1308 o=valid_vals_str))
1309
1310
1311 def CheckDiskAccessModeConsistency(parameters, cfg, group=None):
1312 """Checks if the access param is consistent with the cluster configuration.
1313
1314 @note: requires a configuration lock to run.
1315 @param parameters: the parameters to validate
1316 @param cfg: the cfg object of the cluster
1317 @param group: if set, only check for consistency within this group.
1318 @raise errors.OpPrereqError: if the LU attempts to change the access parameter
1319 to an invalid value, such as "pink bunny".
1320 @raise errors.OpPrereqError: if the LU attempts to change the access parameter
1321 to an inconsistent value, such as asking for RBD
1322 userspace access to the chroot hypervisor.
1323
1324 """
1325 CheckDiskAccessModeValidity(parameters)
1326
1327 for disk_template in parameters:
1328 access = parameters[disk_template].get(constants.LDP_ACCESS,
1329 constants.DISK_KERNELSPACE)
1330
1331 if disk_template not in constants.DTS_HAVE_ACCESS:
1332 continue
1333
1334 # Check the combination of instance hypervisor, disk template and access
1335 # protocol is sane.
1336 inst_uuids = cfg.GetNodeGroupInstances(group) if group else \
1337 cfg.GetInstanceList()
1338
1339 for entry in inst_uuids:
1340 inst = cfg.GetInstanceInfo(entry)
1341 disks = cfg.GetInstanceDisks(entry)
1342 for disk in disks:
1343
1344 if disk.dev_type != disk_template:
1345 continue
1346
1347 hv = inst.hypervisor
1348
1349 if not IsValidDiskAccessModeCombination(hv, disk.dev_type, access):
1350 raise errors.OpPrereqError("Instance {i}: cannot use '{a}' access"
1351 " setting with {h} hypervisor and {d} disk"
1352 " type.".format(i=inst.name,
1353 a=access,
1354 h=hv,
1355 d=disk.dev_type))
1356
1357
1358 def IsValidDiskAccessModeCombination(hv, disk_template, mode):
1359 """Checks if an hypervisor can read a disk template with given mode.
1360
1361 @param hv: the hypervisor that will access the data
1362 @param disk_template: the disk template the data is stored as
1363 @param mode: how the hypervisor should access the data
1364 @return: True if the hypervisor can access the given disk_template
1365 in the specified mode.
1366
1367 """
1368 if mode == constants.DISK_KERNELSPACE:
1369 return True
1370
1371 if (hv == constants.HT_KVM and
1372 disk_template in constants.DTS_HAVE_ACCESS and
1373 mode == constants.DISK_USERSPACE):
1374 return True
1375
1376 # Everything else:
1377 return False
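
# Illustrative results of the checks above (assuming, as in current constants,
# that 'rbd' is one of the templates in constants.DTS_HAVE_ACCESS):
#
#   IsValidDiskAccessModeCombination(constants.HT_KVM, constants.DT_RBD,
#                                    constants.DISK_USERSPACE)    # -> True
#   IsValidDiskAccessModeCombination(constants.HT_XEN_PVM, constants.DT_RBD,
#                                    constants.DISK_USERSPACE)    # -> False
#   # Kernelspace access is accepted for any hypervisor/disk template pair.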
1378
1379
1380 def AddNodeCertToCandidateCerts(lu, cfg, node_uuid):
1381 """Add the node's client SSL certificate digest to the candidate certs.
1382
1383 @type lu: L{LogicalUnit}
1384 @param lu: the logical unit
1385 @type cfg: L{ConfigWriter}
1386 @param cfg: the configuration client to use
1387 @type node_uuid: string
1388 @param node_uuid: the node's UUID
1389
1390 """
1391 result = lu.rpc.call_node_crypto_tokens(
1392 node_uuid,
1393 [(constants.CRYPTO_TYPE_SSL_DIGEST, constants.CRYPTO_ACTION_GET,
1394 None)])
1395 result.Raise("Could not retrieve the node's (uuid %s) SSL digest."
1396 % node_uuid)
1397 ((crypto_type, digest), ) = result.payload
1398 assert crypto_type == constants.CRYPTO_TYPE_SSL_DIGEST
1399
1400 cfg.AddNodeToCandidateCerts(node_uuid, digest)
1401
1402
1403 def RemoveNodeCertFromCandidateCerts(cfg, node_uuid):
1404 """Removes the node's certificate from the candidate certificates list.
1405
1406 @type cfg: C{config.ConfigWriter}
1407 @param cfg: the cluster's configuration
1408 @type node_uuid: string
1409 @param node_uuid: the node's UUID
1410
1411 """
1412 cfg.RemoveNodeFromCandidateCerts(node_uuid)
1413
1414
1415 def GetClientCertDigest(lu, node_uuid, filename=None):
1416 """Get the client SSL certificate digest for the node.
1417
1418 @type node_uuid: string
1419 @param node_uuid: the node's UUID
1420 @type filename: string
1421 @param filename: the certificate's filename
1422 @rtype: string
1423 @return: the digest of the newly created certificate
1424
1425 """
1426 options = {}
1427 if filename:
1428 options[constants.CRYPTO_OPTION_CERT_FILE] = filename
1429 result = lu.rpc.call_node_crypto_tokens(
1430 node_uuid,
1431 [(constants.CRYPTO_TYPE_SSL_DIGEST,
1432 constants.CRYPTO_ACTION_GET,
1433 options)])
1434 result.Raise("Could not fetch the node's (uuid %s) SSL client"
1435 " certificate." % node_uuid)
1436 ((crypto_type, new_digest), ) = result.payload
1437 assert crypto_type == constants.CRYPTO_TYPE_SSL_DIGEST
1438 return new_digest
1439
1440
1441 def AddInstanceCommunicationNetworkOp(network):
1442 """Create an OpCode that adds the instance communication network.
1443
1444 This OpCode contains the configuration necessary for the instance
1445 communication network.
1446
1447 @type network: string
1448 @param network: name or UUID of the instance communication network
1449
1450 @rtype: L{ganeti.opcodes.OpCode}
1451 @return: OpCode that creates the instance communication network
1452
1453 """
1454 return opcodes.OpNetworkAdd(
1455 network_name=network,
1456 gateway=None,
1457 network=constants.INSTANCE_COMMUNICATION_NETWORK4,
1458 gateway6=None,
1459 network6=constants.INSTANCE_COMMUNICATION_NETWORK6,
1460 mac_prefix=constants.INSTANCE_COMMUNICATION_MAC_PREFIX,
1461 add_reserved_ips=None,
1462 conflicts_check=True,
1463 tags=[])
1464
1465
1466 def ConnectInstanceCommunicationNetworkOp(group_uuid, network):
1467 """Create an OpCode that connects a group to the instance
1468 communication network.
1469
1470 This OpCode contains the configuration necessary for the instance
1471 communication network.
1472
1473 @type group_uuid: string
1474 @param group_uuid: UUID of the group to connect
1475
1476 @type network: string
1477 @param network: name or UUID of the network to connect to, i.e., the
1478 instance communication network
1479
1480 @rtype: L{ganeti.opcodes.OpCode}
1481 @return: OpCode that connects the group to the instance
1482 communication network
1483
1484 """
1485 return opcodes.OpNetworkConnect(
1486 group_name=group_uuid,
1487 network_name=network,
1488 network_mode=constants.INSTANCE_COMMUNICATION_NETWORK_MODE,
1489 network_link=constants.INSTANCE_COMMUNICATION_NETWORK_LINK,
1490 conflicts_check=True)
1491
1492
1493 def DetermineImageSize(lu, image, node_uuid):
1494 """Determines the size of the specified image.
1495
1496 @type image: string
1497 @param image: absolute filepath or URL of the image
1498
1499 @type node_uuid: string
1500 @param node_uuid: if L{image} is a filepath, this is the UUID of the
1501 node where the image is located
1502
1503 @rtype: int
1504 @return: size of the image in MB, rounded up
1505 @raise OpExecError: if the image does not exist
1506
1507 """
1508 # Check if we are dealing with a URL first
1509 class _HeadRequest(urllib2.Request):
1510 def get_method(self):
1511 return "HEAD"
1512
1513 if utils.IsUrl(image):
1514 try:
1515 response = urllib2.urlopen(_HeadRequest(image))
1516 except urllib2.URLError:
1517 raise errors.OpExecError("Could not retrieve image from given url '%s'" %
1518 image)
1519
1520 content_length_str = response.info().getheader('content-length')
1521
1522 if not content_length_str:
1523 raise errors.OpExecError("Could not determine image size from given url"
1524 " '%s'" % image)
1525
1526 byte_size = int(content_length_str)
1527 else:
1528 # We end up here if a file path is used
1529 result = lu.rpc.call_get_file_info(node_uuid, image)
1530 result.Raise("Could not determine size of file '%s'" % image)
1531
1532 success, attributes = result.payload
1533 if not success:
1534 raise errors.OpExecError("Could not open file '%s'" % image)
1535 byte_size = attributes[constants.STAT_SIZE]
1536
1537 # Finally, the conversion
1538 return math.ceil(byte_size / 1024. / 1024.)
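
# Rounding example: an image of 1572864001 bytes (one byte over 1500 MiB)
# yields math.ceil(1572864001 / 1024. / 1024.) == 1501.0, i.e. sizes are
# always rounded up to the next full mebibyte.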
1539
1540
1541 def EnsureKvmdOnNodes(lu, feedback_fn, nodes=None):
1542 """Ensure KVM daemon is running on nodes with KVM instances.
1543
1544 If user shutdown is enabled in the cluster:
1545 - The KVM daemon will be started on VM capable nodes containing
1546 KVM instances.
1547 - The KVM daemon will be stopped on non-VM-capable nodes.
1548
1549 If user shutdown is disabled in the cluster:
1550 - The KVM daemon will be stopped on all nodes.
1551
1552 Issues a warning for each failed RPC call.
1553
1554 @type lu: L{LogicalUnit}
1555 @param lu: logical unit on whose behalf we execute
1556
1557 @type feedback_fn: callable
1558 @param feedback_fn: feedback function
1559
1560 @type nodes: list of string
1561 @param nodes: if supplied, it overrides the node uuids to start/stop;
1562 this is used mainly for optimization
1563
1564 """
1565 cluster = lu.cfg.GetClusterInfo()
1566
1567 # Either use the passed nodes or consider all cluster nodes
1568 if nodes is not None:
1569 node_uuids = set(nodes)
1570 else:
1571 node_uuids = lu.cfg.GetNodeList()
1572
1573 # Determine on which nodes the KVM daemon should be started/stopped
1574 if constants.HT_KVM in cluster.enabled_hypervisors and \
1575 cluster.enabled_user_shutdown:
1576 start_nodes = []
1577 stop_nodes = []
1578
1579 for node_uuid in node_uuids:
1580 if lu.cfg.GetNodeInfo(node_uuid).vm_capable:
1581 start_nodes.append(node_uuid)
1582 else:
1583 stop_nodes.append(node_uuid)
1584 else:
1585 start_nodes = []
1586 stop_nodes = node_uuids
1587
1588 # Start KVM where necessary
1589 if start_nodes:
1590 results = lu.rpc.call_node_ensure_daemon(start_nodes, constants.KVMD, True)
1591 for node_uuid in start_nodes:
1592 results[node_uuid].Warn("Failed to start KVM daemon on node '%s'" %
1593 lu.cfg.GetNodeName(node_uuid), feedback_fn)
1594
1595 # Stop KVM where necessary
1596 if stop_nodes:
1597 results = lu.rpc.call_node_ensure_daemon(stop_nodes, constants.KVMD, False)
1598 for node_uuid in stop_nodes:
1599 results[node_uuid].Warn("Failed to stop KVM daemon on node '%s'" %
1600 lu.cfg.GetNodeName(node_uuid), feedback_fn)
1601
1602
1603 def WarnAboutFailedSshUpdates(result, master_uuid, feedback_fn):
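"""Reports nodes whose SSH key files could not be updated.

Reads the (node name, error message) pairs from the master node's RPC result
payload and emits one feedback line for each of them.

"""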
1604 node_errors = result[master_uuid].payload
1605 if node_errors:
1606 feedback_fn("Some nodes' SSH key files could not be updated:")
1607 for node_name, error_msg in node_errors:
1608 feedback_fn("%s: %s" % (node_name, error_msg))