#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Functions used by the node daemon

@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
  the L{UploadFile} function
@var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
  in the L{_CleanDirectory} function

"""

# pylint: disable=E1103,C0302

# E1103: %s %r has no %r member (but some types could not be
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
# or (False, "string") which confuses pylint

# C0302: This module has become too big and should be split up


import base64
import errno
import logging
import os
import os.path
import pycurl
import random
import re
import shutil
import signal
import stat
import tempfile
import time
import zlib
import copy
import contextlib
import collections

from ganeti import errors
from ganeti import http
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti.hypervisor import hv_base
from ganeti import constants
from ganeti.storage import bdev
from ganeti.storage import drbd
from ganeti.storage import extstorage
from ganeti.storage import filestorage
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import netutils
from ganeti import runtime
from ganeti import compat
from ganeti import pathutils
from ganeti import vcluster
from ganeti import ht
from ganeti.storage.base import BlockDev
from ganeti.storage.drbd import DRBD8
from ganeti import hooksmaster
import ganeti.metad as metad


_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")
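# An illustrative line (values assumed, not taken from a live system) in the
# format matched by the regex above, as produced by something like
# "lvs --noheadings --units m --separator=|":
#   "  xenvg|instance1.example.com-disk0|10240.00|-wi-ao"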

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8


class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  """


def _GetInstReasonFilename(instance_name):
  """Path of the file containing the reason of the instance status change.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  return utils.PathJoin(pathutils.INSTANCE_REASON_DIR, instance_name)


def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance

  @type trail: list of reasons
  @param trail: reason trail

  @rtype: None

  """
  json = serializer.DumpJson(trail)
  filename = _GetInstReasonFilename(instance_name)
  utils.WriteFile(filename, data=json)


def _Fail(msg, *args, **kwargs):
169 """Log an error and the raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception
  @raise RPCFail

  """
  if args:
    msg = msg % args
  if "log" not in kwargs or kwargs["log"]:  # if we should log this error
    if "exc" in kwargs and kwargs["exc"]:
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)

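# Illustrative call (hypothetical message and argument): this logs
# "Can't read file /tmp/foo" via logging.error and raises RPCFail with the
# same text; passing exc=True would log the current traceback as well:
#   _Fail("Can't read file %s", "/tmp/foo")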

def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  return ssconf.SimpleStore()


def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  return ssh.SshRunner(cluster_name)


def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  else:
    raise AssertionError("Unknown data encoding")

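# Illustrative round-trips (payload "abc" assumed; the encoding constants are
# the ones handled above):
#   _Decompress((constants.RPC_ENCODING_NONE, "abc")) == "abc"
#   _Decompress((constants.RPC_ENCODING_ZLIB_BASE64,
#                base64.b64encode(zlib.compress("abc")))) == "abc"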

def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  allowed_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    allowed_files.update(hv_class.GetAncillaryFiles()[0])

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()

def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterNodeName():
  """Returns the master node name.

  @rtype: string
  @return: name of the master node
  @raise RPCFail: in case of errors

  """
  try:
    return _GetConfig().GetMasterNode()
  except errors.ConfigurationError, err:
    _Fail("Cluster configuration incomplete: %s", err, exc=True)


def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself])  # these hooks run locally

      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn, None,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator


def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    L{RunLocalHooks} decorator)

  """
  # pylint: disable=W0613
  ver = netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  env = {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ver),
    }

  return env


def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'" %
          (action, result.exit_code, result.output), log=True)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)


def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None

  """

  if no_voting:
    masterd_args = "--no-voting --yes-do-it"
  else:
    masterd_args = ""

  env = {
    "EXTRA_MASTERD_ARGS": masterd_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    msg = "Can't start Ganeti master: %s" % result.output
    logging.error(msg)
    _Fail(msg)


@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)


def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if result.failed:
    logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                  " and error %s",
                  result.cmd, result.exit_code, result.output)


def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device

  """
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                         "%s/%s" % (master_ip, netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                         "%s/%s" % (master_ip, old_netmask),
                         "dev", master_netdev, "label",
                         "%s:0" % master_netdev])
  if result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")


def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry

  """
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")

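# Illustrative calls (host names and addresses are hypothetical):
#   EtcHostsModify(constants.ETC_HOSTS_ADD, "node1.example.com", "192.0.2.10")
#   EtcHostsModify(constants.ETC_HOSTS_REMOVE, "node1.example.com", None)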

def LeaveCluster(modify_ssh_setup):
  """Cleans up and removes the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      ssh.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except:  # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  utils.StopDaemon(constants.CONFD)
  utils.StopDaemon(constants.MOND)
  utils.StopDaemon(constants.KVMD)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")


def _CheckStorageParams(params, num_params):
  """Performs sanity checks for storage parameters.

  @type params: list
  @param params: list of storage parameters
  @type num_params: int
  @param num_params: expected number of parameters

  """
  if params is None:
    raise errors.ProgrammerError("No storage parameters for storage"
                                 " reporting are provided.")
  if not isinstance(params, list):
    raise errors.ProgrammerError("The storage parameters are not of type"
                                 " list: '%s'" % params)
  if not len(params) == num_params:
    raise errors.ProgrammerError("Did not receive the expected number of"
                                 " storage parameters: expected %s,"
                                 " received '%s'" % (num_params, len(params)))


def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  if not isinstance(params[0], bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor


def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: list of storage parameters, which in this case should be
    containing only one for exclusive storage

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgInfo(name, excl_stor)


def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about a LVM volume group.

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  if vginfo:
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))
  else:
    vg_free = None
    vg_size = None

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }

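# Illustrative return value of _GetVgInfo (numbers assumed) for a volume
# group "xenvg" with 20 GiB free out of 40 GiB, both reported in MiB:
#   {"type": constants.ST_LVM_VG, "name": "xenvg",
#    "storage_free": 20480, "storage_size": 40960}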

def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  excl_stor = _CheckLvmStorageParams(params)
  return _GetVgSpindlesInfo(name, excl_stor)


def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
    free spindles, total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    vg_free = 0
    vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }


def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total amount of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  return get_hv_fn(name).GetNodeInfo(hvparams=hvparams)


def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams

  """
  if hv_specs is None:
    return None

  result = []
  for hvname, hvparams in hv_specs:
    result.append(_GetHvInfo(hvname, hvparams, get_hv_fn))
  return result

def _GetNamedNodeInfo(names, fn):
  """Calls C{fn} for all names in C{names} and returns a list of results.

  @rtype: None or list

  """
  if names is None:
    return None
  else:
    return map(fn, names)


def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters)
    to ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  storage_info = _GetNamedNodeInfo(
    storage_units,
    (lambda (storage_type, storage_key, storage_params):
        _ApplyStorageInfoFunction(storage_type, storage_key, storage_params)))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)

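# Illustrative storage_units argument (VG name, path and flag value assumed):
# ask for LVM VG space on "xenvg" with exclusive storage disabled, plus file
# storage information for a directory, skipping hypervisor information:
#   GetNodeInfo([(constants.ST_LVM_VG, "xenvg", [False]),
#                (constants.ST_FILE, "/srv/ganeti/file-storage", [])],
#               hv_specs=None)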

def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)


# FIXME: implement storage reporting for all missing storage types.
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_GLUSTER: None,
  constants.ST_RADOS: None,
  }


def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be
    reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types that don't support space
    reporting yet
814 """
815 fn = _STORAGE_TYPE_INFO_FN[storage_type]
816 if fn is not None:
817 return fn(storage_key, *args)
818 else:
819 raise NotImplementedError
820
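# Illustrative dispatch (VG name and flag value assumed): for ST_LVM_VG this
# resolves to _GetLvmVgSpaceInfo and returns its result dictionary, while a
# type without a reporting function, such as ST_BLOCK, raises
# NotImplementedError:
#   _ApplyStorageInfoFunction(constants.ST_LVM_VG, "xenvg", [False])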

def _CheckExclusivePvs(pvi_list):
  """Check that PVs are not shared among LVs

  @type pvi_list: list of L{objects.LvmPvInfo} objects
  @param pvi_list: information about the PVs

  @rtype: list of tuples (string, list of strings)
  @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])

  """
  res = []
  for pvi in pvi_list:
    if len(pvi.lv_list) > 1:
      res.append((pvi.name, pvi.lv_list))
  return res


def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val


def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))


def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      val = str(err)
    result[constants.NV_INSTANCELIST] = val


def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_HVINFO in what and vm_capable:
    hvname = what[constants.NV_HVINFO]
    hyper = hypervisor.GetHypervisor(hvname)
    hvparams = all_hvparams[hvname]
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)


def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
944 """Verify the existance and validity of the client SSL certificate.

  Also, verify that the client certificate is not self-signed. Self-
  signed client certificates stem from Ganeti versions 2.12.0 - 2.12.4
  and should be replaced by client certificates signed by the server
  certificate. Hence we output a warning when we encounter a self-signed
  one.

  """
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
  if not os.path.exists(cert_file):
    return (constants.CV_ERROR,
            "The client certificate does not exist. Run '%s' to create"
            " client certificates for all nodes." % create_cert_cmd)

  (errcode, msg) = utils.VerifyCertificate(cert_file)
  if errcode is not None:
    return (errcode, msg)

  (errcode, msg) = utils.IsCertificateSelfSigned(cert_file)
  if errcode is not None:
    return (errcode, msg)

  # if everything is fine, we return the digest to be compared to the config
  return (None, utils.GetCertificateDigest(cert_filename=cert_file))


def _VerifySshSetup(node_status_list, my_name,
                    pub_key_file=pathutils.SSH_PUB_KEYS):
  """Verifies the state of the SSH key files.

  @type node_status_list: list of tuples
  @param node_status_list: list of nodes of the cluster associated with a
    couple of flags: (uuid, name, is_master_candidate,
    is_potential_master_candidate, online)
  @type my_name: str
  @param my_name: name of this node
  @type pub_key_file: str
  @param pub_key_file: filename of the public key file

  """
  if node_status_list is None:
    return ["No node list to check against the pub_key_file received."]

  my_status_list = [(my_uuid, name, mc, pot_mc, online) for
                    (my_uuid, name, mc, pot_mc, online)
                    in node_status_list if name == my_name]
  if len(my_status_list) == 0:
    return ["Cannot find node information for node '%s'." % my_name]
  (my_uuid, _, _, potential_master_candidate, online) = \
    my_status_list[0]

  result = []

  if not os.path.exists(pub_key_file):
    result.append("The public key file '%s' does not exist. Consider running"
                  " 'gnt-cluster renew-crypto --new-ssh-keys"
                  " [--no-ssh-key-check]' to fix this." % pub_key_file)
    return result

  pot_mc_uuids = [uuid for (uuid, _, _, _, _) in node_status_list]
  offline_nodes = [uuid for (uuid, _, _, _, online) in node_status_list
                   if not online]
  pub_keys = ssh.QueryPubKeyFile(None)

  # Initialized here so it is also defined when this node is not a potential
  # master candidate (it is used in the loop over all nodes below)
  missing_uuids = set([])

  if potential_master_candidate:
    # Check that the set of potential master candidates matches the
    # public key file
    pub_uuids_set = set(pub_keys.keys()) - set(offline_nodes)
    pot_mc_uuids_set = set(pot_mc_uuids) - set(offline_nodes)
    if pub_uuids_set != pot_mc_uuids_set:
      unknown_uuids = pub_uuids_set - pot_mc_uuids_set
      if unknown_uuids:
        result.append("The following node UUIDs are listed in the public key"
                      " file on node '%s', but are not potential master"
                      " candidates: %s."
                      % (my_name, ", ".join(list(unknown_uuids))))
      missing_uuids = pot_mc_uuids_set - pub_uuids_set
      if missing_uuids:
        result.append("The following node UUIDs of potential master candidates"
                      " are missing in the public key file on node %s: %s."
                      % (my_name, ", ".join(list(missing_uuids))))

    (_, key_files) = \
      ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
    (_, dsa_pub_key_filename) = key_files[constants.SSHK_DSA]

    my_keys = pub_keys[my_uuid]

    dsa_pub_key = utils.ReadFile(dsa_pub_key_filename)
    if dsa_pub_key.strip() not in my_keys:
      result.append("The dsa key of node %s does not match this node's key"
                    " in the pub key file." % (my_name))
    if len(my_keys) != 1:
      result.append("There is more than one key for node %s in the public key"
                    " file." % my_name)
  else:
    if len(pub_keys.keys()) > 0:
      result.append("The public key file of node '%s' is not empty, although"
                    " the node is not a potential master candidate."
                    % my_name)

  # Check that all master candidate keys are in the authorized_keys file
  (auth_key_file, _) = \
    ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
  for (uuid, name, mc, _, online) in node_status_list:
    if not online:
      continue
    if uuid in missing_uuids:
      continue
    if mc:
      for key in pub_keys[uuid]:
        if not ssh.HasAuthorizedKey(auth_key_file, key):
          result.append("A SSH key of master candidate '%s' (UUID: '%s') is"
                        " not in the 'authorized_keys' file of node '%s'."
                        % (name, uuid, my_name))
    else:
      for key in pub_keys[uuid]:
        if name != my_name and ssh.HasAuthorizedKey(auth_key_file, key):
          result.append("A SSH key of normal node '%s' (UUID: '%s') is in the"
                        " 'authorized_keys' file of node '%s'."
                        % (name, uuid, my_name))
        if name == my_name and not ssh.HasAuthorizedKey(auth_key_file, key):
          result.append("A SSH key of normal node '%s' (UUID: '%s') is not"
                        " in the 'authorized_keys' file of itself."
                        % (my_name, uuid))

  return result


def _VerifySshClutter(node_status_list, my_name):
  """Verifies that the 'authorized_keys' files are not cluttered up.

  @type node_status_list: list of tuples
  @param node_status_list: list of nodes of the cluster associated with a
    couple of flags: (uuid, name, is_master_candidate,
    is_potential_master_candidate, online)
  @type my_name: str
  @param my_name: name of this node

  """
  result = []
  (auth_key_file, _) = \
    ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
  node_names = [name for (_, name, _, _, _) in node_status_list]
  multiple_occurrences = ssh.CheckForMultipleKeys(auth_key_file, node_names)
  if multiple_occurrences:
    msg = "There are hosts which have more than one SSH key stored for the" \
          " same user in the 'authorized_keys' file of node %s. This can be" \
          " due to an unsuccessful operation which cluttered up the" \
          " 'authorized_keys' file. We recommend to clean this up manually. " \
          % my_name
    for host, occ in multiple_occurrences.items():
      msg += "Entry for '%s' in lines %s. " % (host, utils.CommaJoin(occ))
    result.append(msg)

  return result


def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
    - filelist: list of files for which to compute checksums
    - nodelist: list of nodes we should check ssh communication with
    - node-net-test: list of nodes we should check node daemon port
      connectivity with
    - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  vm_capable = my_name not in what.get(constants.NV_NONVMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_CLIENT_CERT in what:
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()

  if constants.NV_SSH_SETUP in what:
    result[constants.NV_SSH_SETUP] = \
      _VerifySshSetup(what[constants.NV_SSH_SETUP], my_name)
    if constants.NV_SSH_CLUTTER in what:
      result[constants.NV_SSH_CLUTTER] = \
        _VerifySshClutter(what[constants.NV_SSH_SETUP], my_name)

  if constants.NV_NODELIST in what:
    (nodes, bynode, mcs) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)

      # We only test if master candidates can communicate to other nodes.
      # We cannot test if normal nodes cannot communicate with other nodes,
      # because the administrator might have installed additional SSH keys,
      # over which Ganeti has no power.
      if my_name in mcs:
        success, message = _GetSshRunner(cluster_name). \
                             VerifyNodeHostname(node, ssh_port)
        if not success:
          val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesystem doesn't seem to be mounted"
1314 " under /sys, missing required directories /sys/block"
1315 " and /sys/class/net")
1316 if (not os.path.isdir("/proc/sys") or
1317 not os.path.isfile("/proc/sysrq-trigger")):
1318 tmpr.append("The procfs filesystem doesn't seem to be mounted"
1319 " under /proc, missing required directory /proc/sys and"
1320 " the file /proc/sysrq-trigger")
1321
1322 if constants.NV_TIME in what:
1323 result[constants.NV_TIME] = utils.SplitTime(time.time())
1324
1325 if constants.NV_OSLIST in what and vm_capable:
1326 result[constants.NV_OSLIST] = DiagnoseOS()
1327
1328 if constants.NV_BRIDGES in what and vm_capable:
1329 result[constants.NV_BRIDGES] = [bridge
1330 for bridge in what[constants.NV_BRIDGES]
1331 if not utils.BridgeExists(bridge)]
1332
1333 if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
1334 result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
1335 filestorage.ComputeWrongFileStoragePaths()
1336
1337 if what.get(constants.NV_FILE_STORAGE_PATH):
1338 pathresult = filestorage.CheckFileStoragePath(
1339 what[constants.NV_FILE_STORAGE_PATH])
1340 if pathresult:
1341 result[constants.NV_FILE_STORAGE_PATH] = pathresult
1342
1343 if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
1344 pathresult = filestorage.CheckFileStoragePath(
1345 what[constants.NV_SHARED_FILE_STORAGE_PATH])
1346 if pathresult:
1347 result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult
1348
1349 return result
1350
1351
1352 def GetCryptoTokens(token_requests):
1353 """Perform actions on the node's cryptographic tokens.
1354
1355 Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
1356 for 'ssl'. Action 'get' returns the digest of the public client ssl
1357 certificate. Action 'create' creates a new client certificate and private key
1358 and also returns the digest of the certificate. The third parameter of a
1359 token request are optional parameters for the actions, so far only the
1360 filename is supported.
1361
1362 @type token_requests: list of tuples of (string, string, dict), where the
1363 first string is in constants.CRYPTO_TYPES, the second in
1364 constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
1365 to string.
1366 @param token_requests: list of requests of cryptographic tokens and actions
1367 to perform on them. The actions come with a dictionary of options.
1368 @rtype: list of tuples (string, string)
1369 @return: list of tuples of the token type and the public crypto token
1370
1371 """
1372 tokens = []
1373 for (token_type, action, _) in token_requests:
1374 if token_type not in constants.CRYPTO_TYPES:
1375 raise errors.ProgrammerError("Token type '%s' not supported." %
1376 token_type)
1377 if action not in constants.CRYPTO_ACTIONS:
1378 raise errors.ProgrammerError("Action '%s' is not supported." %
1379 action)
1380 if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
1381 tokens.append((token_type,
1382 utils.GetCertificateDigest()))
1383 return tokens
1384
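# Illustrative request (the action constant name is assumed to exist in
# ganeti.constants): fetch the digest of this node's client SSL certificate
# with an empty options dictionary:
#   GetCryptoTokens([(constants.CRYPTO_TYPE_SSL_DIGEST,
#                     constants.CRYPTO_ACTION_GET, {})])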

def EnsureDaemon(daemon_name, run):
  """Ensures the given daemon is running or stopped.

  @type daemon_name: string
  @param daemon_name: name of the daemon (e.g., constants.KVMD)

  @type run: bool
  @param run: whether to start or stop the daemon

  @rtype: bool
  @return: 'True' if daemon successfully started/stopped,
    'False' otherwise

  """
  allowed_daemons = [constants.KVMD]

  if daemon_name not in allowed_daemons:
    fn = lambda _: False
  elif run:
    fn = utils.EnsureDaemon
  else:
    fn = utils.StopDaemon

  return fn(daemon_name)

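# Illustrative calls: EnsureDaemon(constants.KVMD, True) starts the KVM
# daemon and returns True on success, while any daemon missing from the
# allow-list above simply yields False:
#   EnsureDaemon(constants.CONFD, True)  # returns False, not allowed here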

def _InitSshUpdateData(data, noded_cert_file, ssconf_store):
  (_, noded_cert) = \
    utils.ExtractX509Certificate(utils.ReadFile(noded_cert_file))
  data[constants.SSHS_NODE_DAEMON_CERTIFICATE] = noded_cert

  cluster_name = ssconf_store.GetClusterName()
  data[constants.SSHS_CLUSTER_NAME] = cluster_name


def AddNodeSshKey(node_uuid, node_name,
                  potential_master_candidates,
                  to_authorized_keys=False,
                  to_public_keys=False,
                  get_public_keys=False,
                  pub_key_file=pathutils.SSH_PUB_KEYS,
                  ssconf_store=None,
                  noded_cert_file=pathutils.NODED_CERT_FILE,
                  run_cmd_fn=ssh.RunSshCmdWithStdin):
  """Distributes a node's public SSH key across the cluster.

  Note that this function should only be executed on the master node, which
  then will copy the new node's key to all nodes in the cluster via SSH.

  Also note: at least one of the flags C{to_authorized_keys},
  C{to_public_keys}, and C{get_public_keys} has to be set to C{True} for
  the function to actually perform any actions.

  @type node_uuid: str
  @param node_uuid: the UUID of the node whose key is added
  @type node_name: str
  @param node_name: the name of the node whose key is added
  @type potential_master_candidates: list of str
  @param potential_master_candidates: list of node names of potential master
    candidates; this should match the list of uuids in the public key file
  @type to_authorized_keys: boolean
  @param to_authorized_keys: whether the key should be added to the
    C{authorized_keys} file of all nodes
  @type to_public_keys: boolean
  @param to_public_keys: whether the keys should be added to the public key file
  @type get_public_keys: boolean
  @param get_public_keys: whether the node should add the clusters' public keys
    to its C{ganeti_pub_keys} file

  """
  node_list = [SshAddNodeInfo(name=node_name, uuid=node_uuid,
                              to_authorized_keys=to_authorized_keys,
                              to_public_keys=to_public_keys,
                              get_public_keys=get_public_keys)]
  return AddNodeSshKeyBulk(node_list,
                           potential_master_candidates,
                           pub_key_file=pub_key_file,
                           ssconf_store=ssconf_store,
                           noded_cert_file=noded_cert_file,
                           run_cmd_fn=run_cmd_fn)


# Node info named tuple specifically for use with AddNodeSshKeyBulk
SshAddNodeInfo = collections.namedtuple(
  "SshAddNodeInfo",
  ["uuid",
   "name",
   "to_authorized_keys",
   "to_public_keys",
   "get_public_keys"])


def AddNodeSshKeyBulk(node_list,
                      potential_master_candidates,
                      pub_key_file=pathutils.SSH_PUB_KEYS,
                      ssconf_store=None,
                      noded_cert_file=pathutils.NODED_CERT_FILE,
                      run_cmd_fn=ssh.RunSshCmdWithStdin):
  """Distributes a node's public SSH key across the cluster.

  Note that this function should only be executed on the master node, which
  then will copy the new node's key to all nodes in the cluster via SSH.

  Also note: at least one of the flags C{to_authorized_keys},
  C{to_public_keys}, and C{get_public_keys} has to be set to C{True} for
  the function to actually perform any actions.

  @type node_list: list of SshAddNodeInfo tuples
  @param node_list: list of tuples containing the necessary node information
    for adding their keys
  @type potential_master_candidates: list of str
  @param potential_master_candidates: list of node names of potential master
    candidates; this should match the list of uuids in the public key file

  """
  # whether there are any keys to be added or retrieved at all
  to_authorized_keys = any([node_info.to_authorized_keys for node_info in
                            node_list])
  to_public_keys = any([node_info.to_public_keys for node_info in
                        node_list])
  get_public_keys = any([node_info.get_public_keys for node_info in
                         node_list])

  # assure that at least one of those flags is true, as the function would
  # not do anything otherwise
  assert (to_authorized_keys or to_public_keys or get_public_keys)

  if not ssconf_store:
    ssconf_store = ssconf.SimpleStore()

  for node_info in node_list:
    # Check and fix sanity of key file
    keys_by_name = ssh.QueryPubKeyFile([node_info.name], key_file=pub_key_file)
    keys_by_uuid = ssh.QueryPubKeyFile([node_info.uuid], key_file=pub_key_file)

    if (not keys_by_name or node_info.name not in keys_by_name) \
        and (not keys_by_uuid or node_info.uuid not in keys_by_uuid):
      raise errors.SshUpdateError(
        "No keys found for the new node '%s' (UUID %s) in the list of public"
1525 " SSH keys, neither for the name or the UUID" %
        (node_info.name, node_info.uuid))
    else:
      if node_info.name in keys_by_name:
        # Replace the name by UUID in the file as the name should only be used
        # temporarily
        ssh.ReplaceNameByUuid(node_info.uuid, node_info.name,
                              error_fn=errors.SshUpdateError,
                              key_file=pub_key_file)

  # Retrieve updated map of UUIDs to keys
  keys_by_uuid = ssh.QueryPubKeyFile(
    [node_info.uuid for node_info in node_list], key_file=pub_key_file)

  # Update the master node's key files
  (auth_key_file, _) = \
    ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
  for node_info in node_list:
    if node_info.to_authorized_keys:
      ssh.AddAuthorizedKeys(auth_key_file, keys_by_uuid[node_info.uuid])

  base_data = {}
  _InitSshUpdateData(base_data, noded_cert_file, ssconf_store)
  cluster_name = base_data[constants.SSHS_CLUSTER_NAME]

  ssh_port_map = ssconf_store.GetSshPortMap()

  # Update the target nodes themselves
  for node_info in node_list:
    logging.debug("Updating SSH key files of target node '%s'.", node_info.name)
    if node_info.get_public_keys:
      node_data = {}
      _InitSshUpdateData(node_data, noded_cert_file, ssconf_store)
      all_keys = ssh.QueryPubKeyFile(None, key_file=pub_key_file)
      node_data[constants.SSHS_SSH_PUBLIC_KEYS] = \
        (constants.SSHS_OVERRIDE, all_keys)

      try:
        utils.RetryByNumberOfTimes(
          constants.SSHS_MAX_RETRIES,
          errors.SshUpdateError,
          run_cmd_fn, cluster_name, node_info.name, pathutils.SSH_UPDATE,
          ssh_port_map.get(node_info.name), node_data,
          debug=False, verbose=False, use_cluster_key=False,
          ask_key=False, strict_host_check=False)
      except errors.SshUpdateError as e:
        # Clean up the master's public key file if adding key fails
        if node_info.to_public_keys:
          ssh.RemovePublicKey(node_info.uuid)
        raise e

  # Update all nodes except master and the target nodes
  keys_by_uuid_auth = ssh.QueryPubKeyFile(
    [node_info.uuid for node_info in node_list
     if node_info.to_authorized_keys],
    key_file=pub_key_file)
  if to_authorized_keys:
    base_data[constants.SSHS_SSH_AUTHORIZED_KEYS] = \
      (constants.SSHS_ADD, keys_by_uuid_auth)

  pot_mc_data = copy.deepcopy(base_data)
  keys_by_uuid_pub = ssh.QueryPubKeyFile(
    [node_info.uuid for node_info in node_list
     if node_info.to_public_keys],
    key_file=pub_key_file)
  if to_public_keys:
    pot_mc_data[constants.SSHS_SSH_PUBLIC_KEYS] = \
      (constants.SSHS_REPLACE_OR_ADD, keys_by_uuid_pub)

  all_nodes = ssconf_store.GetNodeList()
  master_node = ssconf_store.GetMasterNode()
  online_nodes = ssconf_store.GetOnlineNodeList()

  node_errors = []
  for node in all_nodes:
    if node == master_node:
      logging.debug("Skipping master node '%s'.", master_node)
      continue
    if node not in online_nodes:
      logging.debug("Skipping offline node '%s'.", node)
      continue
    if node in potential_master_candidates:
      logging.debug("Updating SSH key files of node '%s'.", node)
      try:
        utils.RetryByNumberOfTimes(
          constants.SSHS_MAX_RETRIES,
          errors.SshUpdateError,
          run_cmd_fn, cluster_name, node, pathutils.SSH_UPDATE,
          ssh_port_map.get(node), pot_mc_data,
          debug=False, verbose=False, use_cluster_key=False,
          ask_key=False, strict_host_check=False)
      except errors.SshUpdateError as last_exception:
        error_msg = ("When adding the key of node '%s', updating SSH key"
                     " files of node '%s' failed after %s retries."
                     " Not trying again. Last error was: %s." %
                     (node_info.name, node, constants.SSHS_MAX_RETRIES,
                      last_exception))
1622 node_errors.append((node, error_msg))
1623 # We only log the error and don't throw an exception, because
1624 # one unreachable node shall not abort the entire procedure.
1625 logging.error(error_msg)
1626
1627 else:
1628 if to_authorized_keys:
1629 run_cmd_fn(cluster_name, node, pathutils.SSH_UPDATE,
1630 ssh_port_map.get(node), base_data,
1631 debug=False, verbose=False, use_cluster_key=False,
1632 ask_key=False, strict_host_check=False)
1633
1634 return node_errors
1635
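# Illustrative sketch (not executed; hypothetical UUIDs and names): a typical
# invocation of the function ending above, C{AddNodeSshKey}, when a node is
# promoted to master candidate. The returned pairs are non-fatal errors.
#
#   errs = AddNodeSshKey("node5-uuid", "node5.example.com",
#                        ["node1.example.com", "node5.example.com"],
#                        to_authorized_keys=True, to_public_keys=True,
#                        get_public_keys=True)
#   for (node, msg) in errs:
#     logging.warning("SSH key update on '%s' failed: %s", node, msg)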
1636
1637 def RemoveNodeSshKey(node_uuid, node_name,
1638 master_candidate_uuids,
1639 potential_master_candidates,
1640 master_uuid=None,
1641 keys_to_remove=None,
1642 from_authorized_keys=False,
1643 from_public_keys=False,
1644 clear_authorized_keys=False,
1645 clear_public_keys=False,
1646 pub_key_file=pathutils.SSH_PUB_KEYS,
1647 ssconf_store=None,
1648 noded_cert_file=pathutils.NODED_CERT_FILE,
1649 readd=False,
1650 run_cmd_fn=ssh.RunSshCmdWithStdin):
1651 """Removes the node's SSH keys from the key files and distributes those.
1652
1653 Note that at least one of the flags C{from_authorized_keys},
1654 C{from_public_keys}, C{clear_authorized_keys}, and C{clear_public_keys}
1655 has to be set to C{True} for the function to perform any action at all.
1656 Not doing so will trigger an assertion in the function.
1657
1658 @type node_uuid: str
1659 @param node_uuid: UUID of the node whose key is removed
1660 @type node_name: str
1661   @param node_name: name of the node whose key is removed
1662 @type master_candidate_uuids: list of str
1663 @param master_candidate_uuids: list of UUIDs of the current master candidates
1664 @type potential_master_candidates: list of str
1665 @param potential_master_candidates: list of names of potential master
1666 candidates
1667 @type keys_to_remove: dict of str to list of str
1668 @param keys_to_remove: a dictionary mapping node UUIDS to lists of SSH keys
1669 to be removed. This list is supposed to be used only if the keys are not
1670 in the public keys file. This is for example the case when removing a
1671 master node's key.
1672 @type from_authorized_keys: boolean
1673 @param from_authorized_keys: whether or not the key should be removed
1674 from the C{authorized_keys} file
1675 @type from_public_keys: boolean
1676   @param from_public_keys: whether or not the key should be removed from
1677 the C{ganeti_pub_keys} file
1678 @type clear_authorized_keys: boolean
1679 @param clear_authorized_keys: whether or not the C{authorized_keys} file
1680 should be cleared on the node whose keys are removed
1681 @type clear_public_keys: boolean
1682   @param clear_public_keys: whether to clear the node's C{ganeti_pub_keys} file
1683 @type readd: boolean
1684 @param readd: whether this is called during a readd operation.
1685   @rtype: list of (string, string) pairs
1686   @returns: list of (node name, error message) pairs, one per failed update
1687
1688 """
1689 # Non-disruptive error messages, list of (node, msg) pairs
1690 result_msgs = []
1691
1692 # Make sure at least one of these flags is true.
1693 if not (from_authorized_keys or from_public_keys or clear_authorized_keys
1694 or clear_public_keys):
1695 raise errors.SshUpdateError("No removal from any key file was requested.")
1696
1697 if not ssconf_store:
1698 ssconf_store = ssconf.SimpleStore()
1699
1700 master_node = ssconf_store.GetMasterNode()
1701 ssh_port_map = ssconf_store.GetSshPortMap()
1702
1703 if from_authorized_keys or from_public_keys:
1704 if keys_to_remove:
1705 keys = keys_to_remove
1706 else:
1707 keys = ssh.QueryPubKeyFile([node_uuid], key_file=pub_key_file)
1708 if (not keys or node_uuid not in keys) and not readd:
1709 raise errors.SshUpdateError("Node '%s' not found in the list of public"
1710                                     " SSH keys. It seems someone is trying to"
1711 " remove a key from outside the cluster!"
1712 % node_uuid)
1713 # During an upgrade all nodes have the master key. In this case we
1714 # should not remove it to avoid accidentally shutting down cluster
1715 # SSH communication
1716 master_keys = None
1717 if master_uuid:
1718 master_keys = ssh.QueryPubKeyFile([master_uuid], key_file=pub_key_file)
1719 for master_key in master_keys:
1720 if master_key in keys[node_uuid]:
1721 keys[node_uuid].remove(master_key)
1722
1723 if node_name == master_node and not keys_to_remove:
1724 raise errors.SshUpdateError("Cannot remove the master node's keys.")
1725
1726 if node_uuid in keys:
1727 base_data = {}
1728 _InitSshUpdateData(base_data, noded_cert_file, ssconf_store)
1729 cluster_name = base_data[constants.SSHS_CLUSTER_NAME]
1730
1731 if from_authorized_keys:
1732 base_data[constants.SSHS_SSH_AUTHORIZED_KEYS] = \
1733 (constants.SSHS_REMOVE, keys)
1734 (auth_key_file, _) = \
1735 ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False,
1736 dircheck=False)
1737 ssh.RemoveAuthorizedKeys(auth_key_file, keys[node_uuid])
1738
1739 pot_mc_data = copy.deepcopy(base_data)
1740
1741 if from_public_keys:
1742 pot_mc_data[constants.SSHS_SSH_PUBLIC_KEYS] = \
1743 (constants.SSHS_REMOVE, keys)
1744 ssh.RemovePublicKey(node_uuid, key_file=pub_key_file)
1745
1746 all_nodes = ssconf_store.GetNodeList()
1747 online_nodes = ssconf_store.GetOnlineNodeList()
1748 logging.debug("Removing key of node '%s' from all nodes but itself and"
1749 " master.", node_name)
1750 for node in all_nodes:
1751 if node == master_node:
1752 logging.debug("Skipping master node '%s'.", master_node)
1753 continue
1754 if node not in online_nodes:
1755 logging.debug("Skipping offline node '%s'.", node)
1756 continue
1757 if node == node_name:
1758 logging.debug("Skipping node itself '%s'.", node_name)
1759 continue
1760 ssh_port = ssh_port_map.get(node)
1761 if not ssh_port:
1762 raise errors.OpExecError("No SSH port information available for"
1763 " node '%s', map: %s." %
1764 (node, ssh_port_map))
1765 error_msg_final = ("When removing the key of node '%s', updating the"
1766 " SSH key files of node '%s' failed. Last error"
1767 " was: %s.")
1768 if node in potential_master_candidates:
1769 logging.debug("Updating key setup of potential master candidate node"
1770 " %s.", node)
1771 try:
1772 utils.RetryByNumberOfTimes(
1773 constants.SSHS_MAX_RETRIES,
1774 errors.SshUpdateError,
1775 run_cmd_fn, cluster_name, node, pathutils.SSH_UPDATE,
1776 ssh_port, pot_mc_data,
1777 debug=False, verbose=False, use_cluster_key=False,
1778 ask_key=False, strict_host_check=False)
1779 except errors.SshUpdateError as last_exception:
1780 error_msg = error_msg_final % (
1781 node_name, node, last_exception)
1782 result_msgs.append((node, error_msg))
1783 logging.error(error_msg)
1784
1785 else:
1786 if from_authorized_keys:
1787 logging.debug("Updating key setup of normal node %s.", node)
1788 try:
1789 utils.RetryByNumberOfTimes(
1790 constants.SSHS_MAX_RETRIES,
1791 errors.SshUpdateError,
1792 run_cmd_fn, cluster_name, node, pathutils.SSH_UPDATE,
1793 ssh_port, base_data,
1794 debug=False, verbose=False, use_cluster_key=False,
1795 ask_key=False, strict_host_check=False)
1796 except errors.SshUpdateError as last_exception:
1797 error_msg = error_msg_final % (
1798 node_name, node, last_exception)
1799 result_msgs.append((node, error_msg))
1800 logging.error(error_msg)
1801
1802 if clear_authorized_keys or from_public_keys or clear_public_keys:
1803 data = {}
1804 _InitSshUpdateData(data, noded_cert_file, ssconf_store)
1805 cluster_name = data[constants.SSHS_CLUSTER_NAME]
1806 ssh_port = ssh_port_map.get(node_name)
1807 if not ssh_port:
1808 raise errors.OpExecError("No SSH port information available for"
1809                                " node '%s', which is leaving the cluster."
                                    % node_name)
1810
1811 if clear_authorized_keys:
1812 # The 'authorized_keys' file is not solely managed by Ganeti. Therefore,
1813 # we have to specify exactly which keys to clear to leave keys untouched
1814 # that were not added by Ganeti.
1815 other_master_candidate_uuids = [uuid for uuid in master_candidate_uuids
1816 if uuid != node_uuid]
1817 candidate_keys = ssh.QueryPubKeyFile(other_master_candidate_uuids,
1818 key_file=pub_key_file)
1819 data[constants.SSHS_SSH_AUTHORIZED_KEYS] = \
1820 (constants.SSHS_REMOVE, candidate_keys)
1821
1822 if clear_public_keys:
1823 data[constants.SSHS_SSH_PUBLIC_KEYS] = \
1824 (constants.SSHS_CLEAR, {})
1825 elif from_public_keys:
1826 # Since clearing the public keys subsumes removing just a single key,
1827       # we only do it if clear_public_keys is 'False'.
1828
1829 if keys[node_uuid]:
1830 data[constants.SSHS_SSH_PUBLIC_KEYS] = \
1831 (constants.SSHS_REMOVE, keys)
1832
1833 # If we have no changes to any keyfile, just return
1834 if not (constants.SSHS_SSH_PUBLIC_KEYS in data or
1835 constants.SSHS_SSH_AUTHORIZED_KEYS in data):
1836       return result_msgs
1837
1838 logging.debug("Updating SSH key setup of target node '%s'.", node_name)
1839 try:
1840 utils.RetryByNumberOfTimes(
1841 constants.SSHS_MAX_RETRIES,
1842 errors.SshUpdateError,
1843 run_cmd_fn, cluster_name, node_name, pathutils.SSH_UPDATE,
1844 ssh_port, data,
1845 debug=False, verbose=False, use_cluster_key=False,
1846 ask_key=False, strict_host_check=False)
1847 except errors.SshUpdateError as last_exception:
1848 result_msgs.append(
1849 (node_name,
1850 ("Removing SSH keys from node '%s' failed."
1851 " This can happen when the node is already unreachable."
1852 " Error: %s" % (node_name, last_exception))))
1853
1854 return result_msgs
1855
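# Illustrative sketch (not executed; hypothetical UUIDs and names): removing a
# departing node's key from all key files via C{RemoveNodeSshKey}.
#
#   msgs = RemoveNodeSshKey("node7-uuid", "node7.example.com",
#                           ["node1-uuid"], ["node1.example.com"],
#                           from_authorized_keys=True, from_public_keys=True,
#                           clear_authorized_keys=True, clear_public_keys=True)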
1856
1857 def _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map,
1858 pub_key_file=pathutils.SSH_PUB_KEYS,
1859 ssconf_store=None,
1860 noded_cert_file=pathutils.NODED_CERT_FILE,
1861 run_cmd_fn=ssh.RunSshCmdWithStdin,
1862 suffix=""):
1863 """Generates the root SSH key pair on the node.
1864
1865 @type node_uuid: str
1866   @param node_uuid: UUID of the node whose key pair is generated
1867   @type node_name: str
1868   @param node_name: name of the node whose key pair is generated
1869 @type ssh_port_map: dict of str to int
1870 @param ssh_port_map: mapping of node names to their SSH port
1871
1872 """
1873 if not ssconf_store:
1874 ssconf_store = ssconf.SimpleStore()
1875
1876 keys_by_uuid = ssh.QueryPubKeyFile([node_uuid], key_file=pub_key_file)
1877 if not keys_by_uuid or node_uuid not in keys_by_uuid:
1878 raise errors.SshUpdateError("Node %s (UUID: %s) whose key is requested to"
1879 " be regenerated is not registered in the"
1880 " public keys file." % (node_name, node_uuid))
1881
1882 data = {}
1883 _InitSshUpdateData(data, noded_cert_file, ssconf_store)
1884 cluster_name = data[constants.SSHS_CLUSTER_NAME]
1885 data[constants.SSHS_GENERATE] = {constants.SSHS_SUFFIX: suffix}
1886
1887 run_cmd_fn(cluster_name, node_name, pathutils.SSH_UPDATE,
1888 ssh_port_map.get(node_name), data,
1889 debug=False, verbose=False, use_cluster_key=False,
1890 ask_key=False, strict_host_check=False)
1891
1892
1893 def _GetMasterNodeUUID(node_uuid_name_map, master_node_name):
1894 master_node_uuids = [node_uuid for (node_uuid, node_name)
1895 in node_uuid_name_map
1896 if node_name == master_node_name]
1897 if len(master_node_uuids) != 1:
1898     raise errors.SshUpdateError("No (unique) master UUID found. Master node"
1899                                 " name: '%s', matching UUIDs: %s" %
1900                                 (master_node_name, master_node_uuids))
1901 return master_node_uuids[0]
1902
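# Example of the helper above, with hypothetical data:
#
#   _GetMasterNodeUUID(zip(["uuid-a", "uuid-b"],
#                          ["node-a.example.com", "node-b.example.com"]),
#                      "node-b.example.com")
#   # => "uuid-b"; raises SshUpdateError if zero or multiple UUIDs match.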
1903
1904 def _GetOldMasterKeys(master_node_uuid, pub_key_file):
1905 old_master_keys_by_uuid = ssh.QueryPubKeyFile([master_node_uuid],
1906 key_file=pub_key_file)
1907 if not old_master_keys_by_uuid:
1908 raise errors.SshUpdateError("No public key of the master node (UUID '%s')"
1909 " found, not generating a new key."
1910 % master_node_uuid)
1911 return old_master_keys_by_uuid
1912
1913
1914 def _GetNewMasterKey(root_keyfiles, master_node_uuid):
1915 new_master_keys = []
1916 for (_, (_, public_key_file)) in root_keyfiles.items():
1917 public_key_dir = os.path.dirname(public_key_file)
1918 public_key_file_tmp_filename = \
1919 os.path.splitext(os.path.basename(public_key_file))[0] \
1920 + constants.SSHS_MASTER_SUFFIX + ".pub"
1921 public_key_path_tmp = os.path.join(public_key_dir,
1922 public_key_file_tmp_filename)
1923 if os.path.exists(public_key_path_tmp):
1924 # for some key types, there might not be any keys
1925 key = utils.ReadFile(public_key_path_tmp)
1926 new_master_keys.append(key)
1927 if not new_master_keys:
1928 raise errors.SshUpdateError("Cannot find any type of temporary SSH key.")
1929 return {master_node_uuid: new_master_keys}
1930
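# For illustration: given a (hypothetical) public key file
# "/root/.ssh/id_rsa.pub" and SSHS_MASTER_SUFFIX rendered here as <suffix>,
# the helper above looks for "/root/.ssh/id_rsa<suffix>.pub" and collects its
# contents, one entry per key type that was actually generated.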
1931
1932 def _ReplaceMasterKeyOnMaster(root_keyfiles):
1933 number_of_moves = 0
1934 for (_, (private_key_file, public_key_file)) in root_keyfiles.items():
1935 key_dir = os.path.dirname(public_key_file)
1936 private_key_file_tmp = \
1937 os.path.basename(private_key_file) + constants.SSHS_MASTER_SUFFIX
1938 public_key_file_tmp = private_key_file_tmp + ".pub"
1939 private_key_path_tmp = os.path.join(key_dir,
1940 private_key_file_tmp)
1941 public_key_path_tmp = os.path.join(key_dir,
1942 public_key_file_tmp)
1943 if os.path.exists(public_key_file):
1944 utils.CreateBackup(public_key_file)
1945 utils.RemoveFile(public_key_file)
1946 if os.path.exists(private_key_file):
1947 utils.CreateBackup(private_key_file)
1948 utils.RemoveFile(private_key_file)
1949 if os.path.exists(public_key_path_tmp) and \
1950 os.path.exists(private_key_path_tmp):
1951 # for some key types, there might not be any keys
1952 shutil.move(public_key_path_tmp, public_key_file)
1953 shutil.move(private_key_path_tmp, private_key_file)
1954 number_of_moves += 1
1955 if not number_of_moves:
1956     raise errors.SshUpdateError("Could not move any master SSH key.")
1957
1958
1959 def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
1960 potential_master_candidates,
1961 pub_key_file=pathutils.SSH_PUB_KEYS,
1962 ssconf_store=None,
1963 noded_cert_file=pathutils.NODED_CERT_FILE,
1964 run_cmd_fn=ssh.RunSshCmdWithStdin):
1965 """Renews all SSH keys and updates authorized_keys and ganeti_pub_keys.
1966
1967 @type node_uuids: list of str
1968 @param node_uuids: list of node UUIDs whose keys should be renewed
1969 @type node_names: list of str
1970   @param node_names: list of node names whose keys should be renewed. This list
1971 should match the C{node_uuids} parameter
1972 @type master_candidate_uuids: list of str
1973 @param master_candidate_uuids: list of UUIDs of master candidates or
1974 master node
1975 @type pub_key_file: str
1976   @param pub_key_file: file path of the public key file
1977 @type noded_cert_file: str
1978 @param noded_cert_file: path of the noded SSL certificate file
1979 @type run_cmd_fn: function
1980 @param run_cmd_fn: function to run commands on remote nodes via SSH
1981 @raises ProgrammerError: if node_uuids and node_names don't match;
1982 SshUpdateError if a node's key is missing from the public key file,
1983     if a node's new SSH key could not be fetched from it, or if there is
1984     no entry or more than one entry in the public key list for the master
1985     node.
1986
1987 """
1988 if not ssconf_store:
1989 ssconf_store = ssconf.SimpleStore()
1990 cluster_name = ssconf_store.GetClusterName()
1991
1992   if len(node_uuids) != len(node_names):
1993     raise errors.ProgrammerError("List of node UUIDs and node names"
1994                                  " does not match in length.")
1995
1996 (_, root_keyfiles) = \
1997 ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
1998 (_, dsa_pub_keyfile) = root_keyfiles[constants.SSHK_DSA]
1999 old_master_key = utils.ReadFile(dsa_pub_keyfile)
2000
2001 node_uuid_name_map = zip(node_uuids, node_names)
2002
2003 master_node_name = ssconf_store.GetMasterNode()
2004 master_node_uuid = _GetMasterNodeUUID(node_uuid_name_map, master_node_name)
2005 ssh_port_map = ssconf_store.GetSshPortMap()
2006 # List of all node errors that happened, but which did not abort the
2007 # procedure as a whole. It is important that this is a list to have a
2008 # somewhat chronological history of events.
2009 all_node_errors = []
2010
2011 # process non-master nodes
2012 for node_uuid, node_name in node_uuid_name_map:
2013 if node_name == master_node_name:
2014 continue
2015 master_candidate = node_uuid in master_candidate_uuids
2016 potential_master_candidate = node_name in potential_master_candidates
2017
2018 keys_by_uuid = ssh.QueryPubKeyFile([node_uuid], key_file=pub_key_file)
2019 if not keys_by_uuid:
2020 raise errors.SshUpdateError("No public key of node %s (UUID %s) found,"
2021 " not generating a new key."
2022 % (node_name, node_uuid))
2023
2024 if master_candidate:
2025 logging.debug("Fetching old SSH key from node '%s'.", node_name)
2026 old_pub_key = ssh.ReadRemoteSshPubKeys(dsa_pub_keyfile,
2027 node_name, cluster_name,
2028 ssh_port_map[node_name],
2029 False, # ask_key
2030 False) # key_check
2031 if old_pub_key != old_master_key:
2032 # If we are already in a multi-key setup (that is past Ganeti 2.12),
2033 # we can safely remove the old key of the node. Otherwise, we cannot
2034 # remove that node's key, because it is also the master node's key
2035 # and that would terminate all communication from the master to the
2036 # node.
2037 logging.debug("Removing SSH key of node '%s'.", node_name)
2038 node_errors = RemoveNodeSshKey(
2039 node_uuid, node_name, master_candidate_uuids,
2040 potential_master_candidates,
2041 master_uuid=master_node_uuid, from_authorized_keys=master_candidate,
2042 from_public_keys=False, clear_authorized_keys=False,
2043 clear_public_keys=False)
2044 if node_errors:
2045 all_node_errors = all_node_errors + node_errors
2046 else:
2047 logging.debug("Old key of node '%s' is the same as the current master"
2048 " key. Not deleting that key on the node.", node_name)
2049
2050 logging.debug("Generating new SSH key for node '%s'.", node_name)
2051 _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map,
2052 pub_key_file=pub_key_file,
2053 ssconf_store=ssconf_store,
2054 noded_cert_file=noded_cert_file,
2055 run_cmd_fn=run_cmd_fn)
2056
2057 try:
2058 logging.debug("Fetching newly created SSH key from node '%s'.", node_name)
2059 pub_key = ssh.ReadRemoteSshPubKeys(dsa_pub_keyfile,
2060 node_name, cluster_name,
2061 ssh_port_map[node_name],
2062 False, # ask_key
2063 False) # key_check
2064     except Exception: # pylint: disable=W0703
2065 raise errors.SshUpdateError("Could not fetch key of node %s"
2066 " (UUID %s)" % (node_name, node_uuid))
2067
2068 if potential_master_candidate:
2069 ssh.RemovePublicKey(node_uuid, key_file=pub_key_file)
2070 ssh.AddPublicKey(node_uuid, pub_key, key_file=pub_key_file)
2071
2072     logging.debug("Adding SSH key of node '%s'.", node_name)
2073 node_errors = AddNodeSshKey(
2074 node_uuid, node_name, potential_master_candidates,
2075 to_authorized_keys=master_candidate,
2076 to_public_keys=potential_master_candidate,
2077 get_public_keys=True,
2078 pub_key_file=pub_key_file, ssconf_store=ssconf_store,
2079 noded_cert_file=noded_cert_file,
2080 run_cmd_fn=run_cmd_fn)
2081 if node_errors:
2082 all_node_errors = all_node_errors + node_errors
2083
2084 # Renewing the master node's key
2085
2086 # Preserve the old keys for now
2087 old_master_keys_by_uuid = _GetOldMasterKeys(master_node_uuid, pub_key_file)
2088
2089 # Generate a new master key with a suffix, don't touch the old one for now
2090   logging.debug("Generating new SSH key of master.")
2091 _GenerateNodeSshKey(master_node_uuid, master_node_name, ssh_port_map,
2092 pub_key_file=pub_key_file,
2093 ssconf_store=ssconf_store,
2094 noded_cert_file=noded_cert_file,
2095 run_cmd_fn=run_cmd_fn,
2096 suffix=constants.SSHS_MASTER_SUFFIX)
2097 # Read newly created master key
2098 new_master_key_dict = _GetNewMasterKey(root_keyfiles, master_node_uuid)
2099
2100 # Replace master key in the master nodes' public key file
2101 ssh.RemovePublicKey(master_node_uuid, key_file=pub_key_file)
2102 for pub_key in new_master_key_dict[master_node_uuid]:
2103 ssh.AddPublicKey(master_node_uuid, pub_key, key_file=pub_key_file)
2104
2105   # Add the new master key to all nodes' public and authorized keys
2106   logging.debug("Adding new master key to all nodes.")
2107 node_errors = AddNodeSshKey(
2108 master_node_uuid, master_node_name, potential_master_candidates,
2109 to_authorized_keys=True, to_public_keys=True,
2110 get_public_keys=False, pub_key_file=pub_key_file,
2111 ssconf_store=ssconf_store, noded_cert_file=noded_cert_file,
2112 run_cmd_fn=run_cmd_fn)
2113 if node_errors:
2114 all_node_errors = all_node_errors + node_errors
2115
2116 # Remove the old key file and rename the new key to the non-temporary filename
2117 _ReplaceMasterKeyOnMaster(root_keyfiles)
2118
2119 # Remove old key from authorized keys
2120 (auth_key_file, _) = \
2121 ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
2122 ssh.RemoveAuthorizedKeys(auth_key_file,
2123 old_master_keys_by_uuid[master_node_uuid])
2124
2125   # Remove the old key from all nodes' authorized keys files
2126   logging.debug("Removing the old master key from all nodes.")
2127 node_errors = RemoveNodeSshKey(
2128 master_node_uuid, master_node_name, master_candidate_uuids,
2129 potential_master_candidates,
2130 keys_to_remove=old_master_keys_by_uuid, from_authorized_keys=True,
2131 from_public_keys=False, clear_authorized_keys=False,
2132 clear_public_keys=False)
2133 if node_errors:
2134 all_node_errors = all_node_errors + node_errors
2135
2136 return all_node_errors
2137
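# Illustrative sketch (not executed; hypothetical, index-aligned node lists):
# renewing the SSH keys of a whole cluster with C{RenewSshKeys}.
#
#   errs = RenewSshKeys(["uuid-a", "uuid-b"],
#                       ["node-a.example.com", "node-b.example.com"],
#                       master_candidate_uuids=["uuid-a"],
#                       potential_master_candidates=["node-a.example.com"])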
2138
2139 def GetBlockDevSizes(devices):
2140 """Return the size of the given block devices
2141
2142 @type devices: list
2143 @param devices: list of block device nodes to query
2144 @rtype: dict
2145 @return:
2146 dictionary of all block devices under /dev (key). The value is their
2147 size in MiB.
2148
2149 {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
2150
2151 """
2152 DEV_PREFIX = "/dev/"
2153 blockdevs = {}
2154
2155 for devpath in devices:
2156 if not utils.IsBelowDir(DEV_PREFIX, devpath):
2157 continue
2158
2159 try:
2160 st = os.stat(devpath)
2161 except EnvironmentError, err:
2162 logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
2163 continue
2164
2165 if stat.S_ISBLK(st.st_mode):
2166 result = utils.RunCmd(["blockdev", "--getsize64", devpath])
2167 if result.failed:
2168 # We don't want to fail, just do not list this device as available
2169 logging.warning("Cannot get size for block device %s", devpath)
2170 continue
2171
2172 size = int(result.stdout) / (1024 * 1024)
2173 blockdevs[devpath] = size
2174 return blockdevs
2175
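# Worked example for the conversion above: "blockdev --getsize64" prints the
# size in bytes, so a 1 GiB device yields 1073741824, and
# 1073741824 / (1024 * 1024) = 1024 MiB ends up in the result, e.g.
#
#   GetBlockDevSizes(["/dev/sdb"])  =>  {"/dev/sdb": 1024}
#
# ("/dev/sdb" is a hypothetical path; paths outside /dev and non-block
# devices are skipped).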
2176
2177 def GetVolumeList(vg_names):
2178 """Compute list of logical volumes and their size.
2179
2180 @type vg_names: list
2181 @param vg_names: the volume groups whose LVs we should list, or
2182 empty for all volume groups
2183 @rtype: dict
2184 @return:
2185     dictionary of all partitions (key) with value being a tuple of
2186     their size (in MiB), and their inactive and online status::
2187
2188 {'xenvg/test1': ('20.06', True, True)}
2189
2190 in case of errors, a string is returned with the error
2191 details.
2192
2193 """
2194 lvs = {}
2195 sep = "|"
2196 if not vg_names:
2197 vg_names = []
2198 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
2199 "--separator=%s" % sep,
2200 "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
2201 if result.failed:
2202 _Fail("Failed to list logical volumes, lvs output: %s", result.output)
2203
2204 for line in result.stdout.splitlines():
2205 line = line.strip()
2206 match = _LVSLINE_REGEX.match(line)
2207 if not match:
2208 logging.error("Invalid line returned from lvs output: '%s'", line)
2209 continue
2210 vg_name, name, size, attr = match.groups()
2211 inactive = attr[4] == "-"
2212 online = attr[5] == "o"
2213 virtual = attr[0] == "v"
2214 if virtual:
2215 # we don't want to report such volumes as existing, since they
2216 # don't really hold data
2217 continue
2218 lvs[vg_name + "/" + name] = (size, inactive, online)
2219
2220 return lvs
2221
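# For illustration, the "lvs" invocation above produces lines like
# "xenvg|test1|20.06|-wi-ao" (hypothetical values); the attribute characters
# at indices 4 and 5 yield the inactive/online flags, so this line becomes
#
#   {"xenvg/test1": ("20.06", False, True)}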
2222
2223 def ListVolumeGroups():
2224 """List the volume groups and their size.
2225
2226 @rtype: dict
2227   @return: dictionary with volume group names as keys and their
2228     sizes as values
2229
2230 """
2231 return utils.ListVolumeGroups()
2232
2233
2234 def NodeVolumes():
2235 """List all volumes on this node.
2236
2237 @rtype: list
2238 @return:
2239 A list of dictionaries, each having four keys:
2240 - name: the logical volume name,
2241 - size: the size of the logical volume
2242 - dev: the physical device on which the LV lives
2243 - vg: the volume group to which it belongs
2244
2245 In case of errors, we return an empty list and log the
2246 error.
2247
2248 Note that since a logical volume can live on multiple physical
2249 volumes, the resulting list might include a logical volume
2250 multiple times.
2251
2252 """
2253 result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
2254 "--separator=|",
2255 "--options=lv_name,lv_size,devices,vg_name"])
2256 if result.failed:
2257 _Fail("Failed to list logical volumes, lvs output: %s",
2258 result.output)
2259
2260 def parse_dev(dev):
2261 return dev.split("(")[0]
2262
2263 def handle_dev(dev):
2264 return [parse_dev(x) for x in dev.split(",")]
2265
2266 def map_line(line):
2267 line = [v.strip() for v in line]
2268 return [{"name": line[0], "size": line[1],
2269 "dev": dev, "vg": line[3]} for dev in handle_dev(line[2])]
2270
2271 all_devs = []
2272 for line in result.stdout.splitlines():
2273 if line.count("|") >= 3:
2274 all_devs.extend(map_line(line.split("|")))
2275 else:
2276 logging.warning("Strange line in the output from lvs: '%s'", line)
2277 return all_devs
2278
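# Parsing example for the helpers above, using a hypothetical lvs output line:
#
#   "lv0|1024.00|/dev/sda1(0),/dev/sdb1(0)|xenvg"
#
# yields two entries, one per physical device:
#
#   [{"name": "lv0", "size": "1024.00", "dev": "/dev/sda1", "vg": "xenvg"},
#    {"name": "lv0", "size": "1024.00", "dev": "/dev/sdb1", "vg": "xenvg"}]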
2279
2280 def BridgesExist(bridges_list):
2281 """Check if a list of bridges exist on the current node.
2282
2283 @rtype: boolean
2284 @return: C{True} if all of them exist, C{False} otherwise
2285
2286 """
2287 missing = []
2288 for bridge in bridges_list:
2289 if not utils.BridgeExists(bridge):
2290 missing.append(bridge)
2291
2292 if missing:
2293 _Fail("Missing bridges %s", utils.CommaJoin(missing))
2294
2295
2296 def GetInstanceListForHypervisor(hname, hvparams=None,
2297 get_hv_fn=hypervisor.GetHypervisor):
2298 """Provides a list of instances of the given hypervisor.
2299
2300 @type hname: string
2301 @param hname: name of the hypervisor
2302 @type hvparams: dict of strings
2303 @param hvparams: hypervisor parameters for the given hypervisor
2304 @type get_hv_fn: function
2305 @param get_hv_fn: function that returns a hypervisor for the given hypervisor
2306 name; optional parameter to increase testability
2307
2308 @rtype: list
2309 @return: a list of all running instances on the current node
2310 - instance1.example.com
2311 - instance2.example.com
2312
2313 """
2314 try:
2315 return get_hv_fn(hname).ListInstances(hvparams=hvparams)
2316 except errors.HypervisorError, err:
2317 _Fail("Error enumerating instances (hypervisor %s): %s",
2318 hname, err, exc=True)
2319
2320
2321 def GetInstanceList(hypervisor_list, all_hvparams=None,
2322 get_hv_fn=hypervisor.GetHypervisor):
2323 """Provides a list of instances.
2324
2325 @type hypervisor_list: list
2326 @param hypervisor_list: the list of hypervisors to query information
2327 @type all_hvparams: dict of dict of strings
2328 @param all_hvparams: a dictionary mapping hypervisor types to respective
2329 cluster-wide hypervisor parameters
2330 @type get_hv_fn: function
2331 @param get_hv_fn: function that returns a hypervisor for the given hypervisor
2332 name; optional parameter to increase testability
2333
2334 @rtype: list
2335 @return: a list of all running instances on the current node
2336 - instance1.example.com
2337 - instance2.example.com
2338
2339 """
2340 results = []
2341 for hname in hypervisor_list:
2342 hvparams = all_hvparams[hname]
2343 results.extend(GetInstanceListForHypervisor(hname, hvparams=hvparams,
2344 get_hv_fn=get_hv_fn))
2345 return results
2346
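# Illustrative sketch (not executed): querying two hypervisors at once; the
# empty parameter dictionaries are placeholder assumptions.
#
#   GetInstanceList([constants.HT_KVM, constants.HT_XEN_PVM],
#                   all_hvparams={constants.HT_KVM: {},
#                                 constants.HT_XEN_PVM: {}})
#   # => ["instance1.example.com", "instance2.example.com"]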
2347
2348 def GetInstanceInfo(instance, hname, hvparams=None):
2349 """Gives back the information about an instance as a dictionary.
2350
2351 @type instance: string
2352 @param instance: the instance name
2353 @type hname: string
2354 @param hname: the hypervisor type of the instance
2355 @type hvparams: dict of strings
2356 @param hvparams: the instance's hvparams
2357
2358 @rtype: dict
2359 @return: dictionary with the following keys:
2360 - memory: memory size of instance (int)
2361 - state: state of instance (HvInstanceState)
2362 - time: cpu time of instance (float)
2363 - vcpus: the number of vcpus (int)
2364
2365 """
2366 output = {}
2367
2368 iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
2369 hvparams=hvparams)
2370 if iinfo is not None:
2371 output["memory"] = iinfo[2]
2372 output["vcpus"] = iinfo[3]
2373 output["state"] = iinfo[4]
2374 output["time"] = iinfo[5]
2375
2376 return output
2377
2378
2379 def GetInstanceMigratable(instance):
2380 """Computes whether an instance can be migrated.
2381
2382 @type instance: L{objects.Instance}
2383 @param instance: object representing the instance to be checked.
2384
2385   @rtype: None
2386   @raise RPCFail: if the instance is not running on this node
2389
2390 """
2391 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2392 iname = instance.name
2393 if iname not in hyper.ListInstances(hvparams=instance.hvparams):
2394 _Fail("Instance %s is not running", iname)
2395
2396 for idx in range(len(instance.disks_info)):
2397 link_name = _GetBlockDevSymlinkPath(iname, idx)
2398 if not os.path.islink(link_name):
2399 logging.warning("Instance %s is missing symlink %s for disk %d",
2400 iname, link_name, idx)
2401
2402
2403 def GetAllInstancesInfo(hypervisor_list, all_hvparams):
2404 """Gather data about all instances.
2405
2406 This is the equivalent of L{GetInstanceInfo}, except that it
2407 computes data for all instances at once, thus being faster if one
2408 needs data about more than one instance.
2409
2410 @type hypervisor_list: list
2411 @param hypervisor_list: list of hypervisors to query for instance data
2412 @type all_hvparams: dict of dict of strings
2413 @param all_hvparams: mapping of hypervisor names to hvparams
2414
2415 @rtype: dict
2416 @return: dictionary of instance: data, with data having the following keys:
2417 - memory: memory size of instance (int)
2418 - state: xen state of instance (string)
2419 - time: cpu time of instance (float)
2420 - vcpus: the number of vcpus
2421
2422 """
2423 output = {}
2424 for hname in hypervisor_list:
2425 hvparams = all_hvparams[hname]
2426 iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(hvparams)
2427 if iinfo:
2428 for name, _, memory, vcpus, state, times in iinfo:
2429 value = {
2430 "memory": memory,
2431 "vcpus": vcpus,
2432 "state": state,
2433 "time": times,
2434 }
2435 if name in output:
2436 # we only check static parameters, like memory and vcpus,
2437 # and not state and time which can change between the
2438 # invocations of the different hypervisors
2439 for key in "memory", "vcpus":
2440 if value[key] != output[name][key]:
2441 _Fail("Instance %s is running twice"
2442 " with different parameters", name)
2443 output[name] = value
2444
2445 return output
2446
2447
2448 def GetInstanceConsoleInfo(instance_param_dict,
2449 get_hv_fn=hypervisor.GetHypervisor):
2450 """Gather data about the console access of a set of instances of this node.
2451
2452 This function assumes that the caller already knows which instances are on
2453 this node, by calling a function such as L{GetAllInstancesInfo} or
2454 L{GetInstanceList}.
2455
2456 For every instance, a large amount of configuration data needs to be
2457 provided to the hypervisor interface in order to receive the console
2458 information. Whether this could or should be cut down can be discussed.
2459 The information is provided in a dictionary indexed by instance name,
2460 allowing any number of instance queries to be done.
2461
2462   @type instance_param_dict: dict of string to dicts, where each inner
2463     dictionary holds the serialized L{objects.Instance}, L{objects.Node},
2464     L{objects.NodeGroup}, HvParams and BeParams
2465 @param instance_param_dict: mapping of instance name to parameters necessary
2466 for console information retrieval
2467
2468 @rtype: dict
2469 @return: dictionary of instance: data, with data having the following keys:
2470 - instance: instance name
2471 - kind: console kind
2472 - message: used with kind == CONS_MESSAGE, indicates console to be
2473 unavailable, supplies error message
2474 - host: host to connect to
2475 - port: port to use
2476 - user: user for login
2477 - command: the command, broken into parts as an array
2478 - display: unknown, potentially unused?
2479
2480 """
2481
2482 output = {}
2483 for inst_name in instance_param_dict:
2484 instance = instance_param_dict[inst_name]["instance"]
2485 pnode = instance_param_dict[inst_name]["node"]
2486 group = instance_param_dict[inst_name]["group"]
2487 hvparams = instance_param_dict[inst_name]["hvParams"]
2488 beparams = instance_param_dict[inst_name]["beParams"]
2489
2490 instance = objects.Instance.FromDict(instance)
2491 pnode = objects.Node.FromDict(pnode)
2492 group = objects.NodeGroup.FromDict(group)
2493
2494 h = get_hv_fn(instance.hypervisor)
2495 output[inst_name] = h.GetInstanceConsole(instance, pnode, group,
2496 hvparams, beparams).ToDict()
2497
2498 return output
2499
2500
2501 def _InstanceLogName(kind, os_name, instance, component):
2502 """Compute the OS log filename for a given instance and operation.
2503
2504 The instance name and os name are passed in as strings since not all
2505 operations have these as part of an instance object.
2506
2507 @type kind: string
2508 @param kind: the operation type (e.g. add, import, etc.)
2509 @type os_name: string
2510 @param os_name: the os name
2511 @type instance: string
2512 @param instance: the name of the instance being imported/added/etc.
2513 @type component: string or None
2514 @param component: the name of the component of the instance being
2515 transferred
2516
2517 """
2518 # TODO: Use tempfile.mkstemp to create unique filename
2519 if component:
2520 assert "/" not in component
2521 c_msg = "-%s" % component
2522 else:
2523 c_msg = ""
2524 base = ("%s-%s-%s%s-%s.log" %
2525 (kind, os_name, instance, c_msg, utils.TimestampForFilename()))
2526 return utils.PathJoin(pathutils.LOG_OS_DIR, base)
2527
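# Filename example for the helper above (the timestamp is whatever
# utils.TimestampForFilename produces, shown here as a placeholder):
#
#   _InstanceLogName("add", "debootstrap", "inst1.example.com", None)
#   # => <LOG_OS_DIR>/add-debootstrap-inst1.example.com-<timestamp>.log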
2528
2529 def InstanceOsAdd(instance, reinstall, debug):
2530 """Add an OS to an instance.
2531
2532 @type instance: L{objects.Instance}
2533 @param instance: Instance whose OS is to be installed
2534 @type reinstall: boolean
2535 @param reinstall: whether this is an instance reinstall
2536 @type debug: integer
2537 @param debug: debug level, passed to the OS scripts
2538 @rtype: None
2539
2540 """
2541 inst_os = OSFromDisk(instance.os)
2542
2543 create_env = OSEnvironment(instance, inst_os, debug)
2544 if reinstall:
2545 create_env["INSTANCE_REINSTALL"] = "1"
2546
2547 logfile = _InstanceLogName("add", instance.os, instance.name, None)
2548
2549 result = utils.RunCmd([inst_os.create_script], env=create_env,
2550 cwd=inst_os.path, output=logfile, reset_env=True)
2551 if result.failed:
2552 logging.error("os create command '%s' returned error: %s, logfile: %s,"
2553 " output: %s", result.cmd, result.fail_reason, logfile,
2554 result.output)
2555 lines = [utils.SafeEncode(val)
2556 for val in utils.TailFile(logfile, lines=20)]
2557 _Fail("OS create script failed (%s), last lines in the"
2558 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
2559
2560
2561 def RunRenameInstance(instance, old_name, debug):
2562 """Run the OS rename script for an instance.
2563
2564 @type instance: L{objects.Instance}
2565 @param instance: Instance whose OS is to be installed
2566 @type old_name: string
2567 @param old_name: previous instance name
2568 @type debug: integer
2569 @param debug: debug level, passed to the OS scripts
2570   @rtype: None
2571   @raise RPCFail: if the rename script fails
2572
2573 """
2574 inst_os = OSFromDisk(instance.os)
2575
2576 rename_env = OSEnvironment(instance, inst_os, debug)
2577 rename_env["OLD_INSTANCE_NAME"] = old_name
2578
2579 logfile = _InstanceLogName("rename", instance.os,
2580 "%s-%s" % (old_name, instance.name), None)
2581
2582 result = utils.RunCmd([inst_os.rename_script], env=rename_env,
2583 cwd=inst_os.path, output=logfile, reset_env=True)
2584
2585 if result.failed:
2586     logging.error("os rename command '%s' returned error: %s, output: %s",
2587 result.cmd, result.fail_reason, result.output)
2588 lines = [utils.SafeEncode(val)
2589 for val in utils.TailFile(logfile, lines=20)]
2590 _Fail("OS rename script failed (%s), last lines in the"
2591 " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
2592
2593
2594 def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
2595 """Returns symlink path for block device.
2596
2597 """
2598 if _dir is None:
2599 _dir = pathutils.DISK_LINKS_DIR
2600
2601 return utils.PathJoin(_dir,
2602 ("%s%s%s" %
2603 (instance_name, constants.DISK_SEPARATOR, idx)))
2604
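# Path example for the helper above; the separator comes from
# constants.DISK_SEPARATOR and the directory from pathutils.DISK_LINKS_DIR,
# both shown symbolically:
#
#   _GetBlockDevSymlinkPath("inst1.example.com", 0)
#   # => <DISK_LINKS_DIR>/inst1.example.com<DISK_SEPARATOR>0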
2605
2606 def _SymlinkBlockDev(instance_name, device_path, idx):
2607   """Set up symlinks to an instance's block device.
2608
2609   This is an auxiliary function run when an instance is started (on the primary
2610 node) or when an instance is migrated (on the target node).
2611
2612
2613 @param instance_name: the name of the target instance
2614 @param device_path: path of the physical block device, on the node
2615 @param idx: the disk index
2616 @return: absolute path to the disk's symlink
2617
2618 """
2619 # In case we have only a userspace access URI, device_path is None
2620 if not device_path:
2621 return None
2622
2623 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
2624 try:
2625 os.symlink(device_path, link_name)
2626 except OSError, err:
2627 if err.errno == errno.EEXIST:
2628 if (not os.path.islink(link_name) or
2629 os.readlink(link_name) != device_path):
2630 os.remove(link_name)
2631 os.symlink(device_path, link_name)
2632 else:
2633 raise
2634
2635 return link_name
2636
2637
2638 def _RemoveBlockDevLinks(instance_name, disks):
2639 """Remove the block device symlinks belonging to the given instance.
2640
2641 """
2642 for idx, _ in enumerate(disks):
2643 link_name = _GetBlockDevSymlinkPath(instance_name, idx)
2644 if os.path.islink(link_name):
2645 try:
2646 os.remove(link_name)
2647 except OSError:
2648 logging.exception("Can't remove symlink '%s'", link_name)
2649
2650
2651 def _CalculateDeviceURI(instance, disk, device):
2652 """Get the URI for the device.
2653
2654 @type instance: L{objects.Instance}
2655 @param instance: the instance which disk belongs to
2656 @type disk: L{objects.Disk}
2657 @param disk: the target disk object
2658 @type device: L{bdev.BlockDev}
2659 @param device: the corresponding BlockDevice
2660 @rtype: string
2661   @return: the device URI, if any, else None
2662
2663 """
2664 access_mode = disk.params.get(constants.LDP_ACCESS,
2665 constants.DISK_KERNELSPACE)
2666 if access_mode == constants.DISK_USERSPACE:
2667 # This can raise errors.BlockDeviceError
2668 return device.GetUserspaceAccessUri(instance.hypervisor)
2669 else:
2670 return None
2671
2672
2673 def _GatherAndLinkBlockDevs(instance):
2674 """Set up an instance's block device(s).
2675
2676 This is run on the primary node at instance startup. The block
2677 devices must be already assembled.
2678
2679 @type instance: L{objects.Instance}
2680 @param instance: the instance whose disks we should assemble
2681 @rtype: list
2682 @return: list of (disk_object, link_name, drive_uri)
2683
2684 """
2685 block_devices = []
2686 for idx, disk in enumerate(instance.disks_info):
2687 device = _RecursiveFindBD(disk)
2688 if device is None:
2689 raise errors.BlockDeviceError("Block device '%s' is not set up." %
2690 str(disk))
2691 device.Open()
2692 try:
2693 link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
2694 except OSError, e:
2695 raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
2696 e.strerror)
2697 uri = _CalculateDeviceURI(instance, disk, device)
2698
2699 block_devices.append((disk, link_name, uri))
2700
2701 return block_devices
2702
2703
2704 def _IsInstanceUserDown(instance_info):
2705 return instance_info and \
2706 "state" in instance_info and \
2707 hv_base.HvInstanceState.IsShutdown(instance_info["state"])
2708
2709
2710 def _GetInstanceInfo(instance):
2711   """Helper function for L{GetInstanceInfo}."""
2712 return GetInstanceInfo(instance.name, instance.hypervisor,
2713 hvparams=instance.hvparams)
2714
2715
2716 def StartInstance(instance, startup_paused, reason, store_reason=True):
2717 """Start an instance.
2718
2719 @type instance: L{objects.Instance}
2720 @param instance: the instance object
2721 @type startup_paused: bool
2722   @param startup_paused: whether to pause the instance at startup
2723 @type reason: list of reasons
2724 @param reason: the reason trail for this startup
2725 @type store_reason: boolean
2726   @param store_reason: whether to store the startup reason trail on file
2727 @rtype: None
2728
2729 """
2730 instance_info = _GetInstanceInfo(instance)
2731
2732 if instance_info and not _IsInstanceUserDown(instance_info):
2733 logging.info("Instance '%s' already running, not starting", instance.name)
2734 return
2735
2736 try:
2737 block_devices = _GatherAndLinkBlockDevs(instance)
2738 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2739 hyper.StartInstance(instance, block_devices, startup_paused)
2740 if store_reason:
2741 _StoreInstReasonTrail(instance.name, reason)
2742 except errors.BlockDeviceError, err:
2743 _Fail("Block device error: %s", err, exc=True)
2744 except errors.HypervisorError, err:
2745 _RemoveBlockDevLinks(instance.name, instance.disks_info)
2746 _Fail("Hypervisor error: %s", err, exc=True)
2747
2748
2749 def InstanceShutdown(instance, timeout, reason, store_reason=True):
2750 """Shut an instance down.
2751
2752   @note: this function uses polling with a hardcoded 5-second retry interval.
2753
2754 @type instance: L{objects.Instance}
2755 @param instance: the instance object
2756 @type timeout: integer
2757 @param timeout: maximum timeout for soft shutdown
2758 @type reason: list of reasons
2759 @param reason: the reason trail for this shutdown
2760 @type store_reason: boolean
2761 @param store_reason: whether to store the shutdown reason trail on file
2762 @rtype: None
2763
2764 """
2765 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2766
2767 if not _GetInstanceInfo(instance):
2768 logging.info("Instance '%s' not running, doing nothing", instance.name)
2769 return
2770
2771 class _TryShutdown(object):
2772 def __init__(self):
2773 self.tried_once = False
2774
2775 def __call__(self):
2776 if not _GetInstanceInfo(instance):
2777 return
2778
2779 try:
2780 hyper.StopInstance(instance, retry=self.tried_once, timeout=timeout)
2781 if store_reason:
2782 _StoreInstReasonTrail(instance.name, reason)
2783 except errors.HypervisorError, err:
2784 # if the instance is no longer existing, consider this a
2785 # success and go to cleanup
2786 if not _GetInstanceInfo(instance):
2787 return
2788
2789 _Fail("Failed to stop instance '%s': %s", instance.name, err)
2790
2791 self.tried_once = True
2792
2793 raise utils.RetryAgain()
2794
2795 try:
2796 utils.Retry(_TryShutdown(), 5, timeout)
2797 except utils.RetryTimeout:
2798 # the shutdown did not succeed
2799 logging.error("Shutdown of '%s' unsuccessful, forcing", instance.name)
2800
2801 try:
2802 hyper.StopInstance(instance, force=True)
2803 except errors.HypervisorError, err:
2804 # only raise an error if the instance still exists, otherwise
2805 # the error could simply be "instance ... unknown"!
2806 if _GetInstanceInfo(instance):
2807 _Fail("Failed to force stop instance '%s': %s", instance.name, err)
2808
2809 time.sleep(1)
2810
2811 if _GetInstanceInfo(instance):
2812     _Fail("Could not shut down instance '%s' even by destroy", instance.name)
2813
2814 try:
2815 hyper.CleanupInstance(instance.name)
2816 except errors.HypervisorError, err:
2817 logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
2818
2819 _RemoveBlockDevLinks(instance.name, instance.disks_info)
2820
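# Timing sketch for the shutdown above: utils.Retry re-invokes _TryShutdown
# roughly every 5 seconds until C{timeout} expires, so e.g. with timeout=120
# the soft shutdown is attempted up to about 120 / 5 = 24 times before the
# hypervisor-level force stop is used.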
2821
2822 def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
2823 """Reboot an instance.
2824
2825 @type instance: L{objects.Instance}
2826 @param instance: the instance object to reboot
2827 @type reboot_type: str
2828   @param reboot_type: the type of reboot, one of the following
2829 constants:
2830 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
2831 instance OS, do not recreate the VM
2832 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
2833 restart the VM (at the hypervisor level)
2834 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
2835 not accepted here, since that mode is handled differently, in
2836 cmdlib, and translates into full stop and start of the
2837 instance (instead of a call_instance_reboot RPC)
2838 @type shutdown_timeout: integer
2839 @param shutdown_timeout: maximum timeout for soft shutdown
2840 @type reason: list of reasons
2841 @param reason: the reason trail for this reboot
2842 @rtype: None
2843
2844 """
2845 # TODO: this is inconsistent with 'StartInstance' and 'InstanceShutdown'
2846 # because those functions simply 'return' on error whereas this one
2847 # raises an exception with '_Fail'
2848 if not _GetInstanceInfo(instance):
2849 _Fail("Cannot reboot instance '%s' that is not running", instance.name)
2850
2851 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2852 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
2853 try:
2854 hyper.RebootInstance(instance)
2855 except errors.HypervisorError, err:
2856 _Fail("Failed to soft reboot instance '%s': %s", instance.name, err)
2857 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
2858 try:
2859 InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
2860 result = StartInstance(instance, False, reason, store_reason=False)
2861 _StoreInstReasonTrail(instance.name, reason)
2862 return result
2863 except errors.HypervisorError, err:
2864 _Fail("Failed to hard reboot instance '%s': %s", instance.name, err)
2865 else:
2866 _Fail("Invalid reboot_type received: '%s'", reboot_type)
2867
2868
2869 def InstanceBalloonMemory(instance, memory):
2870 """Resize an instance's memory.
2871
2872 @type instance: L{objects.Instance}
2873 @param instance: the instance object
2874 @type memory: int
2875 @param memory: new memory amount in MB
2876 @rtype: None
2877
2878 """
2879 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2880 running = hyper.ListInstances(hvparams=instance.hvparams)
2881 if instance.name not in running:
2882 logging.info("Instance %s is not running, cannot balloon", instance.name)
2883 return
2884 try:
2885 hyper.BalloonInstanceMemory(instance, memory)
2886 except errors.HypervisorError, err:
2887 _Fail("Failed to balloon instance memory: %s", err, exc=True)
2888
2889
2890 def MigrationInfo(instance):
2891 """Gather information about an instance to be migrated.
2892
2893 @type instance: L{objects.Instance}
2894 @param instance: the instance definition
2895
2896 """
2897 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2898 try:
2899 info = hyper.MigrationInfo(instance)
2900 except errors.HypervisorError, err:
2901 _Fail("Failed to fetch migration information: %s", err, exc=True)
2902 return info
2903
2904
2905 def AcceptInstance(instance, info, target):
2906 """Prepare the node to accept an instance.
2907
2908 @type instance: L{objects.Instance}
2909 @param instance: the instance definition
2910 @type info: string/data (opaque)
2911 @param info: migration information, from the source node
2912 @type target: string
2913 @param target: target host (usually ip), on this node
2914
2915 """
2916 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2917 try:
2918 hyper.AcceptInstance(instance, info, target)
2919 except errors.HypervisorError, err:
2920 _Fail("Failed to accept instance: %s", err, exc=True)
2921
2922
2923 def FinalizeMigrationDst(instance, info, success):
2924 """Finalize any preparation to accept an instance.
2925
2926 @type instance: L{objects.Instance}
2927 @param instance: the instance definition
2928 @type info: string/data (opaque)
2929 @param info: migration information, from the source node
2930 @type success: boolean
2931 @param success: whether the migration was a success or a failure
2932
2933 """
2934 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2935 try:
2936 hyper.FinalizeMigrationDst(instance, info, success)
2937 except errors.HypervisorError, err:
2938 _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
2939
2940
2941 def MigrateInstance(cluster_name, instance, target, live):
2942 """Migrates an instance to another node.
2943
2944 @type cluster_name: string
2945 @param cluster_name: name of the cluster
2946 @type instance: L{objects.Instance}
2947 @param instance: the instance definition
2948 @type target: string
2949 @param target: the target node name
2950 @type live: boolean
2951 @param live: whether the migration should be done live or not (the
2952 interpretation of this parameter is left to the hypervisor)
2953 @raise RPCFail: if migration fails for some reason
2954
2955 """
2956 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2957
2958 try:
2959 hyper.MigrateInstance(cluster_name, instance, target, live)
2960 except errors.HypervisorError, err:
2961 _Fail("Failed to migrate instance: %s", err, exc=True)
2962
2963
2964 def FinalizeMigrationSource(instance, success, live):
2965 """Finalize the instance migration on the source node.
2966
2967 @type instance: L{objects.Instance}
2968 @param instance: the instance definition of the migrated instance
2969 @type success: bool
2970 @param success: whether the migration succeeded or not
2971 @type live: bool
2972 @param live: whether the user requested a live migration or not
2973 @raise RPCFail: If the execution fails for some reason
2974
2975 """
2976 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2977
2978 try:
2979 hyper.FinalizeMigrationSource(instance, success, live)
2980 except Exception, err: # pylint: disable=W0703
2981 _Fail("Failed to finalize the migration on the source node: %s", err,
2982 exc=True)
2983
2984
2985 def GetMigrationStatus(instance):
2986 """Get the migration status
2987
2988 @type instance: L{objects.Instance}
2989 @param instance: the instance that is being migrated
2990 @rtype: L{objects.MigrationStatus}
2991 @return: the status of the current migration (one of
2992 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
2993 progress info that can be retrieved from the hypervisor
2994 @raise RPCFail: If the migration status cannot be retrieved
2995
2996 """
2997 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2998 try:
2999 return hyper.GetMigrationStatus(instance)
3000 except Exception, err: # pylint: disable=W0703
3001 _Fail("Failed to get migration status: %s", err, exc=True)
3002
3003
3004 def HotplugDevice(instance, action, dev_type, device, extra, seq):
3005 """Hotplug a device
3006
3007   Hotplug is currently supported only for the KVM hypervisor.

3008 @type instance: L{objects.Instance}
3009 @param instance: the instance to which we hotplug a device
3010 @type action: string
3011 @param action: the hotplug action to perform
3012 @type dev_type: string
3013 @param dev_type: the device type to hotplug
3014 @type device: either L{objects.NIC} or L{objects.Disk}
3015 @param device: the device object to hotplug
3016 @type extra: tuple
3017 @param extra: extra info used for disk hotplug (disk link, drive uri)
3018 @type seq: int
3019 @param seq: the index of the device from master perspective
3020   @raise RPCFail: in case the instance does not use the KVM hypervisor
3021
3022 """
3023 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3024 try:
3025 hyper.VerifyHotplugSupport(instance, action, dev_type)
3026 except errors.HotplugError, err:
3027 _Fail("Hotplug is not supported: %s", err)
3028
3029 if action == constants.HOTPLUG_ACTION_ADD:
3030 fn = hyper.HotAddDevice
3031 elif action == constants.HOTPLUG_ACTION_REMOVE:
3032 fn = hyper.HotDelDevice
3033 elif action == constants.HOTPLUG_ACTION_MODIFY:
3034 fn = hyper.HotModDevice
3035 else:
3036 assert action in constants.HOTPLUG_ALL_ACTIONS
3037
3038 return fn(instance, dev_type, device, extra, seq)
3039
3040
3041 def HotplugSupported(instance):
3042 """Checks if hotplug is generally supported.
3043
3044 """
3045 hyper = hypervisor.GetHypervisor(instance.hypervisor)
3046 try:
3047 hyper.HotplugSupported(instance)
3048 except errors.HotplugError, err:
3049 _Fail("Hotplug is not supported: %s", err)
3050
3051
3052 def ModifyInstanceMetadata(metadata):
3053 """Sends instance data to the metadata daemon.
3054
3055 Uses the Luxi transport layer to communicate with the metadata
3056 daemon configuration server. It starts the metadata daemon if it is
3057 not running.
3058   The daemon must be enabled at configuration time.
3059
3060 @type metadata: dict
3061 @param metadata: instance metadata obtained by calling
3062 L{objects.Instance.ToDict} on an instance object
3063
3064 """
3065 if not constants.ENABLE_METAD:
3066     raise errors.ProgrammerError("The metadata daemon is disabled, yet"
3067 " ModifyInstanceMetadata has been called")
3068
3069 if not utils.IsDaemonAlive(constants.METAD):
3070 result = utils.RunCmd([pathutils.DAEMON_UTIL, "start", constants.METAD])
3071 if result.failed:
3072 raise errors.HypervisorError("Failed to start metadata daemon")
3073
3074 with contextlib.closing(metad.Client()) as client:
3075 client.UpdateConfig(metadata)
3076
3077
3078 def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
3079 """Creates a block device for an instance.
3080
3081 @type disk: L{objects.Disk}
3082 @param disk: the object describing the disk we should create
3083 @type size: int
3084 @param size: the size of the physical underlying device, in MiB
3085 @type owner: str
3086 @param owner: the name of the instance for which disk is created,
3087 used for device cache data
3088 @type on_primary: boolean
3089 @param on_primary: indicates if it is the primary node or not
3090 @type info: string
3091 @param info: string that will be sent to the physical device
3092 creation, used for example to set (LVM) tags on LVs
3093 @type excl_stor: boolean
3094 @param excl_stor: Whether exclusive_storage is active
3095
3096   @return: the new unique_id of the device (this can sometimes be
3097 computed only after creation), or None. On secondary nodes,
3098 it's not required to return anything.
3099
3100 """
3101 # TODO: remove the obsolete "size" argument
3102 # pylint: disable=W0613
3103 clist = []
3104 if disk.children:
3105 for child in disk.children:
3106 try:
3107 crdev = _RecursiveAssembleBD(child, owner, on_primary)
3108 except errors.BlockDeviceError, err:
3109 _Fail("Can't assemble device %s: %s", child, err)
3110 if on_primary or disk.AssembleOnSecondary():
3111 # we need the children open in case the device itself has to
3112 # be assembled
3113 try:
3114 # pylint: disable=E1103
3115 crdev.Open()
3116 except errors.BlockDeviceError, err:
3117 _Fail("Can't make child '%s' read-write: %s", child, err)
3118 clist.append(crdev)
3119
3120 try:
3121 device = bdev.Create(disk, clist, excl_stor)
3122 except errors.BlockDeviceError, err:
3123 _Fail("Can't create block device: %s", err)
3124
3125 if on_primary or disk.AssembleOnSecondary():
3126 try:
3127 device.Assemble()
3128 except errors.BlockDeviceError, err:
3129 _Fail("Can't assemble device after creation, unusual event: %s", err)
3130 if on_primary or disk.OpenOnSecondary():
3131 try:
3132 device.Open(force=True)
3133 except errors.BlockDeviceError, err:
3134 _Fail("Can't make device r/w after creation, unusual event: %s", err)
3135 DevCacheManager.UpdateCache(device.dev_path, owner,
3136 on_primary, disk.iv_name)
3137
3138 device.SetInfo(info)
3139
3140 return device.unique_id


def _DumpDevice(source_path, target_path, offset, size, truncate):
  """This function images/wipes the device using a local file.

  @type source_path: string
  @param source_path: path of the image or data source (e.g., "/dev/zero")

  @type target_path: string
  @param target_path: path of the device to image/wipe

  @type offset: int
  @param offset: offset in MiB in the output file

  @type size: int
  @param size: maximum size in MiB to write (data source might be smaller)

  @type truncate: bool
  @param truncate: whether the file should be truncated

  @return: None
  @raise RPCFail: in case of failure

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  block_size = constants.DD_BLOCK_SIZE

  cmd = [constants.DD_CMD, "if=%s" % source_path, "seek=%d" % offset,
         "bs=%s" % block_size, "oflag=direct", "of=%s" % target_path,
         "count=%d" % size]

  if not truncate:
    cmd.append("conv=notrunc")

  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Dump command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
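
# For illustration, assuming constants.DD_BLOCK_SIZE is "1M", the call
# _DumpDevice("/dev/zero", "/dev/xenvg/disk0", 0, 1024, True) runs a
# command equivalent to:
#
#   dd if=/dev/zero seek=0 bs=1M oflag=direct of=/dev/xenvg/disk0 count=1024
#
# i.e. it zeroes the first 1024 MiB of the target device.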


def _DownloadAndDumpDevice(source_url, target_path, size):
  """This function images a device using a downloaded image file.

  @type source_url: string
  @param source_url: URL of image to dump to disk

  @type target_path: string
  @param target_path: path of the device to image

  @type size: int
  @param size: maximum size in MiB to write (data source might be smaller)

  @rtype: NoneType
  @return: None
  @raise RPCFail: in case of download or write failures

  """
  class DDParams(object):
    def __init__(self, current_size, total_size):
      self.current_size = current_size
      self.total_size = total_size
      self.image_size_error = False

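  # Descriptive note: libcurl treats a WRITEFUNCTION return value that
  # differs from the number of bytes handed to the callback as a write
  # error, so returning -1 below aborts the transfer as soon as the image
  # turns out to be larger than the device; curl.perform() then raises
  # pycurl.error, which is mapped to a clean failure message further down.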
  def dd_write(ddparams, out):
    if ddparams.current_size < ddparams.total_size:
      ddparams.current_size += len(out)
      target_file.write(out)
    else:
      ddparams.image_size_error = True
      return -1

  target_file = open(target_path, "r+")
  ddparams = DDParams(0, 1024 * 1024 * size)

  curl = pycurl.Curl()
  curl.setopt(pycurl.VERBOSE, True)
  curl.setopt(pycurl.NOSIGNAL, True)
  curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
  curl.setopt(pycurl.URL, source_url)
  curl.setopt(pycurl.WRITEFUNCTION, lambda out: dd_write(ddparams, out))

  try:
    curl.perform()
  except pycurl.error:
    if ddparams.image_size_error:
      _Fail("Disk image larger than the disk")
    else:
      raise

  target_file.close()


def BlockdevConvert(src_disk, target_disk):
  """Copies data from source block device to target.

  This function gets the export and import commands from the source and
  target devices respectively, and then concatenates them into a single
  command using a pipe ("|"). Finally, it executes the unified command
  that will transfer the data between the devices during the disk
  template conversion operation.

  @type src_disk: L{objects.Disk}
  @param src_disk: the disk object we want to copy from
  @type target_disk: L{objects.Disk}
  @param target_disk: the disk object we want to copy to

  @rtype: NoneType
  @return: None
  @raise RPCFail: in case of failure

  """
  src_dev = _RecursiveFindBD(src_disk)
  if src_dev is None:
    _Fail("Cannot copy from device '%s': device not found", src_disk.uuid)

  dest_dev = _RecursiveFindBD(target_disk)
  if dest_dev is None:
    _Fail("Cannot copy to device '%s': device not found", target_disk.uuid)

  src_cmd = src_dev.Export()
  dest_cmd = dest_dev.Import()
  command = "%s | %s" % (utils.ShellQuoteArgs(src_cmd),
                         utils.ShellQuoteArgs(dest_cmd))

  result = utils.RunCmd(command)
  if result.failed:
    _Fail("Disk conversion command '%s' exited with error: %s; output: %s",
          result.cmd, result.fail_reason, result.output)
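
# The unified command is a plain shell pipeline. As a hedged illustration
# only (the exact Export/Import commands depend on the device types
# involved), converting a "plain" LV into a file-based disk could run
# something along the lines of:
#
#   dd if=/dev/xenvg/disk0 bs=1M | \
#     dd of=/srv/ganeti/file-storage/instance1/disk0 bs=1M conv=notrunc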


def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot wipe device %s: device not found", disk.iv_name)
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Wipe offset is bigger than device size")
  if (offset + size) > rdev.size:
    _Fail("Wipe offset and size are bigger than device size")

  _DumpDevice("/dev/zero", rdev.dev_path, offset, size, True)
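
# E.g. BlockdevWipe(disk, 0, disk.size) zeroes an entire device in one go;
# callers typically wipe in several chunks instead, passing an increasing
# offset on each call so progress can be reported in between.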


def BlockdevImage(disk, image, size):
  """Images a block device either by dumping a local file or
  downloading a URL.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to image

  @type image: string
  @param image: file path to the disk image to be dumped

  @type size: int
  @param size: The size in MiB to write

  @rtype: NoneType
  @return: None
  @raise RPCFail: in case of failure

  """
  if not (utils.IsUrl(image) or os.path.exists(image)):
    _Fail("Image '%s' not found", image)

  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot image device %s: device not found", disk.iv_name)
  if size < 0:
    _Fail("Negative size")
  if size > rdev.size:
    _Fail("Image size is bigger than device size")

  if utils.IsUrl(image):
    _DownloadAndDumpDevice(image, rdev.dev_path, size)
  else:
    _DumpDevice(image, rdev.dev_path, 0, size, False)
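
# Both forms cap the amount of data written at "size" MiB; the path and
# URL below are illustrative only:
#
#   BlockdevImage(disk, "/srv/images/debian.img", disk.size)
#   BlockdevImage(disk, "http://images.example.com/debian.img", disk.size)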


def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks we want to pause/resume
  @type pause: bool
  @param pause: Whether to pause or resume

  """
  success = []
  for disk in disks:
    try:
      rdev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      rdev = None

    if not rdev:
      success.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    result = rdev.PauseResumeSync(pause)

    if result:
      success.append((result, None))
    else:
      if pause:
        msg = "Pause"
      else:
        msg = "Resume"
      success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))

  return success
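
# The result contains one (status, error) pair per input disk, e.g.:
#   [(True, None), (False, "Pause for device disk/1 failed")]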


def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: None
  @raise RPCFail: in case of failure

  """
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove: %s", disk, err)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path

    def _TryRemove():
      try:
        rdev.Remove()
        return []
      except errors.BlockDeviceError, err:
        return [str(err)]

    msgs.extend(utils.SimpleRetry([], _TryRemove,
                                  constants.DISK_REMOVE_RETRY_INTERVAL,
                                  constants.DISK_REMOVE_RETRY_TIMEOUT))

    if not msgs:
      DevCacheManager.RemoveCache(r_path)

  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
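    # Thus a two-child mirror whose ChildrenNeeded() is 1 tolerates a
    # single failed child (one None in "children"), while a result of -1
    # means every child is required and no Nones are allowed.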
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk, children)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
  else:
    result = True
  return result


def BlockdevAssemble(disk, instance, as_primary, idx):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: tuple
  @return: on primary nodes, a tuple with the C{/dev/...} path, the
      created symlink and the device URI (or None); on secondary nodes,
      (C{True}, C{True})

  """
  try:
    result = _RecursiveAssembleBD(disk, instance.name, as_primary)
    if isinstance(result, BlockDev):
      # pylint: disable=E1103
      dev_path = result.dev_path
      link_name = None
      uri = None
      if as_primary:
        link_name = _SymlinkBlockDev(instance.name, dev_path, idx)
        uri = _CalculateDeviceURI(instance, disk, result)
    elif result:
      return result, result
    else:
      _Fail("Unexpected result from _RecursiveAssembleBD")
  except errors.BlockDeviceError, err:
    _Fail("Error while assembling disk: %s", err, exc=True)
  except OSError, err:
    _Fail("Error while symlinking disk: %s", err, exc=True)

  return dev_path, link_name, uri
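
# A hedged sketch of the two result shapes (paths are illustrative):
#   primary:   ("/dev/drbd0", "/var/run/ganeti/instance-disks/inst1:0", None)
#   secondary: (True, True)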


def BlockdevShutdown(disk):
  """Shut down a block device.

  First, if the device is assembled (Attach() is successful), then
  the device is shut down. Then the children of the device are
  shut down.

  This function is called recursively. Note that, as opposed to
  assembly, we don't cache the children: shutting down a device
  doesn't require the upper device to have been active.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk we should
      shutdown
  @rtype: None

  """
  msgs = []
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    try:
      r_dev.Shutdown()
      DevCacheManager.RemoveCache(r_path)
    except errors.BlockDeviceError, err:
      msgs.append(str(err))

  if disk.children:
    for child in disk.children:
      try:
        BlockdevShutdown(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))


def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)


def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        _Fail("Can't find device %s while removing children", disk)
      else:
        devs.append(bd.dev_path)
    else:
      if not utils.IsNormAbsPath(rpath):
        _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)


def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: list of L{objects.BlockDevStatus}, one for each disk
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      _Fail("Can't find device %s", dsk)

    stats.append(rbd.CombinedSyncStatus())

  return stats


def BlockdevGetmirrorstatusMulti(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: list
  @return: list of tuples, (bool, status), one for each disk; bool denotes
      success/failure, status is L{objects.BlockDevStatus} on success,
      string otherwise

  """
  result = []
  for disk in disks:
    try:
      rbd = _RecursiveFindBD(disk)
      if rbd is None:
        result.append((False, "Can't find device %s" % disk))
        continue

      status = rbd.CombinedSyncStatus()
    except errors.BlockDeviceError, err:
      logging.exception("Error while getting disk status")
      result.append((False, str(err)))
    else:
      result.append((True, status))

  assert len(disks) == len(result)

  return result
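
# Sample result for two disks where the second is unknown to this node:
#   [(True, status0), (False, "Can't find device disk/1")]
# where status0 is an L{objects.BlockDevStatus} instance.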


def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk, children)


def _OpenRealBD(disk):
  """Opens the underlying block device of a disk.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to open
  @return: the opened device instance

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  return real_disk


def BlockdevFind(disk):
  """Check if a device is activated.

  If it is, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk to find
  @rtype: None or objects.BlockDevStatus
  @return: None if the disk cannot be found, otherwise the current
      sync status information

  """
  try:
    rbd = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    _Fail("Failed to find device: %s", err, exc=True)

  if rbd is None:
    return None

  return rbd.GetSyncStatus()


def BlockdevGetdimensions(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the pair (size, spindles), where spindles is None if the
      device doesn't support that

  """
  result = []
  for cf in disks:
    try:
      rbd = _RecursiveFindBD(cf)
    except errors.BlockDeviceError:
      result.append(None)
      continue
    if rbd is None:
      result.append(None)
    else:
      result.append(rbd.GetActualDimensions())
  return result
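
# Illustrative return value for three disks, where the second one cannot
# be found and only the third reports spindles:
#   [(10240, None), None, (10240, 5)]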


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: string
  @param uid: the owner of the file
  @type gid: string
  @param gid: the group of the file
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  file_name = vcluster.LocalizeVirtualPath(file_name)

  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  raw_data = _Decompress(data)

  if not (isinstance(uid, basestring) and isinstance(gid, basestring)):
    _Fail("Invalid username/groupname type")

  getents = runtime.GetEnts()
  uid = getents.LookupUser(uid)
  gid = getents.LookupGroup(gid)

  utils.SafeWriteFile(file_name, None,
                      data=raw_data, mode=mode, uid=uid, gid=gid,
                      atime=atime, mtime=mtime)
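
# A hedged example call (the file name is illustrative and must appear in
# _ALLOWED_UPLOAD_FILES; "data" must be in the format expected by
# _Decompress):
#
#   UploadFile("/var/lib/ganeti/known_hosts", data, 0644, "root", "root",
#              None, None)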


def RunOob(oob_program, command, node, timeout):
  """Executes oob_program with given command on given node.

  @param oob_program: The path to the executable oob_program
  @param command: The command to invoke on oob_program
  @param node: The node given as an argument to the program
  @param timeout: Timeout after which we kill the oob program

  @return: stdout
  @raise RPCFail: If execution fails for some reason

  """
  result = utils.RunCmd([oob_program, command, node], timeout=timeout)

  if result.failed:
    _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
          result.fail_reason, result.output)

  return result.stdout
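
# E.g. (helper path purely illustrative):
#   out = RunOob("/usr/lib/ganeti/oob-helper", "power-status",
#                "node1.example.com", 60)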


def _OSOndiskAPIVersion(os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the OS residing in
  the 'os_dir' directory.

  @type os_dir: str
  @param os_dir: the directory in which we should look for the OS
  @rtype: tuple
  @return: tuple (status, data) with status denoting the validity and
      data holding either the valid versions or an error message

  """
  api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    return False, ("Required file '%s' not found under path %s: %s" %
                   (constants.OS_API_FILE, os_dir, utils.ErrnoOrStr(err)))

  if not stat.S_ISREG(st.st_mode):
    return False, ("File '%s' in %s is not a regular file" %
                   (constants.OS_API_FILE, os_dir))

  try:
    api_versions = utils.ReadFile(api_file).splitlines()
  except EnvironmentError, err:
    return False, ("Error while reading the API version file at %s: %s" %
                   (api_file, utils.ErrnoOrStr(err)))

  try:
    api_versions = [int(version.strip()) for version in api_versions]
  except (TypeError, ValueError), err:
    return False, ("API version(s) can't be converted to integer: %s" %
                   str(err))

  return True, api_versions
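
# The API file simply lists one supported integer version per line; e.g. a
# file containing the single line "20" makes this function return
# (True, [20]).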


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{pathutils.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants,
      parameters, api_version, trusted) for all (potential) OSes under all
      search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise
            empty
          - variants is a list of supported OS variants, if any
          - parameters is a list of (name, help) parameters, if any
          - api_version is a list of supported OS API versions
          - trusted is False if the OS uses an untrusted create script,
            True otherwise

  """
  if top_dirs is None:
    top_dirs = pathutils.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        break
      for name in f_names:
        os_path = utils.PathJoin(dir_name, name)
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
          parameters = os_inst.supported_parameters
          api_versions = os_inst.api_versions
          trusted = not os_inst.create_script_untrusted
        else:
          diagnose = os_inst
          variants = parameters = api_versions = []
          trusted = True
        result.append((name, os_path, status, diagnose, variants,
                       parameters, api_versions, trusted))

  return result
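
# A result entry for a healthy OS might look like (values illustrative):
#   ("debootstrap", "/srv/ganeti/os/debootstrap", True, "", ["default"],
#    [("dhcp", "enable DHCP on all interfaces")], [20], True)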


def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
      Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, pathutils.OS_SEARCH_PATH, os.path.isdir)
  else:
    os_dir = utils.FindFile(name, [base_dir], os.path.isdir)

  if os_dir is None:
    return False, "Directory for OS %s not found in search path" % name

  status, api_versions = _OSOndiskAPIVersion(os_dir)
  if not status:
    # push the error up
    return status, api_versions

  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS files dictionary; we will populate it with the absolute path
  # names. If the value is True, then it is a required file; otherwise
  # it is an optional one.
  os_files = dict.fromkeys(constants.OS_SCRIPTS, True)
