Merge branch 'stable-2.12' into stable-2.13
[ganeti-github.git] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Functions used by the node daemon
32
33 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
34 the L{UploadFile} function
35 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
36 in the L{_CleanDirectory} function
37
38 """
39
40 # pylint: disable=E1103,C0302
41
42 # E1103: %s %r has no %r member (but some types could not be
43 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
44 # or (False, "string") which confuses pylint
45
46 # C0302: This module has become too big and should be split up
47
48
49 import base64
50 import errno
51 import logging
52 import os
53 import os.path
54 import pycurl
55 import random
56 import re
57 import shutil
58 import signal
59 import socket
60 import stat
61 import tempfile
62 import time
63 import zlib
64 import copy
65
66 from ganeti import errors
67 from ganeti import http
68 from ganeti import utils
69 from ganeti import ssh
70 from ganeti import hypervisor
71 from ganeti.hypervisor import hv_base
72 from ganeti import constants
73 from ganeti.storage import bdev
74 from ganeti.storage import drbd
75 from ganeti.storage import extstorage
76 from ganeti.storage import filestorage
77 from ganeti import objects
78 from ganeti import ssconf
79 from ganeti import serializer
80 from ganeti import netutils
81 from ganeti import runtime
82 from ganeti import compat
83 from ganeti import pathutils
84 from ganeti import vcluster
85 from ganeti import ht
86 from ganeti.storage.base import BlockDev
87 from ganeti.storage.drbd import DRBD8
88 from ganeti import hooksmaster
89 from ganeti.rpc import transport
90 from ganeti.rpc.errors import NoMasterError, TimeoutError
91
92
#: Kernel-provided random boot ID; changes on every reboot, so it is used to
#: detect whether a node has rebooted between checks
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"

#: Directories that L{_CleanDirectory} is allowed to wipe
_ALLOWED_CLEAN_DIRS = compat.UniqueFrozenset([
  pathutils.DATA_DIR,
  pathutils.JOB_QUEUE_ARCHIVE_DIR,
  pathutils.QUEUE_DIR,
  pathutils.CRYPTO_KEYS_DIR,
  ])

#: Maximum validity for generated SSL certificates: 7 days, in seconds
_MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60

#: File names used inside X509/import-export directories
_X509_KEY_FILE = "key"
_X509_CERT_FILE = "cert"
_IES_STATUS_FILE = "status"
_IES_PID_FILE = "pid"
_IES_CA_FILE = "ca"

#: Valid LVS output line regex
_LVSLINE_REGEX = re.compile(r"^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6,})\|?$")

# Actions for the master setup script
_MASTER_START = "start"
_MASTER_STOP = "stop"

#: Maximum file permissions for restricted command directory and executables
_RCMD_MAX_MODE = (stat.S_IRWXU |
                  stat.S_IRGRP | stat.S_IXGRP |
                  stat.S_IROTH | stat.S_IXOTH)

#: Delay before returning an error for restricted commands
_RCMD_INVALID_DELAY = 10

#: How long to wait to acquire lock for restricted commands (shorter than
#: L{_RCMD_INVALID_DELAY}) to reduce blockage of noded forks when many
#: command requests arrive
_RCMD_LOCK_TIMEOUT = _RCMD_INVALID_DELAY * 0.8
126
127
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message.

  Raised by backend functions (usually via L{_Fail}); per the L{_Fail}
  docstring it is turned into a 'failed' return type by the node daemon.

  """
134
135
def _GetInstReasonFilename(instance_name):
  """Return the path of the file holding an instance's state-change reason.

  @type instance_name: string
  @param instance_name: The name of the instance
  @rtype: string
  @return: The path of the file

  """
  reason_dir = pathutils.INSTANCE_REASON_DIR
  return utils.PathJoin(reason_dir, instance_name)
146
147
def _StoreInstReasonTrail(instance_name, trail):
  """Serialize a reason trail related to an instance change of state to file.

  The exact location of the file depends on the name of the instance and on
  the configuration of the Ganeti cluster defined at deploy time.

  @type instance_name: string
  @param instance_name: The name of the instance

  @type trail: list of reasons
  @param trail: reason trail

  @rtype: None

  """
  # Serialize first, then write to the per-instance reason file
  serialized = serializer.DumpJson(trail)
  utils.WriteFile(_GetInstReasonFilename(instance_name), data=serialized)
166
167
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception; %-interpolated with C{args} if any
  @keyword log: if False, suppress logging of the message (default: log)
  @keyword exc: if true, log with traceback via C{logging.exception}
  @raise RPCFail

  """
  if args:
    msg = msg % args
  # Logging is on by default; callers pass log=False to silence it
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
189
190
def _GetConfig():
  """Simple wrapper to return a SimpleStore.

  @rtype: L{ssconf.SimpleStore}
  @return: a SimpleStore instance

  """
  # A new instance is created on every call
  return ssconf.SimpleStore()
199
200
def _GetSshRunner(cluster_name):
  """Simple wrapper to return an SshRunner.

  @type cluster_name: str
  @param cluster_name: the cluster name, which is needed
      by the SshRunner constructor
  @rtype: L{ssh.SshRunner}
  @return: an SshRunner instance

  """
  # A new runner is created on every call
  return ssh.SshRunner(cluster_name)
212
213
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: Data sent by RPC client
  @rtype: str
  @return: Decompressed data

  """
  # The wire format is a pair of (encoding marker, payload)
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  encoding, content = data
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  raise AssertionError("Unknown data encoding")
232
233
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  # Refuse to clean anything outside the whitelisted directories
  if path not in _ALLOWED_CLEAN_DIRS:
    _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
          path)

  if not os.path.isdir(path):
    return

  # Normalize exclusions so they compare equal to the joined paths below
  if exclude is None:
    excluded_paths = []
  else:
    excluded_paths = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = utils.PathJoin(path, rel_name)
    if full_name in excluded_paths:
      continue
    # Remove plain files only; symlinks and subdirectories are left alone
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
262
263
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  """
  base_files = set([
    pathutils.CLUSTER_CONF_FILE,
    pathutils.ETC_HOSTS,
    pathutils.SSH_KNOWN_HOSTS_FILE,
    pathutils.VNC_PASSWORD_FILE,
    pathutils.RAPI_CERT_FILE,
    pathutils.SPICE_CERT_FILE,
    pathutils.SPICE_CACERT_FILE,
    pathutils.RAPI_USERS_FILE,
    pathutils.CONFD_HMAC_KEY,
    pathutils.CLUSTER_DOMAIN_SECRET_FILE,
    ])

  # Every hypervisor contributes its own ancillary files
  hv_files = set()
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    hv_files.update(hv_class.GetAncillaryFiles()[0])

  allowed_files = base_files | hv_files

  assert pathutils.FILE_STORAGE_PATHS_FILE not in allowed_files, \
    "Allowed file storage paths should never be uploaded via RPC"

  return frozenset(allowed_files)
291
292
#: Files accepted by L{UploadFile}; computed once at module import time
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
294
295
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  @rtype: None

  """
  # The queue lock file is deliberately kept while purging
  _CleanDirectory(pathutils.QUEUE_DIR, exclude=[pathutils.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(pathutils.JOB_QUEUE_ARCHIVE_DIR)
305
306
def GetMasterNodeName():
  """Returns the master node name.

  @rtype: string
  @return: name of the master node
  @raise RPCFail: in case of errors

  """
  try:
    return _GetConfig().GetMasterNode()
  except errors.ConfigurationError as err:
    # Translate configuration problems into an RPC failure with traceback
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
319
320
def RunLocalHooks(hook_opcode, hooks_path, env_builder_fn):
  """Decorator that runs hooks before and after the decorated function.

  @type hook_opcode: string
  @param hook_opcode: opcode of the hook
  @type hooks_path: string
  @param hooks_path: path of the hooks
  @type env_builder_fn: function
  @param env_builder_fn: function that returns a dictionary containing the
    environment variables for the hooks. Will get all the parameters of the
    decorated function.
  @raise RPCFail: in case of pre-hook failure

  """
  def decorator(fn):
    def wrapper(*args, **kwargs):
      _, myself = ssconf.GetMasterAndMyself()
      nodes = ([myself], [myself]) # these hooks run locally

      # Defer building the hook environment until the hooks actually run,
      # binding the decorated function's arguments now
      env_fn = compat.partial(env_builder_fn, *args, **kwargs)

      cfg = _GetConfig()
      hr = HooksRunner()
      hm = hooksmaster.HooksMaster(hook_opcode, hooks_path, nodes,
                                   hr.RunLocalHooks, None, env_fn, None,
                                   logging.warning, cfg.GetClusterName(),
                                   cfg.GetMasterNode())
      # A pre-hook failure aborts before fn runs (see @raise above);
      # post-hooks run only after fn has succeeded
      hm.RunPhase(constants.HOOKS_PHASE_PRE)
      result = fn(*args, **kwargs)
      hm.RunPhase(constants.HOOKS_PHASE_POST)

      return result
    return wrapper
  return decorator
355
356
def _BuildMasterIpEnv(master_params, use_external_mip_script=None):
  """Builds environment variables for master IP hooks.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script (unused, but necessary per the implementation of the
    _RunLocalHooks decorator)
  @rtype: dict
  @return: environment dictionary for the master IP scripts/hooks

  """
  # pylint: disable=W0613
  ip_version = \
    netutils.IPAddress.GetVersionFromAddressFamily(master_params.ip_family)
  return {
    "MASTER_NETDEV": master_params.netdev,
    "MASTER_IP": master_params.ip,
    "MASTER_NETMASK": str(master_params.netmask),
    "CLUSTER_IP_VERSION": str(ip_version),
    }
378
379
def _RunMasterSetupScript(master_params, action, use_external_mip_script):
  """Execute the master IP address setup script.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type action: string
  @param action: action to pass to the script. Must be one of
    L{backend._MASTER_START} or L{backend._MASTER_STOP}
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise backend.RPCFail: if there are errors during the execution of the
    script

  """
  env = _BuildMasterIpEnv(master_params)

  if use_external_mip_script:
    setup_script = pathutils.EXTERNAL_MASTER_SETUP_SCRIPT
  else:
    setup_script = pathutils.DEFAULT_MASTER_SETUP_SCRIPT

  # The script sees only the environment built above
  result = utils.RunCmd([setup_script, action], env=env, reset_env=True)

  if result.failed:
    # Pass format arguments to _Fail (lazy interpolation), consistent with
    # the rest of this module; log=True was redundant (it is the default)
    _Fail("Failed to %s the master IP. Script return value: %s, output: '%s'",
          action, result.exit_code, result.output)
407
408
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNUP, "master-ip-turnup",
               _BuildMasterIpEnv)
def ActivateMasterIp(master_params, use_external_mip_script):
  """Activate the IP address of the master daemon.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP startup

  """
  # Delegate to the shared setup-script runner with the "start" action
  _RunMasterSetupScript(master_params, _MASTER_START,
                        use_external_mip_script)
424
425
def StartMasterDaemons(no_voting):
  """Activate local node as master node.

  The function will start the master daemons (ganeti-masterd and ganeti-rapi).

  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      but still non-interactively
  @rtype: None
  @raise RPCFail: if the master daemons could not be started

  """

  if no_voting:
    daemon_args = "--no-voting --yes-do-it"
  else:
    daemon_args = ""

  # Propagated to the individual daemons through daemon-util
  env = {
    "EXTRA_LUXID_ARGS": daemon_args,
    "EXTRA_WCONFD_ARGS": daemon_args,
    }

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-master"], env=env)
  if result.failed:
    # _Fail logs the message itself; the original logged it a second time
    # via an explicit logging.error call
    _Fail("Can't start Ganeti master: %s", result.output)
453
454
@RunLocalHooks(constants.FAKE_OP_MASTER_TURNDOWN, "master-ip-turndown",
               _BuildMasterIpEnv)
def DeactivateMasterIp(master_params, use_external_mip_script):
  """Deactivate the master IP on this node.

  @type master_params: L{objects.MasterNetworkParameters}
  @param master_params: network parameters of the master
  @type use_external_mip_script: boolean
  @param use_external_mip_script: whether to use an external master IP
    address setup script
  @raise RPCFail: in case of errors during the IP turndown

  """
  # Delegate to the shared setup-script runner with the "stop" action
  _RunMasterSetupScript(master_params, _MASTER_STOP,
                        use_external_mip_script)
470
471
def StopMasterDaemons():
  """Stop the master daemons on this node.

  Stop the master daemons (ganeti-masterd and ganeti-rapi) on this node.

  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-master"])
  if not result.failed:
    return
  # Failure to stop is only logged, never propagated (see TODO above)
  logging.error("Could not stop Ganeti master, command %s had exitcode %s"
                " and error %s",
                result.cmd, result.exit_code, result.output)
488
489
def ChangeMasterNetmask(old_netmask, netmask, master_ip, master_netdev):
  """Change the netmask of the master IP.

  @param old_netmask: the old value of the netmask
  @param netmask: the new value of the netmask
  @param master_ip: the master IP
  @param master_netdev: the master network device
  @raise RPCFail: if the IP is not up or either ip command fails

  """
  # Nothing to do when the netmask is unchanged
  if old_netmask == netmask:
    return

  if not netutils.IPAddress.Own(master_ip):
    _Fail("The master IP address is not up, not attempting to change its"
          " netmask")

  label = "%s:0" % master_netdev

  # First add the address with the new netmask ...
  add_result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "add",
                             "%s/%s" % (master_ip, netmask),
                             "dev", master_netdev, "label", label])
  if add_result.failed:
    _Fail("Could not set the new netmask on the master IP address")

  # ... then remove the old-netmask address
  del_result = utils.RunCmd([constants.IP_COMMAND_PATH, "address", "del",
                             "%s/%s" % (master_ip, old_netmask),
                             "dev", master_netdev, "label", label])
  if del_result.failed:
    _Fail("Could not bring down the master IP address with the old netmask")
519
520
def EtcHostsModify(mode, host, ip):
  """Modify a host entry in /etc/hosts.

  @param mode: The mode to operate. Either add or remove entry
  @param host: The host to operate on
  @param ip: The ip associated with the entry
  @raise RPCFail: if the mode/parameter combination is invalid

  """
  # Bug fix: the original called RPCFail(...) directly, which merely
  # constructs the exception object without raising it, so none of these
  # parameter checks had any effect; _Fail logs and raises properly.
  if mode == constants.ETC_HOSTS_ADD:
    if not ip:
      _Fail("Mode 'add' needs 'ip' parameter, but parameter not"
            " present")
    utils.AddHostToEtcHosts(host, ip)
  elif mode == constants.ETC_HOSTS_REMOVE:
    if ip:
      _Fail("Mode 'remove' does not allow 'ip' parameter, but"
            " parameter is present")
    utils.RemoveHostFromEtcHosts(host)
  else:
    _Fail("Mode not supported")
541
542
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @type modify_ssh_setup: boolean
  @param modify_ssh_setup: whether to also remove this node's cluster
    SSH key files

  """
  _CleanDirectory(pathutils.DATA_DIR)
  _CleanDirectory(pathutils.CRYPTO_KEYS_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

      # Remove our own key from authorized_keys, then delete the key pair
      ssh.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      logging.exception("Error while processing ssh files")

  try:
    # Best-effort removal of the cluster secrets; failures are only logged
    utils.RemoveFile(pathutils.CONFD_HMAC_KEY)
    utils.RemoveFile(pathutils.RAPI_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CERT_FILE)
    utils.RemoveFile(pathutils.SPICE_CACERT_FILE)
    utils.RemoveFile(pathutils.NODED_CERT_FILE)
  except: # pylint: disable=W0702
    logging.exception("Error while removing cluster secrets")

  utils.StopDaemon(constants.CONFD)
  utils.StopDaemon(constants.MOND)
  utils.StopDaemon(constants.KVMD)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, "Shutdown scheduled")
586
587
588 def _CheckStorageParams(params, num_params):
589 """Performs sanity checks for storage parameters.
590
591 @type params: list
592 @param params: list of storage parameters
593 @type num_params: int
594 @param num_params: expected number of parameters
595
596 """
597 if params is None:
598 raise errors.ProgrammerError("No storage parameters for storage"
599 " reporting is provided.")
600 if not isinstance(params, list):
601 raise errors.ProgrammerError("The storage parameters are not of type"
602 " list: '%s'" % params)
603 if not len(params) == num_params:
604 raise errors.ProgrammerError("Did not receive the expected number of"
605 "storage parameters: expected %s,"
606 " received '%s'" % (num_params, len(params)))
607
608
def _CheckLvmStorageParams(params):
  """Performs sanity check for the 'exclusive storage' flag.

  @see: C{_CheckStorageParams}
  @rtype: bool
  @return: the exclusive storage flag extracted from C{params}
  @raise errors.ProgrammerError: if the flag is not a boolean

  """
  _CheckStorageParams(params, 1)
  excl_stor = params[0]
  # Check the already-extracted value instead of re-indexing the list
  if not isinstance(excl_stor, bool):
    raise errors.ProgrammerError("Exclusive storage parameter is not"
                                 " boolean: '%s'." % excl_stor)
  return excl_stor
621
622
def _GetLvmVgSpaceInfo(name, params):
  """Wrapper around C{_GetVgInfo} which checks the storage parameters.

  @type name: string
  @param name: name of the volume group
  @type params: list
  @param params: list of storage parameters, which in this case should be
    containing only one for exclusive storage

  """
  # Validate the parameters and extract the exclusive-storage flag in one go
  return _GetVgInfo(name, _CheckLvmStorageParams(params))
635
636
def _GetVgInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVGInfo):
  """Retrieves information about a LVM volume group.

  @type name: string
  @param name: name of the volume group
  @type excl_stor: bool
  @param excl_stor: exclusive-storage flag, forwarded to C{info_fn}
  @rtype: dict
  @return: storage unit description; sizes are C{None} if the VG could not
    be queried

  """
  # TODO: GetVGInfo supports returning information for multiple VGs at once
  vginfo = info_fn([name], excl_stor)
  vg_free = vg_size = None
  if vginfo:
    # Round the reported sizes to whole MiB values
    vg_free = int(round(vginfo[0][0], 0))
    vg_size = int(round(vginfo[0][1], 0))

  return {
    "type": constants.ST_LVM_VG,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
657
658
def _GetLvmPvSpaceInfo(name, params):
  """Wrapper around C{_GetVgSpindlesInfo} with sanity checks.

  @see: C{_GetLvmVgSpaceInfo}

  """
  # Validate the parameters and extract the exclusive-storage flag in one go
  return _GetVgSpindlesInfo(name, _CheckLvmStorageParams(params))
667
668
def _GetVgSpindlesInfo(
    name, excl_stor, info_fn=bdev.LogicalVolume.GetVgSpindlesInfo):
  """Retrieves information about spindles in an LVM volume group.

  @type name: string
  @param name: VG name
  @type excl_stor: bool
  @param excl_stor: exclusive storage
  @rtype: dict
  @return: dictionary whose keys are "name", "vg_free", "vg_size" for VG name,
    free spindles, total spindles respectively

  """
  if excl_stor:
    (vg_free, vg_size) = info_fn(name)
  else:
    # Spindles are only reported when exclusive storage is enabled
    vg_free = vg_size = 0
  return {
    "type": constants.ST_LVM_PV,
    "name": name,
    "storage_free": vg_free,
    "storage_size": vg_size,
    }
693
694
def _GetHvInfo(name, hvparams, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information from a hypervisor.

  The information returned depends on the hypervisor. Common items:

    - vg_size is the size of the configured volume group in MiB
    - vg_free is the free size of the volume group in MiB
    - memory_dom0 is the memory allocated for domain0 in MiB
    - memory_free is the currently available (free) ram in MiB
    - memory_total is the total number of ram in MiB
    - hv_version: the hypervisor version, if available

  @type hvparams: dict of string
  @param hvparams: the hypervisor's hvparams

  """
  hv = get_hv_fn(name)
  return hv.GetNodeInfo(hvparams=hvparams)
712
713
def _GetHvInfoAll(hv_specs, get_hv_fn=hypervisor.GetHypervisor):
  """Retrieves node information for all hypervisors.

  See C{_GetHvInfo} for information on the output.

  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: None or list
  @return: C{None} if C{hv_specs} is C{None}, otherwise one info dict per
    hypervisor, in input order

  """
  if hv_specs is None:
    return None

  # List comprehension instead of a manual append loop
  return [_GetHvInfo(hvname, hvparams, get_hv_fn)
          for (hvname, hvparams) in hv_specs]
730
731
732 def _GetNamedNodeInfo(names, fn):
733 """Calls C{fn} for all names in C{names} and returns a dictionary.
734
735 @rtype: None or dict
736
737 """
738 if names is None:
739 return None
740 else:
741 return map(fn, names)
742
743
def GetNodeInfo(storage_units, hv_specs):
  """Gives back a hash with different information about the node.

  @type storage_units: list of tuples (string, string, list)
  @param storage_units: List of tuples (storage unit, identifier, parameters) to
    ask for disk space information. In case of lvm-vg, the identifier is
    the VG name. The parameters can contain additional, storage-type-specific
    parameters, for example exclusive storage for lvm storage.
  @type hv_specs: list of pairs (string, dict of strings)
  @param hv_specs: list of pairs of a hypervisor's name and its hvparams
  @rtype: tuple; (string, None/dict, None/dict)
  @return: Tuple containing boot ID, volume group information and hypervisor
    information

  """
  bootid = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
  # Unpack each (type, key, params) unit explicitly instead of using a
  # Python-2-only tuple-parameter lambda (removed in Python 3, PEP 3113)
  storage_info = _GetNamedNodeInfo(
    storage_units,
    lambda unit: _ApplyStorageInfoFunction(*unit))
  hv_info = _GetHvInfoAll(hv_specs)
  return (bootid, storage_info, hv_info)
766
767
def _GetFileStorageSpaceInfo(path, params):
  """Wrapper around filestorage.GetSpaceInfo.

  The purpose of this wrapper is to call filestorage.GetFileStorageSpaceInfo
  and ignore the *args parameter to not leak it into the filestorage
  module's code.

  @see: C{filestorage.GetFileStorageSpaceInfo} for description of the
    parameters.

  """
  # File storage accepts no extra storage parameters
  _CheckStorageParams(params, 0)
  return filestorage.GetFileStorageSpaceInfo(path)
781
782
# FIXME: implement storage reporting for all missing storage types.
#: Maps each storage type to the function reporting its free/total space;
#: C{None} marks types without space reporting (see
#: L{_ApplyStorageInfoFunction})
_STORAGE_TYPE_INFO_FN = {
  constants.ST_BLOCK: None,
  constants.ST_DISKLESS: None,
  constants.ST_EXT: None,
  constants.ST_FILE: _GetFileStorageSpaceInfo,
  constants.ST_LVM_PV: _GetLvmPvSpaceInfo,
  constants.ST_LVM_VG: _GetLvmVgSpaceInfo,
  constants.ST_SHARED_FILE: None,
  constants.ST_GLUSTER: None,
  constants.ST_RADOS: None,
  }
795
796
def _ApplyStorageInfoFunction(storage_type, storage_key, *args):
  """Looks up and applies the correct function to calculate free and total
  storage for the given storage type.

  @type storage_type: string
  @param storage_type: the storage type for which the storage shall be reported.
  @type storage_key: string
  @param storage_key: identifier of a storage unit, e.g. the volume group name
    of an LVM storage unit
  @type args: any
  @param args: various parameters that can be used for storage reporting. These
    parameters and their semantics vary from storage type to storage type and
    are just propagated in this function.
  @return: the results of the application of the storage space function (see
    _STORAGE_TYPE_INFO_FN) if storage space reporting is implemented for that
    storage type
  @raises NotImplementedError: for storage types who don't support space
    reporting yet

  """
  info_fn = _STORAGE_TYPE_INFO_FN[storage_type]
  # A None entry in the dispatch table means no space reporting
  if info_fn is None:
    raise NotImplementedError
  return info_fn(storage_key, *args)
821
822
823 def _CheckExclusivePvs(pvi_list):
824 """Check that PVs are not shared among LVs
825
826 @type pvi_list: list of L{objects.LvmPvInfo} objects
827 @param pvi_list: information about the PVs
828
829 @rtype: list of tuples (string, list of strings)
830 @return: offending volumes, as tuples: (pv_name, [lv1_name, lv2_name...])
831
832 """
833 res = []
834 for pvi in pvi_list:
835 if len(pvi.lv_list) > 1:
836 res.append((pvi.name, pvi.lv_list))
837 return res
838
839
def _VerifyHypervisors(what, vm_capable, result, all_hvparams,
                       get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hypervisor. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  # Hypervisor checks only make sense on VM-capable nodes
  if not vm_capable:
    return

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      hvparams = all_hvparams[hv_name]
      try:
        val = get_hv_fn(hv_name).Verify(hvparams=hvparams)
      except errors.HypervisorError, err:
        # Report the problem as the per-hypervisor result instead of failing
        val = "Error while checking hypervisor: %s" % str(err)
      result[constants.NV_HYPERVISOR][hv_name] = val
869
870
def _VerifyHvparams(what, vm_capable, result,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Verifies the hvparams. Appends the results to the 'results' list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type get_hv_fn: function
  @param get_hv_fn: function to retrieve the hypervisor, to improve testability

  """
  # Parameter validation only makes sense on VM-capable nodes
  if not vm_capable:
    return

  if constants.NV_HVPARAMS in what:
    result[constants.NV_HVPARAMS] = []
    for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
      try:
        logging.info("Validating hv %s, %s", hv_name, hvparms)
        get_hv_fn(hv_name).ValidateParameters(hvparms)
      except errors.HypervisorError, err:
        # Collect validation failures instead of aborting on the first one
        result[constants.NV_HVPARAMS].append((source, hv_name, str(err)))
897
898
def _VerifyInstanceList(what, vm_capable, result, all_hvparams):
  """Verifies the instance list.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  if constants.NV_INSTANCELIST in what and vm_capable:
    # GetInstanceList can fail
    try:
      val = GetInstanceList(what[constants.NV_INSTANCELIST],
                            all_hvparams=all_hvparams)
    except RPCFail, err:
      # Report the failure text as the result instead of propagating it
      val = str(err)
    result[constants.NV_INSTANCELIST] = val
921
922
def _VerifyNodeInfo(what, vm_capable, result, all_hvparams):
  """Verifies the node info.

  @type what: C{dict}
  @param what: a dictionary of things to check
  @type vm_capable: boolean
  @param vm_capable: whether or not this node is vm capable
  @type result: dict
  @param result: dictionary of verification results; results of the
    verifications in this function will be added here
  @type all_hvparams: dict of dict of string
  @param all_hvparams: dictionary mapping hypervisor names to hvparams

  """
  # Only query hypervisor node info on VM-capable nodes when requested
  if constants.NV_HVINFO not in what or not vm_capable:
    return
  hvname = what[constants.NV_HVINFO]
  hyper = hypervisor.GetHypervisor(hvname)
  result[constants.NV_HVINFO] = \
    hyper.GetNodeInfo(hvparams=all_hvparams[hvname])
942
943
def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
  """Verify the existance and validity of the client SSL certificate.

  Also, verify that the client certificate is not self-signed. Self-
  signed client certificates stem from Ganeti versions 2.12.0 - 2.12.4
  and should be replaced by client certificates signed by the server
  certificate. Hence we output a warning when we encounter a self-signed
  one.

  """
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
  if not os.path.exists(cert_file):
    return (constants.CV_ERROR,
            "The client certificate does not exist. Run '%s' to create"
            " client certificates for all nodes." % create_cert_cmd)

  # Both checks return (errcode, message); a non-None errcode means failure
  for check_fn in (utils.VerifyCertificate, utils.IsCertificateSelfSigned):
    (errcode, msg) = check_fn(cert_file)
    if errcode is not None:
      return (errcode, msg)

  # if everything is fine, we return the digest to be compared to the config
  return (None, utils.GetCertificateDigest(cert_filename=cert_file))
970
971
def _VerifySshSetup(node_status_list, my_name,
                    pub_key_file=pathutils.SSH_PUB_KEYS):
  """Verifies the state of the SSH key files.

  @type node_status_list: list of tuples
  @param node_status_list: list of nodes of the cluster associated with a
    couple of flags: (uuid, name, is_master_candidate,
    is_potential_master_candidate, online)
  @type my_name: str
  @param my_name: name of this node
  @type pub_key_file: str
  @param pub_key_file: filename of the public key file
  @rtype: list of str
  @return: list of error messages; empty if all checks passed

  """
  if node_status_list is None:
    return ["No node list to check against the pub_key_file received."]

  my_status_list = [(my_uuid, name, mc, pot_mc, online) for
                    (my_uuid, name, mc, pot_mc, online)
                    in node_status_list if name == my_name]
  if len(my_status_list) == 0:
    return ["Cannot find node information for node '%s'." % my_name]
  (my_uuid, _, _, potential_master_candidate, online) = \
     my_status_list[0]

  result = []

  if not os.path.exists(pub_key_file):
    result.append("The public key file '%s' does not exist. Consider running"
                  " 'gnt-cluster renew-crypto --new-ssh-keys"
                  " [--no-ssh-key-check]' to fix this." % pub_key_file)
    return result

  pot_mc_uuids = [uuid for (uuid, _, _, _, _) in node_status_list]
  offline_nodes = [uuid for (uuid, _, _, _, online) in node_status_list
                   if not online]
  pub_keys = ssh.QueryPubKeyFile(None)

  # Initialize here rather than inside the branch below: the set is read in
  # the authorized_keys loop further down for *all* nodes, and previously
  # raised a NameError on nodes that are not potential master candidates.
  missing_uuids = set([])

  if potential_master_candidate:
    # Check that the set of potential master candidates matches the
    # public key file
    pub_uuids_set = set(pub_keys.keys()) - set(offline_nodes)
    pot_mc_uuids_set = set(pot_mc_uuids) - set(offline_nodes)
    if pub_uuids_set != pot_mc_uuids_set:
      unknown_uuids = pub_uuids_set - pot_mc_uuids_set
      if unknown_uuids:
        result.append("The following node UUIDs are listed in the public key"
                      " file on node '%s', but are not potential master"
                      " candidates: %s."
                      % (my_name, ", ".join(list(unknown_uuids))))
      missing_uuids = pot_mc_uuids_set - pub_uuids_set
      if missing_uuids:
        result.append("The following node UUIDs of potential master candidates"
                      " are missing in the public key file on node %s: %s."
                      % (my_name, ", ".join(list(missing_uuids))))

    (_, key_files) = \
      ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
    (_, dsa_pub_key_filename) = key_files[constants.SSHK_DSA]

    my_keys = pub_keys[my_uuid]

    dsa_pub_key = utils.ReadFile(dsa_pub_key_filename)
    if dsa_pub_key.strip() not in my_keys:
      result.append("The dsa key of node %s does not match this node's key"
                    " in the pub key file." % (my_name))
    if len(my_keys) != 1:
      result.append("There is more than one key for node %s in the public key"
                    " file." % my_name)
  else:
    if len(pub_keys.keys()) > 0:
      result.append("The public key file of node '%s' is not empty, although"
                    " the node is not a potential master candidate."
                    % my_name)

  # Check that all master candidate keys are in the authorized_keys file
  (auth_key_file, _) = \
    ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
  for (uuid, name, mc, _, online) in node_status_list:
    if not online:
      continue
    if uuid in missing_uuids:
      continue
    if mc:
      # Use .get: on a node that is not a potential master candidate the
      # public key file is (legitimately) empty, so entries for other
      # nodes can be absent without this being a new error.
      for key in pub_keys.get(uuid, []):
        if not ssh.HasAuthorizedKey(auth_key_file, key):
          result.append("A SSH key of master candidate '%s' (UUID: '%s') is"
                        " not in the 'authorized_keys' file of node '%s'."
                        % (name, uuid, my_name))
    else:
      for key in pub_keys.get(uuid, []):
        if name != my_name and ssh.HasAuthorizedKey(auth_key_file, key):
          result.append("A SSH key of normal node '%s' (UUID: '%s') is in the"
                        " 'authorized_keys' file of node '%s'."
                        % (name, uuid, my_name))
        if name == my_name and not ssh.HasAuthorizedKey(auth_key_file, key):
          result.append("A SSH key of normal node '%s' (UUID: '%s') is not"
                        " in the 'authorized_keys' file of itself."
                        % (my_name, uuid))

  return result
1074
1075
def _VerifySshClutter(node_status_list, my_name):
  """Verifies that the 'authorized_keys' files are not cluttered up.

  @type node_status_list: list of tuples
  @param node_status_list: list of nodes of the cluster associated with a
    couple of flags: (uuid, name, is_master_candidate,
    is_potential_master_candidate, online)
  @type my_name: str
  @param my_name: name of this node
  @rtype: list of str
  @return: list of error messages; empty if no clutter was found

  """
  result = []
  (auth_key_file, _) = \
    ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
  # Each status tuple has five fields (uuid, name, mc, pot_mc, online) --
  # see the docstring and _VerifySshSetup, which consumes the same payload;
  # unpacking only four of them raised a ValueError here before.
  node_names = [name for (_, name, _, _, _) in node_status_list]
  multiple_occurrences = ssh.CheckForMultipleKeys(auth_key_file, node_names)
  if multiple_occurrences:
    msg = "There are hosts which have more than one SSH key stored for the" \
          " same user in the 'authorized_keys' file of node %s. This can be" \
          " due to an unsuccessful operation which cluttered up the" \
          " 'authorized_keys' file. We recommend to clean this up manually. " \
          % my_name
    for host, occ in multiple_occurrences.items():
      msg += "Entry for '%s' in lines %s. " % (host, utils.CommaJoin(occ))
    result.append(msg)

  return result
1103
1104
def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor names to hvparams
  @type node_groups: a dict of strings
  @param node_groups: node _names_ mapped to their group uuids (it's enough to
      have only those nodes that are in `what["nodelist"]`)
  @type groups_cfg: a dict of dict of strings
  @param groups_cfg: a dictionary mapping group uuids to their configuration
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}
  my_name = netutils.Hostname.GetSysName()
  port = netutils.GetDaemonPort(constants.NODED)
  # Nodes listed as non-vm-capable skip the hypervisor/storage checks below
  vm_capable = my_name not in what.get(constants.NV_NONVMNODES, [])

  _VerifyHypervisors(what, vm_capable, result, all_hvparams)
  _VerifyHvparams(what, vm_capable, result)

  if constants.NV_FILELIST in what:
    # Fingerprint the real (localized) paths, but key the results by the
    # cluster-wide virtual paths again for the caller
    fingerprints = utils.FingerprintFiles(map(vcluster.LocalizeVirtualPath,
                                              what[constants.NV_FILELIST]))
    result[constants.NV_FILELIST] = \
      dict((vcluster.MakeVirtualPath(key), value)
           for (key, value) in fingerprints.items())

  if constants.NV_CLIENT_CERT in what:
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()

  if constants.NV_SSH_SETUP in what:
    result[constants.NV_SSH_SETUP] = \
      _VerifySshSetup(what[constants.NV_SSH_SETUP], my_name)
    # The clutter check deliberately reuses the NV_SSH_SETUP payload
    if constants.NV_SSH_CLUTTER in what:
      result[constants.NV_SSH_CLUTTER] = \
        _VerifySshClutter(what[constants.NV_SSH_SETUP], my_name)

  if constants.NV_NODELIST in what:
    (nodes, bynode, mcs) = what[constants.NV_NODELIST]

    # Add nodes from other groups (different for each node)
    try:
      nodes.extend(bynode[my_name])
    except KeyError:
      pass

    # Use a random order
    random.shuffle(nodes)

    # Try to contact all nodes
    val = {}
    for node in nodes:
      params = groups_cfg.get(node_groups.get(node))
      ssh_port = params["ndparams"].get(constants.ND_SSH_PORT)
      logging.debug("Ssh port %s (None = default) for node %s",
                    str(ssh_port), node)

      # We only test if master candidates can communicate to other nodes.
      # We cannot test if normal nodes cannot communicate with other nodes,
      # because the administrator might have installed additional SSH keys,
      # over which Ganeti has no power.
      if my_name in mcs:
        success, message = _GetSshRunner(cluster_name). \
                              VerifyNodeHostname(node, ssh_port)
        if not success:
          val[node] = message

    result[constants.NV_NODELIST] = val

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_pip = my_sip = None
    # Find our own primary/secondary IP in the submitted node list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      # Ping every node on its primary IP and, when distinct, secondary IP
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not netutils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not netutils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_MASTERIP in what:
    # FIXME: add checks on incoming data structures (here and in the
    # rest of the function)
    master_name, master_ip = what[constants.NV_MASTERIP]
    if master_name == my_name:
      # On the master itself, check that the master IP answers locally
      source = constants.IP4_ADDRESS_LOCALHOST
    else:
      source = None
    result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
                                                     source=source)

  if constants.NV_USERSCRIPTS in what:
    # Report only the scripts that are NOT executable
    result[constants.NV_USERSCRIPTS] = \
      [script for script in what[constants.NV_USERSCRIPTS]
       if not utils.IsExecutable(script)]

  if constants.NV_OOB_PATHS in what:
    result[constants.NV_OOB_PATHS] = tmp = []
    for path in what[constants.NV_OOB_PATHS]:
      try:
        st = os.stat(path)
      except OSError, err:
        tmp.append("error stating out of band helper: %s" % err)
      else:
        if stat.S_ISREG(st.st_mode):
          if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
            # None marks a helper that passed all checks
            tmp.append(None)
          else:
            tmp.append("out of band helper %s is not executable" % path)
        else:
          tmp.append("out of band helper %s is not a file" % path)

  if constants.NV_LVLIST in what and vm_capable:
    try:
      val = GetVolumeList(utils.ListVolumeGroups().keys())
    except RPCFail, err:
      # Report the failure as a string instead of propagating it
      val = str(err)
    result[constants.NV_LVLIST] = val

  _VerifyInstanceList(what, vm_capable, result, all_hvparams)

  if constants.NV_VGLIST in what and vm_capable:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what and vm_capable:
    check_exclusive_pvs = constants.NV_EXCLUSIVEPVS in what
    # LV information is only needed for the exclusive-PV check
    val = bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                       filter_allocatable=False,
                                       include_lvs=check_exclusive_pvs)
    if check_exclusive_pvs:
      result[constants.NV_EXCLUSIVEPVS] = _CheckExclusivePvs(val)
      for pvi in val:
        # Avoid sending useless data on the wire
        pvi.lv_list = []
    result[constants.NV_PVLIST] = map(objects.LvmPvInfo.ToDict, val)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  _VerifyNodeInfo(what, vm_capable, result, all_hvparams)

  if constants.NV_DRBDVERSION in what and vm_capable:
    try:
      drbd_version = DRBD8.GetProcInfo().GetVersionString()
    except errors.BlockDeviceError, err:
      # Non-fatal: return the error text as the "version"
      logging.warning("Can't get DRBD version", exc_info=True)
      drbd_version = str(err)
    result[constants.NV_DRBDVERSION] = drbd_version

  if constants.NV_DRBDLIST in what and vm_capable:
    try:
      used_minors = drbd.DRBD8.GetUsedDevs()
    except errors.BlockDeviceError, err:
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_DRBDHELPER in what and vm_capable:
    status = True
    try:
      payload = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      logging.error("Can't get DRBD usermode helper: %s", str(err))
      status = False
      payload = str(err)
    result[constants.NV_DRBDHELPER] = (status, payload)

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")

  if constants.NV_TIME in what:
    # Returned as a (seconds, microseconds) pair for clock-skew checks
    result[constants.NV_TIME] = utils.SplitTime(time.time())

  if constants.NV_OSLIST in what and vm_capable:
    result[constants.NV_OSLIST] = DiagnoseOS()

  if constants.NV_BRIDGES in what and vm_capable:
    # Report only the bridges that are missing on this node
    result[constants.NV_BRIDGES] = [bridge
                                    for bridge in what[constants.NV_BRIDGES]
                                    if not utils.BridgeExists(bridge)]

  if what.get(constants.NV_ACCEPTED_STORAGE_PATHS) == my_name:
    result[constants.NV_ACCEPTED_STORAGE_PATHS] = \
      filestorage.ComputeWrongFileStoragePaths()

  if what.get(constants.NV_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_FILE_STORAGE_PATH] = pathresult

  if what.get(constants.NV_SHARED_FILE_STORAGE_PATH):
    pathresult = filestorage.CheckFileStoragePath(
        what[constants.NV_SHARED_FILE_STORAGE_PATH])
    if pathresult:
      result[constants.NV_SHARED_FILE_STORAGE_PATH] = pathresult

  return result
1351
1352
def GetCryptoTokens(token_requests):
  """Perform actions on the node's cryptographic tokens.

  Token types can be 'ssl' or 'ssh'. So far only some actions are implemented
  for 'ssl'. Action 'get' returns the digest of the public client ssl
  certificate. Action 'create' creates a new client certificate and private key
  and also returns the digest of the certificate. The third parameter of a
  token request are optional parameters for the actions, so far only the
  filename is supported.

  @type token_requests: list of tuples of (string, string, dict), where the
    first string is in constants.CRYPTO_TYPES, the second in
    constants.CRYPTO_ACTIONS. The third parameter is a dictionary of string
    to string.
  @param token_requests: list of requests of cryptographic tokens and actions
    to perform on them. The actions come with a dictionary of options.
  @rtype: list of tuples (string, string)
  @return: list of tuples of the token type and the public crypto token
  @raise errors.ProgrammerError: for unknown token types or actions

  """
  collected = []
  for (token_type, action, _) in token_requests:
    # Reject unknown token types and actions outright
    if token_type not in constants.CRYPTO_TYPES:
      raise errors.ProgrammerError("Token type '%s' not supported." %
                                   token_type)
    if action not in constants.CRYPTO_ACTIONS:
      raise errors.ProgrammerError("Action '%s' is not supported." %
                                   action)
    # Only the SSL digest token is actually produced so far
    if token_type == constants.CRYPTO_TYPE_SSL_DIGEST:
      collected.append((token_type, utils.GetCertificateDigest()))
  return collected
1385
1386
def EnsureDaemon(daemon_name, run):
  """Ensures the given daemon is running or stopped.

  Only a whitelisted set of daemons may be managed through this call;
  anything else is rejected with a C{False} result.

  @type daemon_name: string
  @param daemon_name: name of the daemon (e.g., constants.KVMD)

  @type run: bool
  @param run: whether to start or stop the daemon

  @rtype: bool
  @return: 'True' if daemon successfully started/stopped,
      'False' otherwise (including non-whitelisted daemons)

  """
  if daemon_name not in [constants.KVMD]:
    # Unknown daemons are never touched and always reported as failure
    return False

  if run:
    return utils.EnsureDaemon(daemon_name)
  else:
    return utils.StopDaemon(daemon_name)
1411
1412
def _InitSshUpdateData(data, noded_cert_file, ssconf_store):
  """Fills in the common fields of an ssh-update payload.

  The node daemon certificate and the cluster name are required by every
  invocation of the ssh-update tool on a remote node.

  @type data: dict
  @param data: payload dictionary, filled in place
  @type noded_cert_file: str
  @param noded_cert_file: path of the node daemon certificate (PEM)
  @param ssconf_store: ssconf store to read the cluster name from

  """
  cert_pem = utils.ReadFile(noded_cert_file)
  (_, noded_cert) = utils.ExtractX509Certificate(cert_pem)
  data[constants.SSHS_NODE_DAEMON_CERTIFICATE] = noded_cert
  data[constants.SSHS_CLUSTER_NAME] = ssconf_store.GetClusterName()
1420
1421
def AddNodeSshKey(node_uuid, node_name,
                  potential_master_candidates,
                  to_authorized_keys=False,
                  to_public_keys=False,
                  get_public_keys=False,
                  pub_key_file=pathutils.SSH_PUB_KEYS,
                  ssconf_store=None,
                  noded_cert_file=pathutils.NODED_CERT_FILE,
                  run_cmd_fn=ssh.RunSshCmdWithStdin):
  """Distributes a node's public SSH key across the cluster.

  Note that this function should only be executed on the master node, which
  then will copy the new node's key to all nodes in the cluster via SSH.

  Also note: at least one of the flags C{to_authorized_keys},
  C{to_public_keys}, and C{get_public_keys} has to be set to C{True} for
  the function to actually perform any actions.

  @type node_uuid: str
  @param node_uuid: the UUID of the node whose key is added
  @type node_name: str
  @param node_name: the name of the node whose key is added
  @type potential_master_candidates: list of str
  @param potential_master_candidates: list of node names of potential master
    candidates; this should match the list of uuids in the public key file
  @type to_authorized_keys: boolean
  @param to_authorized_keys: whether the key should be added to the
    C{authorized_keys} file of all nodes
  @type to_public_keys: boolean
  @param to_public_keys: whether the keys should be added to the public key file
  @type get_public_keys: boolean
  @param get_public_keys: whether the node should add the clusters' public keys
    to its {ganeti_pub_keys} file
  @rtype: list of (str, str)
  @return: (node, error message) pairs for nodes whose key files could not
    be updated; such failures are logged but do not abort the operation

  """
  # assure that at least one of those flags is true, as the function would
  # not do anything otherwise
  assert (to_authorized_keys or to_public_keys or get_public_keys)

  if not ssconf_store:
    ssconf_store = ssconf.SimpleStore()

  # Check and fix sanity of key file
  keys_by_name = ssh.QueryPubKeyFile([node_name], key_file=pub_key_file)
  keys_by_uuid = ssh.QueryPubKeyFile([node_uuid], key_file=pub_key_file)

  if (not keys_by_name or node_name not in keys_by_name) \
      and (not keys_by_uuid or node_uuid not in keys_by_uuid):
    raise errors.SshUpdateError(
      "No keys found for the new node '%s' (UUID %s) in the list of public"
      " SSH keys, neither for the name or the UUID" % (node_name, node_uuid))
  else:
    if node_name in keys_by_name:
      keys_by_uuid = {}
      # Replace the name by UUID in the file as the name should only be used
      # temporarily
      ssh.ReplaceNameByUuid(node_uuid, node_name,
                            error_fn=errors.SshUpdateError,
                            key_file=pub_key_file)
      keys_by_uuid[node_uuid] = keys_by_name[node_name]

  # Update the master node's key files
  if to_authorized_keys:
    (auth_key_file, _) = \
      ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
    ssh.AddAuthorizedKeys(auth_key_file, keys_by_uuid[node_uuid])

  base_data = {}
  _InitSshUpdateData(base_data, noded_cert_file, ssconf_store)
  cluster_name = base_data[constants.SSHS_CLUSTER_NAME]

  ssh_port_map = ssconf_store.GetSshPortMap()

  # Update the target node itself
  logging.debug("Updating SSH key files of target node '%s'.", node_name)
  if get_public_keys:
    node_data = {}
    _InitSshUpdateData(node_data, noded_cert_file, ssconf_store)
    all_keys = ssh.QueryPubKeyFile(None, key_file=pub_key_file)
    node_data[constants.SSHS_SSH_PUBLIC_KEYS] = \
      (constants.SSHS_OVERRIDE, all_keys)

    try:
      utils.RetryByNumberOfTimes(
          constants.SSHS_MAX_RETRIES,
          errors.SshUpdateError,
          run_cmd_fn, cluster_name, node_name, pathutils.SSH_UPDATE,
          ssh_port_map.get(node_name), node_data,
          debug=False, verbose=False, use_cluster_key=False,
          ask_key=False, strict_host_check=False)
    except errors.SshUpdateError as e:
      # Clean up the master's public key file if adding key fails
      if to_public_keys:
        ssh.RemovePublicKey(node_uuid)
      raise e

  # Update all nodes except master and the target node
  if to_authorized_keys:
    base_data[constants.SSHS_SSH_AUTHORIZED_KEYS] = \
      (constants.SSHS_ADD, keys_by_uuid)

  pot_mc_data = copy.deepcopy(base_data)
  if to_public_keys:
    pot_mc_data[constants.SSHS_SSH_PUBLIC_KEYS] = \
      (constants.SSHS_REPLACE_OR_ADD, keys_by_uuid)

  all_nodes = ssconf_store.GetNodeList()
  master_node = ssconf_store.GetMasterNode()
  online_nodes = ssconf_store.GetOnlineNodeList()

  node_errors = []
  for node in all_nodes:
    if node == master_node:
      logging.debug("Skipping master node '%s'.", master_node)
      continue
    if node not in online_nodes:
      logging.debug("Skipping offline node '%s'.", node)
      continue
    if node in potential_master_candidates:
      logging.debug("Updating SSH key files of node '%s'.", node)
      try:
        utils.RetryByNumberOfTimes(
            constants.SSHS_MAX_RETRIES,
            errors.SshUpdateError,
            run_cmd_fn, cluster_name, node, pathutils.SSH_UPDATE,
            ssh_port_map.get(node), pot_mc_data,
            debug=False, verbose=False, use_cluster_key=False,
            ask_key=False, strict_host_check=False)
      except errors.SshUpdateError as last_exception:
        # The key being added belongs to 'node_name'; 'node' is the node
        # whose key files failed to update (the two arguments were swapped
        # in the message before, inconsistent with RemoveNodeSshKey)
        error_msg = ("When adding the key of node '%s', updating SSH key"
                     " files of node '%s' failed after %s retries."
                     " Not trying again. Last error was: %s." %
                     (node_name, node, constants.SSHS_MAX_RETRIES,
                      last_exception))
        node_errors.append((node, error_msg))
        # We only log the error and don't throw an exception, because
        # one unreachable node shall not abort the entire procedure.
        logging.error(error_msg)

    else:
      if to_authorized_keys:
        run_cmd_fn(cluster_name, node, pathutils.SSH_UPDATE,
                   ssh_port_map.get(node), base_data,
                   debug=False, verbose=False, use_cluster_key=False,
                   ask_key=False, strict_host_check=False)

  return node_errors
1569
1570
def RemoveNodeSshKey(node_uuid, node_name,
                     master_candidate_uuids,
                     potential_master_candidates,
                     master_uuid=None,
                     keys_to_remove=None,
                     from_authorized_keys=False,
                     from_public_keys=False,
                     clear_authorized_keys=False,
                     clear_public_keys=False,
                     pub_key_file=pathutils.SSH_PUB_KEYS,
                     ssconf_store=None,
                     noded_cert_file=pathutils.NODED_CERT_FILE,
                     readd=False,
                     run_cmd_fn=ssh.RunSshCmdWithStdin):
  """Removes the node's SSH keys from the key files and distributes those.

  Note that at least one of the flags C{from_authorized_keys},
  C{from_public_keys}, C{clear_authorized_keys}, and C{clear_public_keys}
  has to be set to C{True} for the function to perform any action at all.
  Not doing so will trigger an assertion in the function.

  @type node_uuid: str
  @param node_uuid: UUID of the node whose key is removed
  @type node_name: str
  @param node_name: name of the node whose key is remove
  @type master_candidate_uuids: list of str
  @param master_candidate_uuids: list of UUIDs of the current master candidates
  @type potential_master_candidates: list of str
  @param potential_master_candidates: list of names of potential master
    candidates
  @type keys_to_remove: dict of str to list of str
  @param keys_to_remove: a dictionary mapping node UUIDS to lists of SSH keys
    to be removed. This list is supposed to be used only if the keys are not
    in the public keys file. This is for example the case when removing a
    master node's key.
  @type from_authorized_keys: boolean
  @param from_authorized_keys: whether or not the key should be removed
    from the C{authorized_keys} file
  @type from_public_keys: boolean
  @param from_public_keys: whether or not the key should be remove from
    the C{ganeti_pub_keys} file
  @type clear_authorized_keys: boolean
  @param clear_authorized_keys: whether or not the C{authorized_keys} file
    should be cleared on the node whose keys are removed
  @type clear_public_keys: boolean
  @param clear_public_keys: whether to clear the node's C{ganeti_pub_key} file
  @type readd: boolean
  @param readd: whether this is called during a readd operation.
  @rtype: list of string
  @returns: list of feedback messages
    (NOTE(review): one code path below does a bare C{return}, which yields
    C{None} instead of a list -- callers should be prepared for that)

  """
  # Non-disruptive error messages, list of (node, msg) pairs
  result_msgs = []

  # Make sure at least one of these flags is true.
  if not (from_authorized_keys or from_public_keys or clear_authorized_keys
          or clear_public_keys):
    raise errors.SshUpdateError("No removal from any key file was requested.")

  if not ssconf_store:
    ssconf_store = ssconf.SimpleStore()

  master_node = ssconf_store.GetMasterNode()
  ssh_port_map = ssconf_store.GetSshPortMap()

  if from_authorized_keys or from_public_keys:
    # Determine which keys to remove: either the explicitly given ones,
    # or whatever is registered for the node in the public key file
    if keys_to_remove:
      keys = keys_to_remove
    else:
      keys = ssh.QueryPubKeyFile([node_uuid], key_file=pub_key_file)
      if (not keys or node_uuid not in keys) and not readd:
        raise errors.SshUpdateError("Node '%s' not found in the list of public"
                                    " SSH keys. It seems someone tries to"
                                    " remove a key from outside the cluster!"
                                    % node_uuid)
      # During an upgrade all nodes have the master key. In this case we
      # should not remove it to avoid accidentally shutting down cluster
      # SSH communication
      master_keys = None
      if master_uuid:
        master_keys = ssh.QueryPubKeyFile([master_uuid], key_file=pub_key_file)
        for master_key in master_keys:
          if master_key in keys[node_uuid]:
            keys[node_uuid].remove(master_key)

    if node_name == master_node and not keys_to_remove:
      raise errors.SshUpdateError("Cannot remove the master node's keys.")

    if node_uuid in keys:
      base_data = {}
      _InitSshUpdateData(base_data, noded_cert_file, ssconf_store)
      cluster_name = base_data[constants.SSHS_CLUSTER_NAME]

      if from_authorized_keys:
        base_data[constants.SSHS_SSH_AUTHORIZED_KEYS] = \
          (constants.SSHS_REMOVE, keys)
        # Remove the keys from the master node's own authorized_keys file
        (auth_key_file, _) = \
          ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False,
                              dircheck=False)
        ssh.RemoveAuthorizedKeys(auth_key_file, keys[node_uuid])

      # Potential master candidates additionally need the public key file
      # updated, hence a separate (deep-copied) payload for them
      pot_mc_data = copy.deepcopy(base_data)

      if from_public_keys:
        pot_mc_data[constants.SSHS_SSH_PUBLIC_KEYS] = \
          (constants.SSHS_REMOVE, keys)
        ssh.RemovePublicKey(node_uuid, key_file=pub_key_file)

      all_nodes = ssconf_store.GetNodeList()
      online_nodes = ssconf_store.GetOnlineNodeList()
      logging.debug("Removing key of node '%s' from all nodes but itself and"
                    " master.", node_name)
      for node in all_nodes:
        if node == master_node:
          logging.debug("Skipping master node '%s'.", master_node)
          continue
        if node not in online_nodes:
          logging.debug("Skipping offline node '%s'.", node)
          continue
        ssh_port = ssh_port_map.get(node)
        if not ssh_port:
          raise errors.OpExecError("No SSH port information available for"
                                   " node '%s', map: %s." %
                                   (node, ssh_port_map))
        error_msg_final = ("When removing the key of node '%s', updating the"
                           " SSH key files of node '%s' failed. Last error"
                           " was: %s.")
        if node in potential_master_candidates:
          logging.debug("Updating key setup of potential master candidate node"
                        " %s.", node)
          try:
            utils.RetryByNumberOfTimes(
                constants.SSHS_MAX_RETRIES,
                errors.SshUpdateError,
                run_cmd_fn, cluster_name, node, pathutils.SSH_UPDATE,
                ssh_port, pot_mc_data,
                debug=False, verbose=False, use_cluster_key=False,
                ask_key=False, strict_host_check=False)
          except errors.SshUpdateError as last_exception:
            error_msg = error_msg_final % (
                node_name, node, last_exception)
            # Log only; one unreachable node must not abort the procedure
            result_msgs.append((node, error_msg))
            logging.error(error_msg)

        else:
          # Normal nodes only carry the keys in authorized_keys
          if from_authorized_keys:
            logging.debug("Updating key setup of normal node %s.", node)
            try:
              utils.RetryByNumberOfTimes(
                  constants.SSHS_MAX_RETRIES,
                  errors.SshUpdateError,
                  run_cmd_fn, cluster_name, node, pathutils.SSH_UPDATE,
                  ssh_port, base_data,
                  debug=False, verbose=False, use_cluster_key=False,
                  ask_key=False, strict_host_check=False)
            except errors.SshUpdateError as last_exception:
              error_msg = error_msg_final % (
                  node_name, node, last_exception)
              result_msgs.append((node, error_msg))
              logging.error(error_msg)

  # Finally, update the key files on the target node itself
  if clear_authorized_keys or from_public_keys or clear_public_keys:
    data = {}
    _InitSshUpdateData(data, noded_cert_file, ssconf_store)
    cluster_name = data[constants.SSHS_CLUSTER_NAME]
    ssh_port = ssh_port_map.get(node_name)
    if not ssh_port:
      # NOTE(review): the '%s' placeholder in this message is never filled
      # in (no '%' application), so the node name is missing from the error
      raise errors.OpExecError("No SSH port information available for"
                               " node '%s', which is leaving the cluster.")

    if clear_authorized_keys:
      # The 'authorized_keys' file is not solely managed by Ganeti. Therefore,
      # we have to specify exactly which keys to clear to leave keys untouched
      # that were not added by Ganeti.
      other_master_candidate_uuids = [uuid for uuid in master_candidate_uuids
                                      if uuid != node_uuid]
      candidate_keys = ssh.QueryPubKeyFile(other_master_candidate_uuids,
                                           key_file=pub_key_file)
      data[constants.SSHS_SSH_AUTHORIZED_KEYS] = \
        (constants.SSHS_REMOVE, candidate_keys)

    if clear_public_keys:
      data[constants.SSHS_SSH_PUBLIC_KEYS] = \
        (constants.SSHS_CLEAR, {})
    elif from_public_keys:
      # Since clearing the public keys subsumes removing just a single key,
      # we only do it if clear_public_keys is 'False'.

      if keys[node_uuid]:
        data[constants.SSHS_SSH_PUBLIC_KEYS] = \
          (constants.SSHS_REMOVE, keys)

    # If we have no changes to any keyfile, just return
    if not (constants.SSHS_SSH_PUBLIC_KEYS in data or
            constants.SSHS_SSH_AUTHORIZED_KEYS in data):
      return

    logging.debug("Updating SSH key setup of target node '%s'.", node_name)
    try:
      utils.RetryByNumberOfTimes(
          constants.SSHS_MAX_RETRIES,
          errors.SshUpdateError,
          run_cmd_fn, cluster_name, node_name, pathutils.SSH_UPDATE,
          ssh_port, data,
          debug=False, verbose=False, use_cluster_key=False,
          ask_key=False, strict_host_check=False)
    except errors.SshUpdateError as last_exception:
      # The node may already be unreachable (it is leaving the cluster);
      # report instead of raising
      result_msgs.append(
          (node_name,
           ("Removing SSH keys from node '%s' failed."
            " This can happen when the node is already unreachable."
            " Error: %s" % (node_name, last_exception))))

  return result_msgs
1786
1787
def _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map,
                        pub_key_file=pathutils.SSH_PUB_KEYS,
                        ssconf_store=None,
                        noded_cert_file=pathutils.NODED_CERT_FILE,
                        run_cmd_fn=ssh.RunSshCmdWithStdin,
                        suffix=""):
  """Triggers regeneration of the root SSH key pair on the node.

  The node must already be registered in the public key file; the actual
  key generation is delegated to the ssh-update tool executed on the node.

  @type node_uuid: str
  @param node_uuid: UUID of the node whose key is removed
  @type node_name: str
  @param node_name: name of the node whose key is remove
  @type ssh_port_map: dict of str to int
  @param ssh_port_map: mapping of node names to their SSH port
  @raise errors.SshUpdateError: if the node is not in the public key file

  """
  if not ssconf_store:
    ssconf_store = ssconf.SimpleStore()

  registered_keys = ssh.QueryPubKeyFile([node_uuid], key_file=pub_key_file)
  if not registered_keys or node_uuid not in registered_keys:
    raise errors.SshUpdateError("Node %s (UUID: %s) whose key is requested to"
                                " be regenerated is not registered in the"
                                " public keys file." % (node_name, node_uuid))

  update_data = {}
  _InitSshUpdateData(update_data, noded_cert_file, ssconf_store)
  # Ask the remote ssh-update tool to generate a fresh key pair, using the
  # given suffix for the new key files
  update_data[constants.SSHS_GENERATE] = {constants.SSHS_SUFFIX: suffix}

  run_cmd_fn(update_data[constants.SSHS_CLUSTER_NAME], node_name,
             pathutils.SSH_UPDATE, ssh_port_map.get(node_name), update_data,
             debug=False, verbose=False, use_cluster_key=False,
             ask_key=False, strict_host_check=False)
1822
1823
1824 def _GetMasterNodeUUID(node_uuid_name_map, master_node_name):
1825 master_node_uuids = [node_uuid for (node_uuid, node_name)
1826 in node_uuid_name_map
1827 if node_name == master_node_name]
1828 if len(master_node_uuids) != 1:
1829 raise errors.SshUpdateError("No (unique) master UUID found. Master node"
1830 " name: '%s', Master UUID: '%s'" %
1831 (master_node_name, master_node_uuids))
1832 return master_node_uuids[0]
1833
1834
def _GetOldMasterKeys(master_node_uuid, pub_key_file):
  """Fetches the master node's current key(s) from the public key file.

  @type master_node_uuid: str
  @param master_node_uuid: UUID of the master node
  @type pub_key_file: str
  @param pub_key_file: path of the Ganeti public key file
  @rtype: dict of str to list of str
  @return: mapping of the master's UUID to its public key(s)
  @raise errors.SshUpdateError: if no key is registered for the master

  """
  master_keys = ssh.QueryPubKeyFile([master_node_uuid],
                                    key_file=pub_key_file)
  if master_keys:
    return master_keys
  raise errors.SshUpdateError("No public key of the master node (UUID '%s')"
                              " found, not generating a new key."
                              % master_node_uuid)
1843
1844
def _GetNewMasterKey(root_keyfiles, master_node_uuid):
  """Reads the freshly generated master keys from their temporary location.

  The temporary files carry the regular public key's base name plus the
  master suffix and a ".pub" extension.

  @type root_keyfiles: dict
  @param root_keyfiles: mapping of key type to (private, public) key paths
  @type master_node_uuid: str
  @param master_node_uuid: UUID of the master node
  @rtype: dict of str to list of str
  @return: mapping of the master's UUID to the new public key(s)
  @raise errors.SshUpdateError: if no temporary key could be found at all

  """
  fresh_keys = []
  for (_, pub_path) in root_keyfiles.values():
    tmp_basename = (os.path.splitext(os.path.basename(pub_path))[0]
                    + constants.SSHS_MASTER_SUFFIX + ".pub")
    tmp_path = os.path.join(os.path.dirname(pub_path), tmp_basename)
    # for some key types, there might not be any keys
    if os.path.exists(tmp_path):
      fresh_keys.append(utils.ReadFile(tmp_path))
  if not fresh_keys:
    raise errors.SshUpdateError("Cannot find any type of temporary SSH key.")
  return {master_node_uuid: fresh_keys}
1861
1862
def _ReplaceMasterKeyOnMaster(root_keyfiles):
  """Moves the temporary master SSH keys into their final place.

  For every key type, the currently installed key pair is backed up and
  removed, then the temporary (suffixed) pair is renamed over it.

  @type root_keyfiles: dict
  @param root_keyfiles: mapping of key type to (private, public) key paths
  @raise errors.SshUpdateError: if not a single key pair could be moved

  """
  moved = 0
  for (priv_file, pub_file) in root_keyfiles.values():
    directory = os.path.dirname(pub_file)
    priv_tmp = os.path.join(directory,
                            os.path.basename(priv_file)
                            + constants.SSHS_MASTER_SUFFIX)
    pub_tmp = priv_tmp + ".pub"
    # Back up and drop the currently installed keys
    for installed in (pub_file, priv_file):
      if os.path.exists(installed):
        utils.CreateBackup(installed)
        utils.RemoveFile(installed)
    # for some key types, there might not be any keys
    if os.path.exists(pub_tmp) and os.path.exists(priv_tmp):
      shutil.move(pub_tmp, pub_file)
      shutil.move(priv_tmp, priv_file)
      moved += 1
  if not moved:
    raise errors.SshUpdateError("Could not move at least one master SSH key.")
1888
1889
def RenewSshKeys(node_uuids, node_names, master_candidate_uuids,
                 potential_master_candidates,
                 pub_key_file=pathutils.SSH_PUB_KEYS,
                 ssconf_store=None,
                 noded_cert_file=pathutils.NODED_CERT_FILE,
                 run_cmd_fn=ssh.RunSshCmdWithStdin):
  """Renews all SSH keys and updates authorized_keys and ganeti_pub_keys.

  @type node_uuids: list of str
  @param node_uuids: list of node UUIDs whose keys should be renewed
  @type node_names: list of str
  @param node_names: list of node names whose keys should be removed. This list
    should match the C{node_uuids} parameter
  @type master_candidate_uuids: list of str
  @param master_candidate_uuids: list of UUIDs of master candidates or
    master node
  @type potential_master_candidates: list of str
  @param potential_master_candidates: list of names of potential master
    candidates
  @type pub_key_file: str
  @param pub_key_file: file path of the public key file
  @type noded_cert_file: str
  @param noded_cert_file: path of the noded SSL certificate file
  @type run_cmd_fn: function
  @param run_cmd_fn: function to run commands on remote nodes via SSH
  @rtype: list
  @return: accumulated non-fatal per-node errors, in roughly
    chronological order
  @raises ProgrammerError: if node_uuids and node_names don't match;
    SshUpdateError if a node's key is missing from the public key file,
    if a node's new SSH key could not be fetched from it, if there is
    none or more than one entry in the public key list for the master
    node.

  """
  if not ssconf_store:
    ssconf_store = ssconf.SimpleStore()
  cluster_name = ssconf_store.GetClusterName()

  if len(node_uuids) != len(node_names):
    raise errors.ProgrammerError("List of nodes UUIDs and node names"
                                 " does not match in length.")

  (_, root_keyfiles) = \
    ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
  (_, dsa_pub_keyfile) = root_keyfiles[constants.SSHK_DSA]
  old_master_key = utils.ReadFile(dsa_pub_keyfile)

  # Materialize the pairing as a list: it is iterated more than once
  # (master lookup and per-node loop), which a bare zip iterator would
  # not survive under Python 3.
  node_uuid_name_map = list(zip(node_uuids, node_names))

  master_node_name = ssconf_store.GetMasterNode()
  master_node_uuid = _GetMasterNodeUUID(node_uuid_name_map, master_node_name)
  ssh_port_map = ssconf_store.GetSshPortMap()
  # List of all node errors that happened, but which did not abort the
  # procedure as a whole. It is important that this is a list to have a
  # somewhat chronological history of events.
  all_node_errors = []

  # Process non-master nodes first; the master's own key is renewed last
  # so master-to-node communication stays intact throughout.
  for node_uuid, node_name in node_uuid_name_map:
    if node_name == master_node_name:
      continue
    master_candidate = node_uuid in master_candidate_uuids
    potential_master_candidate = node_name in potential_master_candidates

    keys_by_uuid = ssh.QueryPubKeyFile([node_uuid], key_file=pub_key_file)
    if not keys_by_uuid:
      raise errors.SshUpdateError("No public key of node %s (UUID %s) found,"
                                  " not generating a new key."
                                  % (node_name, node_uuid))

    if master_candidate:
      logging.debug("Fetching old SSH key from node '%s'.", node_name)
      old_pub_key = ssh.ReadRemoteSshPubKeys(dsa_pub_keyfile,
                                             node_name, cluster_name,
                                             ssh_port_map[node_name],
                                             False, # ask_key
                                             False) # key_check
      if old_pub_key != old_master_key:
        # If we are already in a multi-key setup (that is past Ganeti 2.12),
        # we can safely remove the old key of the node. Otherwise, we cannot
        # remove that node's key, because it is also the master node's key
        # and that would terminate all communication from the master to the
        # node.
        logging.debug("Removing SSH key of node '%s'.", node_name)
        node_errors = RemoveNodeSshKey(
            node_uuid, node_name, master_candidate_uuids,
            potential_master_candidates,
            master_uuid=master_node_uuid, from_authorized_keys=master_candidate,
            from_public_keys=False, clear_authorized_keys=False,
            clear_public_keys=False)
        if node_errors:
          all_node_errors.extend(node_errors)
      else:
        logging.debug("Old key of node '%s' is the same as the current master"
                      " key. Not deleting that key on the node.", node_name)

    logging.debug("Generating new SSH key for node '%s'.", node_name)
    _GenerateNodeSshKey(node_uuid, node_name, ssh_port_map,
                        pub_key_file=pub_key_file,
                        ssconf_store=ssconf_store,
                        noded_cert_file=noded_cert_file,
                        run_cmd_fn=run_cmd_fn)

    try:
      logging.debug("Fetching newly created SSH key from node '%s'.", node_name)
      pub_key = ssh.ReadRemoteSshPubKeys(dsa_pub_keyfile,
                                         node_name, cluster_name,
                                         ssh_port_map[node_name],
                                         False, # ask_key
                                         False) # key_check
    except Exception as err:
      # Deliberately not a bare 'except:': that would also swallow
      # KeyboardInterrupt/SystemExit and discard the failure cause.
      raise errors.SshUpdateError("Could not fetch key of node %s"
                                  " (UUID %s): %s"
                                  % (node_name, node_uuid, err))

    if potential_master_candidate:
      ssh.RemovePublicKey(node_uuid, key_file=pub_key_file)
      ssh.AddPublicKey(node_uuid, pub_key, key_file=pub_key_file)

    logging.debug("Add ssh key of node '%s'.", node_name)
    node_errors = AddNodeSshKey(
        node_uuid, node_name, potential_master_candidates,
        to_authorized_keys=master_candidate,
        to_public_keys=potential_master_candidate,
        get_public_keys=True,
        pub_key_file=pub_key_file, ssconf_store=ssconf_store,
        noded_cert_file=noded_cert_file,
        run_cmd_fn=run_cmd_fn)
    if node_errors:
      all_node_errors.extend(node_errors)

  # Renewing the master node's key

  # Preserve the old keys for now
  old_master_keys_by_uuid = _GetOldMasterKeys(master_node_uuid, pub_key_file)

  # Generate a new master key with a suffix, don't touch the old one for now
  logging.debug("Generate new ssh key of master.")
  _GenerateNodeSshKey(master_node_uuid, master_node_name, ssh_port_map,
                      pub_key_file=pub_key_file,
                      ssconf_store=ssconf_store,
                      noded_cert_file=noded_cert_file,
                      run_cmd_fn=run_cmd_fn,
                      suffix=constants.SSHS_MASTER_SUFFIX)
  # Read newly created master key
  new_master_key_dict = _GetNewMasterKey(root_keyfiles, master_node_uuid)

  # Replace master key in the master nodes' public key file
  ssh.RemovePublicKey(master_node_uuid, key_file=pub_key_file)
  for pub_key in new_master_key_dict[master_node_uuid]:
    ssh.AddPublicKey(master_node_uuid, pub_key, key_file=pub_key_file)

  # Add new master key to all node's public and authorized keys
  logging.debug("Add new master key to all nodes.")
  node_errors = AddNodeSshKey(
      master_node_uuid, master_node_name, potential_master_candidates,
      to_authorized_keys=True, to_public_keys=True,
      get_public_keys=False, pub_key_file=pub_key_file,
      ssconf_store=ssconf_store, noded_cert_file=noded_cert_file,
      run_cmd_fn=run_cmd_fn)
  if node_errors:
    all_node_errors.extend(node_errors)

  # Remove the old key file and rename the new key to the non-temporary filename
  _ReplaceMasterKeyOnMaster(root_keyfiles)

  # Remove old key from authorized keys
  (auth_key_file, _) = \
    ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False)
  ssh.RemoveAuthorizedKeys(auth_key_file,
                           old_master_keys_by_uuid[master_node_uuid])

  # Remove the old key from all node's authorized keys file
  logging.debug("Remove the old master key from all nodes.")
  node_errors = RemoveNodeSshKey(
      master_node_uuid, master_node_name, master_candidate_uuids,
      potential_master_candidates,
      keys_to_remove=old_master_keys_by_uuid, from_authorized_keys=True,
      from_public_keys=False, clear_authorized_keys=False,
      clear_public_keys=False)
  if node_errors:
    all_node_errors.extend(node_errors)

  return all_node_errors
2068
2069
def GetBlockDevSizes(devices):
  """Return the size of the given block devices

  @type devices: list
  @param devices: list of block device nodes to query
  @rtype: dict
  @return:
    dictionary of all block devices under /dev (key). The value is their
    size in MiB.

    {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}

  """
  DEV_PREFIX = "/dev/"
  blockdevs = {}

  for devpath in devices:
    # Only paths below /dev are considered
    if not utils.IsBelowDir(DEV_PREFIX, devpath):
      continue

    try:
      dev_stat = os.stat(devpath)
    except EnvironmentError as err:
      logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
      continue

    if not stat.S_ISBLK(dev_stat.st_mode):
      continue

    result = utils.RunCmd(["blockdev", "--getsize64", devpath])
    if result.failed:
      # We don't want to fail, just do not list this device as available
      logging.warning("Cannot get size for block device %s", devpath)
      continue

    blockdevs[devpath] = int(result.stdout) / (1024 * 1024)

  return blockdevs
2106
2107
def GetVolumeList(vg_names):
  """Compute list of logical volumes and their size.

  @type vg_names: list
  @param vg_names: the volume groups whose LVs we should list, or
      empty for all volume groups
  @rtype: dict
  @return:
      dictionary of all partions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'xenvg/test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  sep = "|"
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-ovg_name,lv_name,lv_size,lv_attr"] +
                        (vg_names or []))
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  volumes = {}
  for raw_line in result.stdout.splitlines():
    line = raw_line.strip()
    match = _LVSLINE_REGEX.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    vg_name, lv_name, size, attr = match.groups()
    if attr[0] == "v":
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    inactive = attr[4] == "-"
    online = attr[5] == "o"
    volumes[vg_name + "/" + lv_name] = (size, inactive, online)

  return volumes
2152
2153
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary mapping volume group names to their size

  """
  # Thin wrapper; the actual work is done by the utils module
  return utils.ListVolumeGroups()
2163
2164
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def _LineToVolumes(line):
    # Fields are: lv_name, lv_size, devices, vg_name. The devices field
    # is a comma-separated list of entries like "/dev/sda1(0)"; only the
    # path before the parenthesis is of interest.
    fields = [field.strip() for field in line.split("|")]
    return [{"name": fields[0], "size": fields[1],
             "dev": dev.split("(")[0], "vg": fields[3]}
            for dev in fields[2].split(",")]

  all_devs = []
  for line in result.stdout.splitlines():
    if line.count("|") >= 3:
      all_devs.extend(_LineToVolumes(line))
    else:
      logging.warning("Strange line in the output from lvs: '%s'", line)
  return all_devs
2209
2210
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]
  if missing:
    _Fail("Missing bridges %s", utils.CommaJoin(missing))
2225
2226
def GetInstanceListForHypervisor(hname, hvparams=None,
                                 get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances of the given hypervisor.

  @type hname: string
  @param hname: name of the hypervisor
  @type hvparams: dict of strings
  @param hvparams: hypervisor parameters for the given hypervisor
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  try:
    # Both resolving the hypervisor and listing can raise HypervisorError
    return get_hv_fn(hname).ListInstances(hvparams=hvparams)
  except errors.HypervisorError as err:
    _Fail("Error enumerating instances (hypervisor %s): %s",
          hname, err, exc=True)
2250
2251
def GetInstanceList(hypervisor_list, all_hvparams=None,
                    get_hv_fn=hypervisor.GetHypervisor):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: a dictionary mapping hypervisor types to respective
    cluster-wide hypervisor parameters
  @type get_hv_fn: function
  @param get_hv_fn: function that returns a hypervisor for the given hypervisor
    name; optional parameter to increase testability

  @rtype: list
  @return: a list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  instances = []
  for hname in hypervisor_list:
    instances.extend(GetInstanceListForHypervisor(
        hname, hvparams=all_hvparams[hname], get_hv_fn=get_hv_fn))
  return instances
2277
2278
def GetInstanceInfo(instance, hname, hvparams=None):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance
  @type hvparams: dict of strings
  @param hvparams: the instance's hvparams

  @rtype: dict
  @return: empty dict if the hypervisor knows nothing about the
    instance, otherwise a dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: state of instance (HvInstanceState)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus (int)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance,
                                                          hvparams=hvparams)
  if iinfo is None:
    return {}

  # GetInstanceInfo returns a tuple; positions 2-5 hold the values we expose
  return {
    "memory": iinfo[2],
    "vcpus": iinfo[3],
    "state": iinfo[4],
    "time": iinfo[5],
    }
2308
2309
def GetInstanceMigratable(instance):
  """Computes whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: None
  @return: None; calls L{_Fail} if the instance is not running on this
    node. Missing disk symlinks only produce a warning in the log and
    do not cause a failure.

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  # The instance must be running on this node to be migratable from here
  if iname not in hyper.ListInstances(hvparams=instance.hvparams):
    _Fail("Instance %s is not running", iname)

  # Check that every disk has its expected symlink; absence is logged
  # but deliberately not treated as an error here
  for idx in range(len(instance.disks_info)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      logging.warning("Instance %s is missing symlink %s for disk %d",
                      iname, link_name, idx)
2332
2333
def GetAllInstancesInfo(hypervisor_list, all_hvparams):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data
  @type all_hvparams: dict of dict of strings
  @param all_hvparams: mapping of hypervisor names to hvparams

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
    - memory: memory size of instance (int)
    - state: xen state of instance (string)
    - time: cpu time of instance (float)
    - vcpus: the number of vcpus

  """
  output = {}
  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo(
        all_hvparams[hname])
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        "memory": memory,
        "vcpus": vcpus,
        "state": state,
        "time": times,
        }
      if name in output:
        # we only check static parameters, like memory and vcpus,
        # and not state and time which can change between the
        # invocations of the different hypervisors
        for key in ("memory", "vcpus"):
          if value[key] != output[name][key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
2377
2378
def GetInstanceConsoleInfo(instance_param_dict,
                           get_hv_fn=hypervisor.GetHypervisor):
  """Gather data about the console access of a set of instances of this node.

  This function assumes that the caller already knows which instances are on
  this node, by calling a function such as L{GetAllInstancesInfo} or
  L{GetInstanceList}.

  For every instance, a large amount of configuration data needs to be
  provided to the hypervisor interface in order to receive the console
  information. Whether this could or should be cut down can be discussed.
  The information is provided in a dictionary indexed by instance name,
  allowing any number of instance queries to be done.

  @type instance_param_dict: dict of string to tuple of dictionaries, where the
    dictionaries represent: L{objects.Instance}, L{objects.Node},
    L{objects.NodeGroup}, HvParams, BeParams
  @param instance_param_dict: mapping of instance name to parameters necessary
    for console information retrieval

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
    - instance: instance name
    - kind: console kind
    - message: used with kind == CONS_MESSAGE, indicates console to be
      unavailable, supplies error message
    - host: host to connect to
    - port: port to use
    - user: user for login
    - command: the command, broken into parts as an array
    - display: unknown, potentially unused?

  """
  output = {}
  for inst_name, params in instance_param_dict.items():
    # The serialized objects need to be re-instantiated before use
    instance = objects.Instance.FromDict(params["instance"])
    pnode = objects.Node.FromDict(params["node"])
    group = objects.NodeGroup.FromDict(params["group"])
    hvparams = params["hvParams"]
    beparams = params["beParams"]

    hyper = get_hv_fn(instance.hypervisor)
    console = hyper.GetInstanceConsole(instance, pnode, group,
                                       hvparams, beparams)
    output[inst_name] = console.ToDict()

  return output
2430
2431
def _InstanceLogName(kind, os_name, instance, component):
  """Compute the OS log filename for a given instance and operation.

  The instance name and os name are passed in as strings since not all
  operations have these as part of an instance object.

  @type kind: string
  @param kind: the operation type (e.g. add, import, etc.)
  @type os_name: string
  @param os_name: the os name
  @type instance: string
  @param instance: the name of the instance being imported/added/etc.
  @type component: string or None
  @param component: the name of the component of the instance being
    transferred

  """
  # TODO: Use tempfile.mkstemp to create unique filename
  if component:
    assert "/" not in component
    component_part = "-%s" % component
  else:
    component_part = ""
  basename = "%s-%s-%s%s-%s.log" % (kind, os_name, instance, component_part,
                                    utils.TimestampForFilename())
  return utils.PathJoin(pathutils.LOG_OS_DIR, basename)
2458
2459
def InstanceOsAdd(instance, reinstall, debug):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os, debug)
  if reinstall:
    create_env["INSTANCE_REINSTALL"] = "1"

  logfile = _InstanceLogName("add", instance.os, instance.name, None)

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)
  if not result.failed:
    return

  # Log the failure and report the tail of the script's log file
  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
2490
2491
def RunRenameInstance(instance, old_name, debug):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @type debug: integer
  @param debug: debug level, passed to the OS scripts
  @rtype: None
  @return: None; failure of the rename script is reported via L{_Fail}

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os, debug)
  rename_env["OLD_INSTANCE_NAME"] = old_name

  logfile = _InstanceLogName("rename", instance.os,
                             "%s-%s" % (old_name, instance.name), None)

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile, reset_env=True)

  if result.failed:
    # Note: this used to say "os create command" - copy-paste error
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
2523
2524
def _GetBlockDevSymlinkPath(instance_name, idx, _dir=None):
  """Returns symlink path for block device.

  @type instance_name: string
  @param instance_name: name of the owning instance
  @type idx: int
  @param idx: index of the disk
  @param _dir: overrides the links directory; for testing only

  """
  base_dir = pathutils.DISK_LINKS_DIR if _dir is None else _dir
  link_basename = "%s%s%s" % (instance_name, constants.DISK_SEPARATOR, idx)
  return utils.PathJoin(base_dir, link_basename)
2535
2536
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError as err:
    if err.errno != errno.EEXIST:
      raise
    # A link already exists: leave it alone if it is already correct,
    # otherwise replace it with a fresh one.
    if not (os.path.islink(link_name) and
            os.readlink(link_name) == device_path):
      os.remove(link_name)
      os.symlink(device_path, link_name)

  return link_name
2563
2564
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Removal errors are logged but never propagated, since this is
  best-effort cleanup.

  """
  for idx in range(len(disks)):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
2576
2577
def _CalculateDeviceURI(instance, disk, device):
  """Get the URI for the device.

  @type instance: L{objects.Instance}
  @param instance: the instance which disk belongs to
  @type disk: L{objects.Disk}
  @param disk: the target disk object
  @type device: L{bdev.BlockDev}
  @param device: the corresponding BlockDevice
  @rtype: string
  @return: the device uri if any else None

  """
  access_mode = disk.params.get(constants.LDP_ACCESS,
                                constants.DISK_KERNELSPACE)
  if access_mode != constants.DISK_USERSPACE:
    return None
  # This can raise errors.BlockDeviceError
  return device.GetUserspaceAccessUri(instance.hypervisor)
2598
2599
def _GatherAndLinkBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @type instance: L{objects.Instance}
  @param instance: the instance whose disks we should assemble
  @rtype: list
  @return: list of (disk_object, link_name, drive_uri)

  """
  block_devices = []
  for idx, disk in enumerate(instance.disks_info):
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    try:
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
    except OSError as err:
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
                                    err.strerror)
    block_devices.append((disk, link_name,
                          _CalculateDeviceURI(instance, disk, device)))

  return block_devices
2629
2630
2631 def _IsInstanceUserDown(instance_info):
2632 return instance_info and \
2633 "state" in instance_info and \
2634 hv_base.HvInstanceState.IsShutdown(instance_info["state"])
2635
2636
def _GetInstanceInfo(instance):
  """Convenience wrapper around L{GetInstanceInfo}.

  @type instance: L{objects.Instance}
  @param instance: the instance whose information is queried

  """
  return GetInstanceInfo(instance.name, instance.hypervisor,
                         hvparams=instance.hvparams)
2641
2642
def StartInstance(instance, startup_paused, reason, store_reason=True):
  """Start an instance.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type startup_paused: bool
  @param startup_paused: pause instance at startup?
  @type reason: list of reasons
  @param reason: the reason trail for this startup
  @type store_reason: boolean
  @param store_reason: whether to store the startup reason trail on file
  @rtype: None

  """
  instance_info = _GetInstanceInfo(instance)

  # Only skip the start if the instance is running and this is not a
  # user-initiated shutdown state
  if instance_info and not _IsInstanceUserDown(instance_info):
    logging.info("Instance '%s' already running, not starting", instance.name)
    return

  try:
    block_devices = _GatherAndLinkBlockDevs(instance)
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
    hyper.StartInstance(instance, block_devices, startup_paused)
    if store_reason:
      _StoreInstReasonTrail(instance.name, reason)
  except errors.BlockDeviceError, err:
    _Fail("Block device error: %s", err, exc=True)
  except errors.HypervisorError, err:
    # The start failed; clean up the symlinks created above
    _RemoveBlockDevLinks(instance.name, instance.disks_info)
    _Fail("Hypervisor error: %s", err, exc=True)
2674
2675
def InstanceShutdown(instance, timeout, reason, store_reason=True):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @type reason: list of reasons
  @param reason: the reason trail for this shutdown
  @type store_reason: boolean
  @param store_reason: whether to store the shutdown reason trail on file
  @rtype: None

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  if not _GetInstanceInfo(instance):
    logging.info("Instance '%s' not running, doing nothing", instance.name)
    return

  # Callable used with utils.Retry below: asks the hypervisor to stop
  # the instance, remembering across invocations whether a stop was
  # already attempted (passed on to StopInstance as 'retry').
  class _TryShutdown(object):
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      # Instance gone means the shutdown succeeded
      if not _GetInstanceInfo(instance):
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once, timeout=timeout)
        if store_reason:
          _StoreInstReasonTrail(instance.name, reason)
      except errors.HypervisorError, err:
        # if the instance is no longer existing, consider this a
        # success and go to cleanup
        if not _GetInstanceInfo(instance):
          return

        _Fail("Failed to stop instance '%s': %s", instance.name, err)

      self.tried_once = True

      # Still running: tell utils.Retry to call us again
      raise utils.RetryAgain()

  try:
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", instance.name)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      # only raise an error if the instance still exists, otherwise
      # the error could simply be "instance ... unknown"!
      if _GetInstanceInfo(instance):
        _Fail("Failed to force stop instance '%s': %s", instance.name, err)

    time.sleep(1)

    # If even the forced stop did not get rid of the instance, give up
    if _GetInstanceInfo(instance):
      _Fail("Could not shutdown instance '%s' even by destroy", instance.name)

  # Post-shutdown cleanup is best-effort: failures are only logged
  try:
    hyper.CleanupInstance(instance.name)
  except errors.HypervisorError, err:
    logging.warning("Failed to execute post-shutdown cleanup step: %s", err)

  _RemoveBlockDevLinks(instance.name, instance.disks_info)
2747
2748
2749 def InstanceReboot(instance, reboot_type, shutdown_timeout, reason):
2750 """Reboot an instance.
2751
2752 @type instance: L{objects.Instance}
2753 @param instance: the instance object to reboot
2754 @type reboot_type: str
2755 @param reboot_type: the type of reboot, one the following
2756 constants:
2757 - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
2758 instance OS, do not recreate the VM
2759 - L{constants.INSTANCE_REBOOT_HARD}: tear down and
2760 restart the VM (at the hypervisor level)
2761 - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
2762 not accepted here, since that mode is handled differently, in
2763 cmdlib, and translates into full stop and start of the
2764 instance (instead of a call_instance_reboot RPC)
2765 @type shutdown_timeout: integer
2766 @param shutdown_timeout: maximum timeout for soft shutdown
2767 @type reason: list of reasons
2768 @param reason: the reason trail for this reboot
2769 @rtype: None
2770
2771 """
2772 # TODO: this is inconsistent with 'StartInstance' and 'InstanceShutdown'
2773 # because those functions simply 'return' on error whereas this one
2774 # raises an exception with '_Fail'
2775 if not _GetInstanceInfo(instance):
2776 _Fail("Cannot reboot instance '%s' that is not running", instance.name)
2777
2778 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2779 if reboot_type == constants.INSTANCE_REBOOT_SOFT:
2780 try:
2781 hyper.RebootInstance(instance)
2782 except errors.HypervisorError, err:
2783 _Fail("Failed to soft reboot instance '%s': %s", instance.name, err)
2784 elif reboot_type == constants.INSTANCE_REBOOT_HARD:
2785 try:
2786 InstanceShutdown(instance, shutdown_timeout, reason, store_reason=False)
2787 result = StartInstance(instance, False, reason, store_reason=False)
2788 _StoreInstReasonTrail(instance.name, reason)
2789 return result
2790 except errors.HypervisorError, err:
2791 _Fail("Failed to hard reboot instance '%s': %s", instance.name, err)
2792 else:
2793 _Fail("Invalid reboot_type received: '%s'", reboot_type)
2794
2795
2796 def InstanceBalloonMemory(instance, memory):
2797 """Resize an instance's memory.
2798
2799 @type instance: L{objects.Instance}
2800 @param instance: the instance object
2801 @type memory: int
2802 @param memory: new memory amount in MB
2803 @rtype: None
2804
2805 """
2806 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2807 running = hyper.ListInstances(hvparams=instance.hvparams)
2808 if instance.name not in running:
2809 logging.info("Instance %s is not running, cannot balloon", instance.name)
2810 return
2811 try:
2812 hyper.BalloonInstanceMemory(instance, memory)
2813 except errors.HypervisorError, err:
2814 _Fail("Failed to balloon instance memory: %s", err, exc=True)
2815
2816
2817 def MigrationInfo(instance):
2818 """Gather information about an instance to be migrated.
2819
2820 @type instance: L{objects.Instance}
2821 @param instance: the instance definition
2822
2823 """
2824 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2825 try:
2826 info = hyper.MigrationInfo(instance)
2827 except errors.HypervisorError, err:
2828 _Fail("Failed to fetch migration information: %s", err, exc=True)
2829 return info
2830
2831
2832 def AcceptInstance(instance, info, target):
2833 """Prepare the node to accept an instance.
2834
2835 @type instance: L{objects.Instance}
2836 @param instance: the instance definition
2837 @type info: string/data (opaque)
2838 @param info: migration information, from the source node
2839 @type target: string
2840 @param target: target host (usually ip), on this node
2841
2842 """
2843 # TODO: why is this required only for DTS_EXT_MIRROR?
2844 if instance.disk_template in constants.DTS_EXT_MIRROR:
2845 # Create the symlinks, as the disks are not active
2846 # in any way
2847 try:
2848 _GatherAndLinkBlockDevs(instance)
2849 except errors.BlockDeviceError, err:
2850 _Fail("Block device error: %s", err, exc=True)
2851
2852 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2853 try:
2854 hyper.AcceptInstance(instance, info, target)
2855 except errors.HypervisorError, err:
2856 if instance.disk_template in constants.DTS_EXT_MIRROR:
2857 _RemoveBlockDevLinks(instance.name, instance.disks_info)
2858 _Fail("Failed to accept instance: %s", err, exc=True)
2859
2860
2861 def FinalizeMigrationDst(instance, info, success):
2862 """Finalize any preparation to accept an instance.
2863
2864 @type instance: L{objects.Instance}
2865 @param instance: the instance definition
2866 @type info: string/data (opaque)
2867 @param info: migration information, from the source node
2868 @type success: boolean
2869 @param success: whether the migration was a success or a failure
2870
2871 """
2872 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2873 try:
2874 hyper.FinalizeMigrationDst(instance, info, success)
2875 except errors.HypervisorError, err:
2876 _Fail("Failed to finalize migration on the target node: %s", err, exc=True)
2877
2878
2879 def MigrateInstance(cluster_name, instance, target, live):
2880 """Migrates an instance to another node.
2881
2882 @type cluster_name: string
2883 @param cluster_name: name of the cluster
2884 @type instance: L{objects.Instance}
2885 @param instance: the instance definition
2886 @type target: string
2887 @param target: the target node name
2888 @type live: boolean
2889 @param live: whether the migration should be done live or not (the
2890 interpretation of this parameter is left to the hypervisor)
2891 @raise RPCFail: if migration fails for some reason
2892
2893 """
2894 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2895
2896 try:
2897 hyper.MigrateInstance(cluster_name, instance, target, live)
2898 except errors.HypervisorError, err:
2899 _Fail("Failed to migrate instance: %s", err, exc=True)
2900
2901
2902 def FinalizeMigrationSource(instance, success, live):
2903 """Finalize the instance migration on the source node.
2904
2905 @type instance: L{objects.Instance}
2906 @param instance: the instance definition of the migrated instance
2907 @type success: bool
2908 @param success: whether the migration succeeded or not
2909 @type live: bool
2910 @param live: whether the user requested a live migration or not
2911 @raise RPCFail: If the execution fails for some reason
2912
2913 """
2914 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2915
2916 try:
2917 hyper.FinalizeMigrationSource(instance, success, live)
2918 except Exception, err: # pylint: disable=W0703
2919 _Fail("Failed to finalize the migration on the source node: %s", err,
2920 exc=True)
2921
2922
2923 def GetMigrationStatus(instance):
2924 """Get the migration status
2925
2926 @type instance: L{objects.Instance}
2927 @param instance: the instance that is being migrated
2928 @rtype: L{objects.MigrationStatus}
2929 @return: the status of the current migration (one of
2930 L{constants.HV_MIGRATION_VALID_STATUSES}), plus any additional
2931 progress info that can be retrieved from the hypervisor
2932 @raise RPCFail: If the migration status cannot be retrieved
2933
2934 """
2935 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2936 try:
2937 return hyper.GetMigrationStatus(instance)
2938 except Exception, err: # pylint: disable=W0703
2939 _Fail("Failed to get migration status: %s", err, exc=True)
2940
2941
def HotplugDevice(instance, action, dev_type, device, extra, seq):
  """Hotplug a device

  Hotplug is currently supported only for KVM Hypervisor.

  @type instance: L{objects.Instance}
  @param instance: the instance to which we hotplug a device
  @type action: string
  @param action: the hotplug action to perform
  @type dev_type: string
  @param dev_type: the device type to hotplug
  @type device: either L{objects.NIC} or L{objects.Disk}
  @param device: the device object to hotplug
  @type extra: tuple
  @param extra: extra info used for disk hotplug (disk link, drive uri)
  @type seq: int
  @param seq: the index of the device from master perspective
  @raise RPCFail: in case instance does not have KVM hypervisor

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  # Fail early if the hypervisor does not support this hotplug operation
  try:
    hyper.VerifyHotplugSupport(instance, action, dev_type)
  except errors.HotplugError, err:
    _Fail("Hotplug is not supported: %s", err)

  # Dispatch on the requested action
  if action == constants.HOTPLUG_ACTION_ADD:
    fn = hyper.HotAddDevice
  elif action == constants.HOTPLUG_ACTION_REMOVE:
    fn = hyper.HotDelDevice
  elif action == constants.HOTPLUG_ACTION_MODIFY:
    fn = hyper.HotModDevice
  else:
    # Unreachable for valid actions. NOTE(review): under "python -O" the
    # assert is stripped and the call below would raise NameError on 'fn'
    # for an unknown action -- confirm whether callers guarantee validity.
    assert action in constants.HOTPLUG_ALL_ACTIONS

  return fn(instance, dev_type, device, extra, seq)
2977
2978
2979 def HotplugSupported(instance):
2980 """Checks if hotplug is generally supported.
2981
2982 """
2983 hyper = hypervisor.GetHypervisor(instance.hypervisor)
2984 try:
2985 hyper.HotplugSupported(instance)
2986 except errors.HotplugError, err:
2987 _Fail("Hotplug is not supported: %s", err)
2988
2989
2990 def ModifyInstanceMetadata(metadata):
2991 """Sends instance data to the metadata daemon.
2992
2993 Uses the Luxi transport layer to communicate with the metadata
2994 daemon configuration server. It starts the metadata daemon if it is
2995 not running.
2996 The daemon must be enabled during at configuration time.
2997
2998 @type metadata: dict
2999 @param metadata: instance metadata obtained by calling
3000 L{objects.Instance.ToDict} on an instance object
3001
3002 """
3003 if not constants.ENABLE_METAD:
3004 raise errors.ProgrammerError("The metadata deamon is disabled, yet"
3005 " ModifyInstanceMetadata has been called")
3006
3007 if not utils.IsDaemonAlive(constants.METAD):
3008 result = utils.RunCmd([pathutils.DAEMON_UTIL, "start", constants.METAD])
3009 if result.failed:
3010 raise errors.HypervisorError("Failed to start metadata daemon")
3011
3012 def _Connect():
3013 return transport.Transport(pathutils.SOCKET_DIR + "/ganeti-metad",
3014 allow_non_master=True)
3015
3016 retries = 5
3017
3018 while True:
3019 try:
3020 trans = utils.Retry(_Connect, 1.0, constants.LUXI_DEF_CTMO)
3021 break
3022 except utils.RetryTimeout:
3023 raise TimeoutError("Connection to metadata daemon timed out")
3024 except (socket.error, NoMasterError), err:
3025 if retries == 0:
3026 logging.error("Failed to connect to the metadata daemon",
3027 exc_info=True)
3028 raise TimeoutError("Failed to connect to metadata daemon: %s" % err)
3029 else:
3030 retries -= 1
3031
3032 data = serializer.DumpJson(metadata,
3033 private_encoder=serializer.EncodeWithPrivateFields)
3034
3035 trans.Send(data)
3036 trans.Close()
3037
3038
def BlockdevCreate(disk, size, owner, on_primary, info, excl_stor):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary: indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs
  @type excl_stor: boolean
  @param excl_stor: Whether exclusive_storage is active

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # TODO: remove the obsolete "size" argument
  # pylint: disable=W0613
  clist = []
  if disk.children:
    # Recursively assemble all children first; the new device is created
    # on top of them
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          # pylint: disable=E1103
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk, clist, excl_stor)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # Record the assembled device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  return device.unique_id
3102
3103
def _DumpDevice(source_path, target_path, offset, size, truncate):
  """This function images/wipes the device using a local file.

  @type source_path: string
  @param source_path: path of the image or data source (e.g., "/dev/zero")

  @type target_path: string
  @param target_path: path of the device to image/wipe

  @type offset: int
  @param offset: offset in MiB in the output file

  @type size: int
  @param size: maximum size in MiB to write (data source might be smaller)

  @type truncate: bool
  @param truncate: whether the file should be truncated

  @return: None
  @raise RPCFail: in case of failure

  """
  # Internal sizes are always in Mebibytes; if the following "dd" command
  # should use a different block size the offset and size given to this
  # function must be adjusted accordingly before being passed to "dd".
  cmd = [constants.DD_CMD,
         "if=%s" % source_path,
         "seek=%d" % offset,
         "bs=%s" % constants.DD_BLOCK_SIZE,
         "oflag=direct",
         "of=%s" % target_path,
         "count=%d" % size]

  if not truncate:
    # Preserve the rest of the target beyond the written region
    cmd.append("conv=notrunc")

  result = utils.RunCmd(cmd)

  if result.failed:
    _Fail("Dump command '%s' exited with error: %s; output: %s", result.cmd,
          result.fail_reason, result.output)
3143
3144
def _DownloadAndDumpDevice(source_url, target_path, size):
  """This function images a device using a downloaded image file.

  @type source_url: string
  @param source_url: URL of image to dump to disk

  @type target_path: string
  @param target_path: path of the device to image

  @type size: int
  @param size: maximum size in MiB to write (data source might be smaller)

  @rtype: NoneType
  @return: None
  @raise RPCFail: in case of download or write failures

  """
  class DDParams(object):
    def __init__(self, current_size, total_size):
      # number of bytes written so far
      self.current_size = current_size
      # maximum number of bytes allowed on the device
      self.total_size = total_size
      # set when the image turned out larger than the device
      self.image_size_error = False

  def dd_write(ddparams, out):
    if ddparams.current_size < ddparams.total_size:
      ddparams.current_size += len(out)
      target_file.write(out)
    else:
      ddparams.image_size_error = True
      # returning a value different from len(out) makes curl abort the
      # transfer with a write error
      return -1

  target_file = open(target_path, "r+")
  # Close the device file even if the download fails or _Fail raises;
  # previously the handle leaked on those paths
  try:
    ddparams = DDParams(0, 1024 * 1024 * size)

    curl = pycurl.Curl()
    curl.setopt(pycurl.VERBOSE, True)
    curl.setopt(pycurl.NOSIGNAL, True)
    curl.setopt(pycurl.USERAGENT, http.HTTP_GANETI_VERSION)
    curl.setopt(pycurl.URL, source_url)
    curl.setopt(pycurl.WRITEFUNCTION, lambda out: dd_write(ddparams, out))

    try:
      curl.perform()
    except pycurl.error:
      if ddparams.image_size_error:
        _Fail("Disk image larger than the disk")
      else:
        raise
  finally:
    target_file.close()
3195
3196
def BlockdevConvert(src_disk, target_disk):
  """Copies data from source block device to target.

  This function gets the export and import commands from the source and
  target devices respectively, and then concatenates them to a single
  command using a pipe ("|"). Finally, executes the unified command that
  will transfer the data between the devices during the disk template
  conversion operation.

  @type src_disk: L{objects.Disk}
  @param src_disk: the disk object we want to copy from
  @type target_disk: L{objects.Disk}
  @param target_disk: the disk object we want to copy to

  @rtype: NoneType
  @return: None
  @raise RPCFail: in case of failure

  """
  src_dev = _RecursiveFindBD(src_disk)
  if src_dev is None:
    _Fail("Cannot copy from device '%s': device not found", src_disk.uuid)

  dest_dev = _RecursiveFindBD(target_disk)
  if dest_dev is None:
    _Fail("Cannot copy to device '%s': device not found", target_disk.uuid)

  # Pipe the source device's export command straight into the target
  # device's import command
  command = "%s | %s" % (utils.ShellQuoteArgs(src_dev.Export()),
                         utils.ShellQuoteArgs(dest_dev.Import()))

  result = utils.RunCmd(command)
  if result.failed:
    _Fail("Disk conversion command '%s' exited with error: %s; output: %s",
          result.cmd, result.fail_reason, result.output)
3233
3234
def BlockdevWipe(disk, offset, size):
  """Wipes a block device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to wipe
  @type offset: int
  @param offset: The offset in MiB in the file
  @type size: int
  @param size: The size in MiB to write
  @raise RPCFail: if the device is missing or the parameters are invalid

  """
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  # Validate the device and the requested region before touching anything
  if not rdev:
    _Fail("Cannot wipe device %s: device not found", disk.iv_name)
  if offset < 0:
    _Fail("Negative offset")
  if size < 0:
    _Fail("Negative size")
  if offset > rdev.size:
    _Fail("Wipe offset is bigger than device size")
  if offset + size > rdev.size:
    _Fail("Wipe offset and size are bigger than device size")

  # Overwrite the region with zeroes, truncating the target
  _DumpDevice("/dev/zero", rdev.dev_path, offset, size, True)
3263
3264
def BlockdevImage(disk, image, size):
  """Images a block device either by dumping a local file or
  downloading a URL.

  @type disk: L{objects.Disk}
  @param disk: the disk object we want to image

  @type image: string
  @param image: file path to the disk image be dumped

  @type size: int
  @param size: The size in MiB to write

  @rtype: NoneType
  @return: None
  @raise RPCFail: in case of failure

  """
  is_url = utils.IsUrl(image)
  if not is_url and not os.path.exists(image):
    _Fail("Image '%s' not found", image)

  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError:
    rdev = None

  if not rdev:
    _Fail("Cannot image device %s: device not found", disk.iv_name)
  if size < 0:
    _Fail("Negative size")
  if size > rdev.size:
    _Fail("Image size is bigger than device size")

  # Remote images are streamed onto the device, local ones are dd'ed
  if is_url:
    _DownloadAndDumpDevice(image, rdev.dev_path, size)
  else:
    _DumpDevice(image, rdev.dev_path, 0, size, False)
3302
3303
def BlockdevPauseResumeSync(disks, pause):
  """Pause or resume the sync of the block device.

  @type disks: list of L{objects.Disk}
  @param disks: the disks object we want to pause/resume
  @type pause: bool
  @param pause: whether to pause or resume the sync
  @rtype: list of (bool, string or None) tuples
  @return: one (status, error message) pair per disk

  """
  results = []
  for disk in disks:
    try:
      dev = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      dev = None

    if not dev:
      results.append((False, ("Cannot change sync for device %s:"
                              " device not found" % disk.iv_name)))
      continue

    status = dev.PauseResumeSync(pause)

    if status:
      results.append((status, None))
    else:
      verb = "Pause" if pause else "Resume"
      results.append((status, "%s for device %s failed" % (verb, disk.iv_name)))

  return results
3337
3338
3339 def BlockdevRemove(disk):
3340 """Remove a block device.
3341
3342 @note: This is intended to be called recursively.
3343
3344 @type disk: L{objects.Disk}
3345 @param disk: the disk object we should remove
3346 @rtype: boolean
3347 @return: the success of the operation
3348
3349 """
3350 msgs = []
3351 try:
3352 rdev