Add a predicate testing that a majority of nodes is healthy
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Functions to bootstrap a new cluster.
32
33 """
34
35 import os
36 import os.path
37 import re
38 import logging
39 import time
40 import tempfile
41
42 from ganeti.cmdlib import cluster
43 import ganeti.rpc.node as rpc
44 from ganeti import ssh
45 from ganeti import utils
46 from ganeti import errors
47 from ganeti import config
48 from ganeti import constants
49 from ganeti import objects
50 from ganeti import ssconf
51 from ganeti import serializer
52 from ganeti import hypervisor
53 from ganeti.storage import drbd
54 from ganeti.storage import filestorage
55 from ganeti import netutils
56 from ganeti import luxi
57 from ganeti import jstore
58 from ganeti import pathutils
59 from ganeti import runtime
60 from ganeti import vcluster
61
62
63 # ec_id for InitConfig's temporary reservation manager
64 _INITCONF_ECID = "initconfig-ecid"
65
66 #: Time (in seconds) within which a daemon must become responsive
67 _DAEMON_READY_TIMEOUT = 10.0
68
69
70 def _InitSSHSetup():
71 """Setup the SSH configuration for the cluster.
72
73 This generates a DSA keypair for root and adds the public key to
74 the list of authorized keys for root.
75
76 """
77 priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)
78
79 for name in priv_key, pub_key:
80 if os.path.exists(name):
81 utils.CreateBackup(name)
82 utils.RemoveFile(name)
83
84 result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
85 "-f", priv_key,
86 "-q", "-N", ""])
87 if result.failed:
88 raise errors.OpExecError("Could not generate ssh keypair, error %s" %
89 result.output)
90
91 utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
92
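# Editorial sketch (not upstream code): ssh.GetUserFiles returns the three
# paths unpacked above. For the root login user they would typically look
# like the following (exact paths depend on the build configuration):
#
#   priv_key  -> /root/.ssh/id_dsa
#   pub_key   -> /root/.ssh/id_dsa.pub
#   auth_keys -> /root/.ssh/authorized_keys
#
# so after _InitSSHSetup() the freshly generated public key is present in
# root's authorized_keys, allowing password-less SSH within the cluster.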
93
94 def GenerateHmacKey(file_name):
95 """Writes a new HMAC key.
96
97 @type file_name: str
98 @param file_name: Path to output file
99
100 """
101 utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
102 backup=True)
103
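# Illustrative usage (hypothetical path; callers normally pass constants
# from pathutils such as CONFD_HMAC_KEY):
#
#   GenerateHmacKey("/var/lib/ganeti/hmac.key")
#
# This writes a newline-terminated random secret with mode 0400, backing
# up any pre-existing file first.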
104
105 # pylint: disable=R0913
106 def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_spice_cert,
107 new_confd_hmac_key, new_cds, new_client_cert,
108 master_name,
109 rapi_cert_pem=None, spice_cert_pem=None,
110 spice_cacert_pem=None, cds=None,
111 nodecert_file=pathutils.NODED_CERT_FILE,
112 clientcert_file=pathutils.NODED_CLIENT_CERT_FILE,
113 rapicert_file=pathutils.RAPI_CERT_FILE,
114 spicecert_file=pathutils.SPICE_CERT_FILE,
115 spicecacert_file=pathutils.SPICE_CACERT_FILE,
116 hmackey_file=pathutils.CONFD_HMAC_KEY,
117 cds_file=pathutils.CLUSTER_DOMAIN_SECRET_FILE):
118 """Updates the cluster certificates, keys and secrets.
119
120 @type new_cluster_cert: bool
121 @param new_cluster_cert: Whether to generate a new cluster certificate
122 @type new_rapi_cert: bool
123 @param new_rapi_cert: Whether to generate a new RAPI certificate
124 @type new_spice_cert: bool
125 @param new_spice_cert: Whether to generate a new SPICE certificate
126 @type new_confd_hmac_key: bool
127 @param new_confd_hmac_key: Whether to generate a new HMAC key
128 @type new_cds: bool
129 @param new_cds: Whether to generate a new cluster domain secret
130 @type new_client_cert: bool
131 @param new_client_cert: Whether to generate a new client certificate
132 @type master_name: string
133 @param master_name: FQDN of the master node
134 @type rapi_cert_pem: string
135 @param rapi_cert_pem: New RAPI certificate in PEM format
136 @type spice_cert_pem: string
137 @param spice_cert_pem: New SPICE certificate in PEM format
138 @type spice_cacert_pem: string
139 @param spice_cacert_pem: Certificate of the CA that signed the SPICE
140 certificate, in PEM format
141 @type cds: string
142 @param cds: New cluster domain secret
143 @type nodecert_file: string
144 @param nodecert_file: optional override of the node cert file path
145 @type rapicert_file: string
146 @param rapicert_file: optional override of the rapi cert file path
147 @type spicecert_file: string
148 @param spicecert_file: optional override of the spice cert file path
149 @type spicecacert_file: string
150 @param spicecacert_file: optional override of the spice CA cert file path
151 @type hmackey_file: string
152 @param hmackey_file: optional override of the hmac key file path
153
154 """
155 # pylint: disable=R0913
156 # noded SSL certificate
157 utils.GenerateNewSslCert(
158 new_cluster_cert, nodecert_file, 1,
159 "Generating new cluster certificate at %s" % nodecert_file)
160
161 # If the cluster certificate was renewed, the client cert has to be
162 # renewed and resigned.
163 if new_cluster_cert or new_client_cert:
164 utils.GenerateNewClientSslCert(clientcert_file, nodecert_file,
165 master_name)
166
167 # confd HMAC key
168 if new_confd_hmac_key or not os.path.exists(hmackey_file):
169 logging.debug("Writing new confd HMAC key to %s", hmackey_file)
170 GenerateHmacKey(hmackey_file)
171
172 if rapi_cert_pem:
173 # Assume rapi_cert_pem contains a valid PEM-formatted certificate and key
174 logging.debug("Writing RAPI certificate at %s", rapicert_file)
175 utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)
176
177 else:
178 utils.GenerateNewSslCert(
179 new_rapi_cert, rapicert_file, 1,
180 "Generating new RAPI certificate at %s" % rapicert_file)
181
182 # SPICE
183 spice_cert_exists = os.path.exists(spicecert_file)
184 spice_cacert_exists = os.path.exists(spicecacert_file)
185 if spice_cert_pem:
186 # spice_cert_pem implies also spice_cacert_pem
187 logging.debug("Writing SPICE certificate at %s", spicecert_file)
188 utils.WriteFile(spicecert_file, data=spice_cert_pem, backup=True)
189 logging.debug("Writing SPICE CA certificate at %s", spicecacert_file)
190 utils.WriteFile(spicecacert_file, data=spice_cacert_pem, backup=True)
191 elif new_spice_cert or not spice_cert_exists:
192 if spice_cert_exists:
193 utils.CreateBackup(spicecert_file)
194 if spice_cacert_exists:
195 utils.CreateBackup(spicecacert_file)
196
197 logging.debug("Generating new self-signed SPICE certificate at %s",
198 spicecert_file)
199 (_, cert_pem) = utils.GenerateSelfSignedSslCert(spicecert_file, 1)
200
201 # Self-signed certificate -> the public certificate is also the CA public
202 # certificate
203 logging.debug("Writing the public CA certificate to %s",
204 spicecacert_file)
205 utils.io.WriteFile(spicecacert_file, mode=0400, data=cert_pem)
206
207 # Cluster domain secret
208 if cds:
209 logging.debug("Writing cluster domain secret to %s", cds_file)
210 utils.WriteFile(cds_file, data=cds, backup=True)
211
212 elif new_cds or not os.path.exists(cds_file):
213 logging.debug("Generating new cluster domain secret at %s", cds_file)
214 GenerateHmacKey(cds_file)
215
216
217 def _InitGanetiServerSetup(master_name, cfg):
218 """Setup the necessary configuration for the initial node daemon.
219
220 This generates the cluster secrets and SSL certificates, records the
221 master's certificate digest in the configuration and starts the node daemon.
222
223 @type master_name: str
224 @param master_name: Name of the master node
225 @type cfg: ConfigWriter
226 @param cfg: the configuration writer
227
228 """
229 # Generate cluster secrets
230 GenerateClusterCrypto(True, False, False, False, False, False, master_name)
231
232 # Add the master's SSL certificate digest to the configuration.
233 master_uuid = cfg.GetMasterNode()
234 master_digest = utils.GetCertificateDigest()
235 cfg.AddNodeToCandidateCerts(master_uuid, master_digest)
236 cfg.Update(cfg.GetClusterInfo(), logging.error)
237 ssconf.WriteSsconfFiles(cfg.GetSsconfValues())
238
239 if not os.path.exists(
240 os.path.join(pathutils.DATA_DIR,
241 "%s%s" % (constants.SSCONF_FILEPREFIX,
242 constants.SS_MASTER_CANDIDATES_CERTS))):
243 raise errors.OpExecError("Ssconf file for master candidate certificates"
244 " was not written.")
245
246 if not os.path.exists(pathutils.NODED_CERT_FILE):
247 raise errors.OpExecError("The server certificate was not created properly.")
248
249 if not os.path.exists(pathutils.NODED_CLIENT_CERT_FILE):
250 raise errors.OpExecError("The client certificate was not created"
251 " properly.")
252
253 # start the node daemon and make sure it becomes responsive
254 result = utils.RunCmd([pathutils.DAEMON_UTIL, "start", constants.NODED])
255 if result.failed:
256 raise errors.OpExecError("Could not start the node daemon, command %s"
257 " had exitcode %s and error %s" %
258 (result.cmd, result.exit_code, result.output))
259
260 _WaitForNodeDaemon(master_name)
261
262
263 def _WaitForNodeDaemon(node_name):
264 """Wait for node daemon to become responsive.
265
266 """
267 def _CheckNodeDaemon():
268 # Pylint bug <http://www.logilab.org/ticket/35642>
269 # pylint: disable=E1101
270 result = rpc.BootstrapRunner().call_version([node_name])[node_name]
271 if result.fail_msg:
272 raise utils.RetryAgain()
273
274 try:
275 utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
276 except utils.RetryTimeout:
277 raise errors.OpExecError("Node daemon on %s didn't answer queries within"
278 " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))
279
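# Editorial note: the _WaitFor*Daemon helpers in this module all share the
# same retry idiom, sketched here with a hypothetical _probe() predicate.
# The callback raises utils.RetryAgain to request another attempt, and
# utils.Retry raises utils.RetryTimeout once the timeout expires:
#
#   def _CheckSomething():
#     if not _probe():
#       raise utils.RetryAgain()
#
#   try:
#     utils.Retry(_CheckSomething, 1.0, _DAEMON_READY_TIMEOUT)
#   except utils.RetryTimeout:
#     raise errors.OpExecError("daemon did not become responsive")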
280
281 def _WaitForMasterDaemon():
282 """Wait for master daemon to become responsive.
283
284 """
285 def _CheckMasterDaemon():
286 try:
287 cl = luxi.Client()
288 (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
289 except Exception:
290 raise utils.RetryAgain()
291
292 logging.debug("Received cluster name %s from master", cluster_name)
293
294 try:
295 utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
296 except utils.RetryTimeout:
297 raise errors.OpExecError("Master daemon didn't answer queries within"
298 " %s seconds" % _DAEMON_READY_TIMEOUT)
299
300
301 def _WaitForSshDaemon(hostname, port):
302 """Wait for SSH daemon to become responsive.
303
304 """
305 family = ssconf.SimpleStore().GetPrimaryIPFamily()
306 hostip = netutils.GetHostname(name=hostname, family=family).ip
307
308 def _CheckSshDaemon():
309 if netutils.TcpPing(hostip, port, timeout=1.0, live_port_needed=True):
310 logging.debug("SSH daemon on %s:%s (IP address %s) has become"
311 " responsive", hostname, port, hostip)
312 else:
313 raise utils.RetryAgain()
314
315 try:
316 utils.Retry(_CheckSshDaemon, 1.0, _DAEMON_READY_TIMEOUT)
317 except utils.RetryTimeout:
318 raise errors.OpExecError("SSH daemon on %s:%s (IP address %s) didn't"
319 " become responsive within %s seconds" %
320 (hostname, port, hostip, _DAEMON_READY_TIMEOUT))
321
322
323 def RunNodeSetupCmd(cluster_name, node, basecmd, debug, verbose,
324 use_cluster_key, ask_key, strict_host_check,
325 port, data):
326 """Runs a command to configure something on a remote machine.
327
328 @type cluster_name: string
329 @param cluster_name: Cluster name
330 @type node: string
331 @param node: Node name
332 @type basecmd: string
333 @param basecmd: Base command (path on the remote machine)
334 @type debug: bool
335 @param debug: Enable debug output
336 @type verbose: bool
337 @param verbose: Enable verbose output
338 @type use_cluster_key: bool
339 @param use_cluster_key: See L{ssh.SshRunner.BuildCmd}
340 @type ask_key: bool
341 @param ask_key: See L{ssh.SshRunner.BuildCmd}
342 @type strict_host_check: bool
343 @param strict_host_check: See L{ssh.SshRunner.BuildCmd}
344 @type port: int
345 @param port: The SSH port of the remote machine or None for the default
346 @param data: JSON-serializable input data for script (passed to stdin)
347
348 """
349 cmd = [basecmd]
350
351 # Pass --debug/--verbose to the external script if set on our invocation
352 if debug:
353 cmd.append("--debug")
354
355 if verbose:
356 cmd.append("--verbose")
357
358 logging.debug("Node setup command: %s", cmd)
359
360 version = constants.DIR_VERSION
361 all_cmds = [["test", "-d", os.path.join(pathutils.PKGLIBDIR, version)]]
362 if constants.HAS_GNU_LN:
363 all_cmds.extend([["ln", "-s", "-f", "-T",
364 os.path.join(pathutils.PKGLIBDIR, version),
365 os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
366 ["ln", "-s", "-f", "-T",
367 os.path.join(pathutils.SHAREDIR, version),
368 os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
369 else:
370 all_cmds.extend([["rm", "-f",
371 os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
372 ["ln", "-s", "-f",
373 os.path.join(pathutils.PKGLIBDIR, version),
374 os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
375 ["rm", "-f",
376 os.path.join(pathutils.SYSCONFDIR, "ganeti/share")],
377 ["ln", "-s", "-f",
378 os.path.join(pathutils.SHAREDIR, version),
379 os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
380 all_cmds.append(cmd)
381
382 if port is None:
383 port = netutils.GetDaemonPort(constants.SSH)
384
385 srun = ssh.SshRunner(cluster_name)
386 scmd = srun.BuildCmd(node, constants.SSH_LOGIN_USER,
387 utils.ShellQuoteArgs(
388 utils.ShellCombineCommands(all_cmds)),
389 batch=False, ask_key=ask_key, quiet=False,
390 strict_host_check=strict_host_check,
391 use_cluster_key=use_cluster_key,
392 port=port)
393
394 tempfh = tempfile.TemporaryFile()
395 try:
396 tempfh.write(serializer.DumpJson(data))
397 tempfh.seek(0)
398
399 result = utils.RunCmd(scmd, interactive=True, input_fd=tempfh)
400 finally:
401 tempfh.close()
402
403 if result.failed:
404 raise errors.OpExecError("Command '%s' failed: %s" %
405 (result.cmd, result.fail_reason))
406
407 _WaitForSshDaemon(node, port)
408
409
410 def _InitFileStorageDir(file_storage_dir):
411 """Initialize if needed the file storage.
412
413 @param file_storage_dir: the user-supplied value
414 @return: either empty string (if file storage was disabled at build
415 time) or the normalized path to the storage directory
416
417 """
418 file_storage_dir = os.path.normpath(file_storage_dir)
419
420 if not os.path.isabs(file_storage_dir):
421 raise errors.OpPrereqError("File storage directory '%s' is not an absolute"
422 " path" % file_storage_dir, errors.ECODE_INVAL)
423
424 if not os.path.exists(file_storage_dir):
425 try:
426 os.makedirs(file_storage_dir, 0750)
427 except OSError, err:
428 raise errors.OpPrereqError("Cannot create file storage directory"
429 " '%s': %s" % (file_storage_dir, err),
430 errors.ECODE_ENVIRON)
431
432 if not os.path.isdir(file_storage_dir):
433 raise errors.OpPrereqError("The file storage directory '%s' is not"
434 " a directory." % file_storage_dir,
435 errors.ECODE_ENVIRON)
436
437 return file_storage_dir
438
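# Behaviour sketch (illustrative paths only): an absolute path is
# normalized and created with mode 0750 if missing, while a relative path
# is rejected before any filesystem access:
#
#   _InitFileStorageDir("/srv/ganeti/file-storage/")  # -> "/srv/ganeti/file-storage"
#   _InitFileStorageDir("relative/path")              # -> OpPrereqError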
439
440 def _PrepareFileBasedStorage(
441 enabled_disk_templates, file_storage_dir,
442 default_dir, file_disk_template, _storage_path_acceptance_fn,
443 init_fn=_InitFileStorageDir, acceptance_fn=None):
444 """Checks if a file-base storage type is enabled and inits the dir.
445
446 @type enabled_disk_templates: list of string
447 @param enabled_disk_templates: list of enabled disk templates
448 @type file_storage_dir: string
449 @param file_storage_dir: the file storage directory
450 @type default_dir: string
451 @param default_dir: default file storage directory when C{file_storage_dir}
452 is 'None'
453 @type file_disk_template: string
454 @param file_disk_template: a disk template whose storage type is 'ST_FILE',
455 'ST_SHARED_FILE' or 'ST_GLUSTER'
456 @type _storage_path_acceptance_fn: function
457 @param _storage_path_acceptance_fn: checks whether the given file-based
458 storage directory is acceptable
459 @see: C{cluster.CheckFileBasedStoragePathVsEnabledDiskTemplates} for details
460
461 @rtype: string
462 @returns: the name of the actual file storage directory
463
464 """
465 assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
466 constants.ST_FILE, constants.ST_SHARED_FILE, constants.ST_GLUSTER
467 ))
468
469 if file_storage_dir is None:
470 file_storage_dir = default_dir
471 if not acceptance_fn:
472 acceptance_fn = \
473 lambda path: filestorage.CheckFileStoragePathAcceptance(
474 path, exact_match_ok=True)
475
476 _storage_path_acceptance_fn(logging.warning, file_storage_dir,
477 enabled_disk_templates)
478
479 file_storage_enabled = file_disk_template in enabled_disk_templates
480 if file_storage_enabled:
481 try:
482 acceptance_fn(file_storage_dir)
483 except errors.FileStoragePathError as e:
484 raise errors.OpPrereqError(str(e))
485 result_file_storage_dir = init_fn(file_storage_dir)
486 else:
487 result_file_storage_dir = file_storage_dir
488 return result_file_storage_dir
489
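# Usage note (editorial): the three thin wrappers below specialize this
# function per backend. For example, plain file storage boils down to a
# call equivalent to:
#
#   _PrepareFileBasedStorage(
#       enabled_disk_templates, file_storage_dir,
#       pathutils.DEFAULT_FILE_STORAGE_DIR, constants.DT_FILE,
#       cluster.CheckFileStoragePathVsEnabledDiskTemplates)
#
# which falls back to the default directory when none is given, checks
# path acceptance and finally runs _InitFileStorageDir.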
490
491 def _PrepareFileStorage(
492 enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
493 acceptance_fn=None):
494 """Checks if file storage is enabled and inits the dir.
495
496 @see: C{_PrepareFileBasedStorage}
497
498 """
499 return _PrepareFileBasedStorage(
500 enabled_disk_templates, file_storage_dir,
501 pathutils.DEFAULT_FILE_STORAGE_DIR, constants.DT_FILE,
502 cluster.CheckFileStoragePathVsEnabledDiskTemplates,
503 init_fn=init_fn, acceptance_fn=acceptance_fn)
504
505
506 def _PrepareSharedFileStorage(
507 enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
508 acceptance_fn=None):
509 """Checks if shared file storage is enabled and inits the dir.
510
511 @see: C{_PrepareFileBasedStorage}
512
513 """
514 return _PrepareFileBasedStorage(
515 enabled_disk_templates, file_storage_dir,
516 pathutils.DEFAULT_SHARED_FILE_STORAGE_DIR, constants.DT_SHARED_FILE,
517 cluster.CheckSharedFileStoragePathVsEnabledDiskTemplates,
518 init_fn=init_fn, acceptance_fn=acceptance_fn)
519
520
521 def _PrepareGlusterStorage(
522 enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
523 acceptance_fn=None):
524 """Checks if gluster storage is enabled and inits the dir.
525
526 @see: C{_PrepareFileBasedStorage}
527
528 """
529 return _PrepareFileBasedStorage(
530 enabled_disk_templates, file_storage_dir,
531 pathutils.DEFAULT_GLUSTER_STORAGE_DIR, constants.DT_GLUSTER,
532 cluster.CheckGlusterStoragePathVsEnabledDiskTemplates,
533 init_fn=init_fn, acceptance_fn=acceptance_fn)
534
535
536 def _InitCheckEnabledDiskTemplates(enabled_disk_templates):
537 """Checks the sanity of the enabled disk templates.
538
539 """
540 if not enabled_disk_templates:
541 raise errors.OpPrereqError("Enabled disk templates list must contain at"
542 " least one member", errors.ECODE_INVAL)
543 invalid_disk_templates = \
544 set(enabled_disk_templates) - constants.DISK_TEMPLATES
545 if invalid_disk_templates:
546 raise errors.OpPrereqError("Enabled disk templates list contains invalid"
547 " entries: %s" % invalid_disk_templates,
548 errors.ECODE_INVAL)
549
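# Examples (illustrative values): ["drbd", "plain"] passes this check,
# whereas [] or ["no-such-template"] raise OpPrereqError (ECODE_INVAL).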
550
551 def _RestrictIpolicyToEnabledDiskTemplates(ipolicy, enabled_disk_templates):
552 """Restricts the ipolicy's disk templates to the enabled ones.
553
554 This function clears the ipolicy's list of allowed disk templates from the
555 ones that are not enabled by the cluster.
556
557 @type ipolicy: dict
558 @param ipolicy: the instance policy
559 @type enabled_disk_templates: list of string
560 @param enabled_disk_templates: the list of cluster-wide enabled disk
561 templates
562
563 """
564 assert constants.IPOLICY_DTS in ipolicy
565 allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
566 restricted_disk_templates = list(set(allowed_disk_templates)
567 .intersection(set(enabled_disk_templates)))
568 ipolicy[constants.IPOLICY_DTS] = restricted_disk_templates
569
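# Worked example (hypothetical values): with
#   ipolicy[constants.IPOLICY_DTS] == ["drbd", "plain", "file"]
# and enabled_disk_templates == ["plain", "drbd"], the intersection leaves
# ["plain", "drbd"] (in arbitrary order, since sets do not preserve the
# original list ordering).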
570
571 def _InitCheckDrbdHelper(drbd_helper, drbd_enabled):
572 """Checks the DRBD usermode helper.
573
574 @type drbd_helper: string
575 @param drbd_helper: name of the DRBD usermode helper that the system should
576 use
577
578 """
579 if not drbd_enabled:
580 return
581
582 if drbd_helper is not None:
583 try:
584 curr_helper = drbd.DRBD8.GetUsermodeHelper()
585 except errors.BlockDeviceError, err:
586 raise errors.OpPrereqError("Error while checking drbd helper"
587 " (disable drbd with --enabled-disk-templates"
588 " if you are not using drbd): %s" % str(err),
589 errors.ECODE_ENVIRON)
590 if drbd_helper != curr_helper:
591 raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
592 " is the current helper" % (drbd_helper,
593 curr_helper),
594 errors.ECODE_INVAL)
595
596
597 def InitCluster(cluster_name, mac_prefix, # pylint: disable=R0913, R0914
598 master_netmask, master_netdev, file_storage_dir,
599 shared_file_storage_dir, gluster_storage_dir,
600 candidate_pool_size, secondary_ip=None,
601 vg_name=None, beparams=None, nicparams=None, ndparams=None,
602 hvparams=None, diskparams=None, enabled_hypervisors=None,
603 modify_etc_hosts=True, modify_ssh_setup=True,
604 maintain_node_health=False, drbd_helper=None, uid_pool=None,
605 default_iallocator=None, default_iallocator_params=None,
606 primary_ip_version=None, ipolicy=None,
607 prealloc_wipe_disks=False, use_external_mip_script=False,
608 hv_state=None, disk_state=None, enabled_disk_templates=None,
609 install_image=None, zeroing_image=None, compression_tools=None,
610 enabled_user_shutdown=False):
611 """Initialise the cluster.
612
613 @type candidate_pool_size: int
614 @param candidate_pool_size: master candidate pool size
615
616 @type enabled_disk_templates: list of string
617 @param enabled_disk_templates: list of disk_templates to be used in this
618 cluster
619
620 @type enabled_user_shutdown: bool
621 @param enabled_user_shutdown: whether user shutdown is enabled
622 cluster-wide
623
624 """
625 # TODO: complete the docstring
626 if config.ConfigWriter.IsCluster():
627 raise errors.OpPrereqError("Cluster is already initialised",
628 errors.ECODE_STATE)
629
630 data_dir = vcluster.AddNodePrefix(pathutils.DATA_DIR)
631 queue_dir = vcluster.AddNodePrefix(pathutils.QUEUE_DIR)
632 archive_dir = vcluster.AddNodePrefix(pathutils.JOB_QUEUE_ARCHIVE_DIR)
633 for ddir in [queue_dir, data_dir, archive_dir]:
634 if os.path.isdir(ddir):
635 for entry in os.listdir(ddir):
636 if not os.path.isdir(os.path.join(ddir, entry)):
637 raise errors.OpPrereqError(
638 "%s contains non-directory enries like %s. Remove left-overs of an"
639 " old cluster before initialising a new one" % (ddir, entry),
640 errors.ECODE_STATE)
641
642 if not enabled_hypervisors:
643 raise errors.OpPrereqError("Enabled hypervisors list must contain at"
644 " least one member", errors.ECODE_INVAL)
645 invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
646 if invalid_hvs:
647 raise errors.OpPrereqError("Enabled hypervisors contains invalid"
648 " entries: %s" % invalid_hvs,
649 errors.ECODE_INVAL)
650
651 _InitCheckEnabledDiskTemplates(enabled_disk_templates)
652
653 try:
654 ipcls = netutils.IPAddress.GetClassFromIpVersion(primary_ip_version)
655 except errors.ProgrammerError:
656 raise errors.OpPrereqError("Invalid primary ip version: %d." %
657 primary_ip_version, errors.ECODE_INVAL)
658
659 hostname = netutils.GetHostname(family=ipcls.family)
660 if not ipcls.IsValid(hostname.ip):
661 raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
662 " address." % (hostname.ip, primary_ip_version),
663 errors.ECODE_INVAL)
664
665 if ipcls.IsLoopback(hostname.ip):
666 raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
667 " address. Please fix DNS or %s." %
668 (hostname.ip, pathutils.ETC_HOSTS),
669 errors.ECODE_ENVIRON)
670
671 if not ipcls.Own(hostname.ip):
672 raise errors.OpPrereqError("Inconsistency: this host's name resolves"
673 " to %s,\nbut this ip address does not"
674 " belong to this host" %
675 hostname.ip, errors.ECODE_ENVIRON)
676
677 clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)
678
679 if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
680 raise errors.OpPrereqError("Cluster IP already active",
681 errors.ECODE_NOTUNIQUE)
682
683 if not secondary_ip:
684 if primary_ip_version == constants.IP6_VERSION:
685 raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
686 " IPv4 address must be given as secondary",
687 errors.ECODE_INVAL)
688 secondary_ip = hostname.ip
689
690 if not netutils.IP4Address.IsValid(secondary_ip):
691 raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
692 " IPv4 address." % secondary_ip,
693 errors.ECODE_INVAL)
694
695 if not netutils.IP4Address.Own(secondary_ip):
696 raise errors.OpPrereqError("You gave %s as secondary IP,"
697 " but it does not belong to this host." %
698 secondary_ip, errors.ECODE_ENVIRON)
699
700 if master_netmask is not None:
701 if not ipcls.ValidateNetmask(master_netmask):
702 raise errors.OpPrereqError("CIDR netmask (%s) not valid for IPv%s" %
703 (master_netmask, primary_ip_version),
704 errors.ECODE_INVAL)
705 else:
706 master_netmask = ipcls.iplen
707
708 if vg_name:
709 # Check if volume group is valid
710 vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
711 constants.MIN_VG_SIZE)
712 if vgstatus:
713 raise errors.OpPrereqError("Error: %s" % vgstatus, errors.ECODE_INVAL)
714
715 drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
716 _InitCheckDrbdHelper(drbd_helper, drbd_enabled)
717
718 logging.debug("Stopping daemons (if any are running)")
719 result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-all"])
720 if result.failed:
721 raise errors.OpExecError("Could not stop daemons, command %s"
722 " had exitcode %s and error '%s'" %
723 (result.cmd, result.exit_code, result.output))
724
725 file_storage_dir = _PrepareFileStorage(enabled_disk_templates,
726 file_storage_dir)
727 shared_file_storage_dir = _PrepareSharedFileStorage(enabled_disk_templates,
728 shared_file_storage_dir)
729 gluster_storage_dir = _PrepareGlusterStorage(enabled_disk_templates,
730 gluster_storage_dir)
731
732 if not re.match("^[0-9a-z]{2}:[0-9a-z]{2}:[0-9a-z]{2}$", mac_prefix):
733 raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
734 errors.ECODE_INVAL)
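# Examples (illustrative): "aa:00:00" satisfies the anchored regexp above,
# while "AA:00:00" (uppercase) or "aa:00:00:11" (wrong length) do not.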
735
736 if nicparams.get('mode', None) != constants.NIC_MODE_OVS:
737 # Do not do this check if mode=openvswitch, since the openvswitch is not
738 # created yet
739 result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
740 if result.failed:
741 raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
742 (master_netdev,
743 result.output.strip()), errors.ECODE_INVAL)
744
745 dirs = [(pathutils.RUN_DIR, constants.RUN_DIRS_MODE)]
746 utils.EnsureDirs(dirs)
747
748 objects.UpgradeBeParams(beparams)
749 utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
750 utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
751
752 objects.NIC.CheckParameterSyntax(nicparams)
753
754 full_ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy)
755 _RestrictIpolicyToEnabledDiskTemplates(full_ipolicy, enabled_disk_templates)
756
757 if ndparams is not None:
758 utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
759 else:
760 ndparams = dict(constants.NDC_DEFAULTS)
761
762 # This is ugly, as we modify the dict itself
763 # FIXME: Make utils.ForceDictType pure functional or write a wrapper
764 # around it
765 if hv_state:
766 for hvname, hvs_data in hv_state.items():
767 utils.ForceDictType(hvs_data, constants.HVSTS_PARAMETER_TYPES)
768 hv_state[hvname] = objects.Cluster.SimpleFillHvState(hvs_data)
769 else:
770 hv_state = dict((hvname, constants.HVST_DEFAULTS)
771 for hvname in enabled_hypervisors)
772
773 # FIXME: disk_state has no default values yet
774 if disk_state:
775 for storage, ds_data in disk_state.items():
776 if storage not in constants.DS_VALID_TYPES:
777 raise errors.OpPrereqError("Invalid storage type in disk state: %s" %
778 storage, errors.ECODE_INVAL)
779 for ds_name, state in ds_data.items():
780 utils.ForceDictType(state, constants.DSS_PARAMETER_TYPES)
781 ds_data[ds_name] = objects.Cluster.SimpleFillDiskState(state)
782
783 # hvparams is a mapping of hypervisor->hvparams dict
784 for hv_name, hv_params in hvparams.iteritems():
785 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
786 hv_class = hypervisor.GetHypervisor(hv_name)
787 hv_class.CheckParameterSyntax(hv_params)
788
789 # diskparams is a mapping of disk-template->diskparams dict
790 for template, dt_params in diskparams.items():
791 param_keys = set(dt_params.keys())
792 default_param_keys = set(constants.DISK_DT_DEFAULTS[template].keys())
793 if not (param_keys <= default_param_keys):
794 unknown_params = param_keys - default_param_keys
795 raise errors.OpPrereqError("Invalid parameters for disk template %s:"
796 " %s" % (template,
797 utils.CommaJoin(unknown_params)),
798 errors.ECODE_INVAL)
799 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
800 if template == constants.DT_DRBD8 and vg_name is not None:
801 # The default METAVG value is equal to the VG name set at init time,
802 # if provided
803 dt_params[constants.DRBD_DEFAULT_METAVG] = vg_name
804
805 try:
806 utils.VerifyDictOptions(diskparams, constants.DISK_DT_DEFAULTS)
807 except errors.OpPrereqError, err:
808 raise errors.OpPrereqError("While verifying diskparam options: %s" % err,
809 errors.ECODE_INVAL)
810
811 # set up ssh config and /etc/hosts
812 rsa_sshkey = ""
813 dsa_sshkey = ""
814 if os.path.isfile(pathutils.SSH_HOST_RSA_PUB):
815 sshline = utils.ReadFile(pathutils.SSH_HOST_RSA_PUB)
816 rsa_sshkey = sshline.split(" ")[1]
817 if os.path.isfile(pathutils.SSH_HOST_DSA_PUB):
818 sshline = utils.ReadFile(pathutils.SSH_HOST_DSA_PUB)
819 dsa_sshkey = sshline.split(" ")[1]
820 if not rsa_sshkey and not dsa_sshkey:
821 raise errors.OpPrereqError("Failed to find SSH public keys",
822 errors.ECODE_ENVIRON)
823
824 if modify_etc_hosts:
825 utils.AddHostToEtcHosts(hostname.name, hostname.ip)
826
827 if modify_ssh_setup:
828 _InitSSHSetup()
829
830 if default_iallocator is not None:
831 alloc_script = utils.FindFile(default_iallocator,
832 constants.IALLOCATOR_SEARCH_PATH,
833 os.path.isfile)
834 if alloc_script is None:
835 raise errors.OpPrereqError("Invalid default iallocator script '%s'"
836 " specified" % default_iallocator,
837 errors.ECODE_INVAL)
838 else:
839 # default to htools
840 if utils.FindFile(constants.IALLOC_HAIL,
841 constants.IALLOCATOR_SEARCH_PATH,
842 os.path.isfile):
843 default_iallocator = constants.IALLOC_HAIL
844
845 # check if we have all the users we need
846 try:
847 runtime.GetEnts()
848 except errors.ConfigurationError, err:
849 raise errors.OpPrereqError("Required system user/group missing: %s" %
850 err, errors.ECODE_ENVIRON)
851
852 candidate_certs = {}
853
854 now = time.time()
855
856 if compression_tools is not None:
857 cluster.CheckCompressionTools(compression_tools)
858
859 # init of cluster config file
860 cluster_config = objects.Cluster(
861 serial_no=1,
862 rsahostkeypub=rsa_sshkey,
863 dsahostkeypub=dsa_sshkey,
864 highest_used_port=(constants.FIRST_DRBD_PORT - 1),
865 mac_prefix=mac_prefix,
866 volume_group_name=vg_name,
867 tcpudp_port_pool=set(),
868 master_ip=clustername.ip,
869 master_netmask=master_netmask,
870 master_netdev=master_netdev,
871 cluster_name=clustername.name,
872 file_storage_dir=file_storage_dir,
873 shared_file_storage_dir=shared_file_storage_dir,
874 gluster_storage_dir=gluster_storage_dir,
875 enabled_hypervisors=enabled_hypervisors,
876 beparams={constants.PP_DEFAULT: beparams},
877 nicparams={constants.PP_DEFAULT: nicparams},
878 ndparams=ndparams,
879 hvparams=hvparams,
880 diskparams=diskparams,
881 candidate_pool_size=candidate_pool_size,
882 modify_etc_hosts=modify_etc_hosts,
883 modify_ssh_setup=modify_ssh_setup,
884 uid_pool=uid_pool,
885 ctime=now,
886 mtime=now,
887 maintain_node_health=maintain_node_health,
888 drbd_usermode_helper=drbd_helper,
889 default_iallocator=default_iallocator,
890 default_iallocator_params=default_iallocator_params,
891 primary_ip_family=ipcls.family,
892 prealloc_wipe_disks=prealloc_wipe_disks,
893 use_external_mip_script=use_external_mip_script,
894 ipolicy=full_ipolicy,
895 hv_state_static=hv_state,
896 disk_state_static=disk_state,
897 enabled_disk_templates=enabled_disk_templates,
898 candidate_certs=candidate_certs,
899 osparams={},
900 osparams_private_cluster={},
901 install_image=install_image,
902 zeroing_image=zeroing_image,
903 compression_tools=compression_tools,
904 enabled_user_shutdown=enabled_user_shutdown,
905 )
906 master_node_config = objects.Node(name=hostname.name,
907 primary_ip=hostname.ip,
908 secondary_ip=secondary_ip,
909 serial_no=1,
910 master_candidate=True,
911 offline=False, drained=False,
912 ctime=now, mtime=now,
913 )
914 InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
915 cfg = config.ConfigWriter(offline=True)
916 ssh.WriteKnownHostsFile(cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
917 cfg.Update(cfg.GetClusterInfo(), logging.error)
918 ssconf.WriteSsconfFiles(cfg.GetSsconfValues())
919
920 # set up the inter-node password and certificate
921 _InitGanetiServerSetup(hostname.name, cfg)
922
923 logging.debug("Starting daemons")
924 result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
925 if result.failed:
926 raise errors.OpExecError("Could not start daemons, command %s"
927 " had exitcode %s and error %s" %
928 (result.cmd, result.exit_code, result.output))
929
930 _WaitForMasterDaemon()
931
932
933 def InitConfig(version, cluster_config, master_node_config,
934 cfg_file=pathutils.CLUSTER_CONF_FILE):
935 """Create the initial cluster configuration.
936
937 It will contain the current node, which will also be the master
938 node, and no instances.
939
940 @type version: int
941 @param version: configuration version
942 @type cluster_config: L{objects.Cluster}
943 @param cluster_config: cluster configuration
944 @type master_node_config: L{objects.Node}
945 @param master_node_config: master node configuration
946 @type cfg_file: string
947 @param cfg_file: configuration file path
948
949 """
950 uuid_generator = config.TemporaryReservationManager()
951 cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
952 _INITCONF_ECID)
953 master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
954 _INITCONF_ECID)
955 cluster_config.master_node = master_node_config.uuid
956 nodes = {
957 master_node_config.uuid: master_node_config,
958 }
959 default_nodegroup = objects.NodeGroup(
960 uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
961 name=constants.INITIAL_NODE_GROUP_NAME,
962 members=[master_node_config.uuid],
963 diskparams={},
964 )
965 nodegroups = {
966 default_nodegroup.uuid: default_nodegroup,
967 }
968 now = time.time()
969 config_data = objects.ConfigData(version=version,
970 cluster=cluster_config,
971 nodegroups=nodegroups,
972 nodes=nodes,
973 instances={},
974 networks={},
975 disks={},
976 serial_no=1,
977 ctime=now, mtime=now)
978 utils.WriteFile(cfg_file,
979 data=serializer.Dump(config_data.ToDict()),
980 mode=0600)
981
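# Editorial sketch of the resulting on-disk structure (keys taken from the
# ConfigData constructor above, values abbreviated):
#
#   {
#     "version": <CONFIG_VERSION>,
#     "cluster": {...},                  # the cluster_config object
#     "nodegroups": {<uuid>: {...}},     # the single initial node group
#     "nodes": {<uuid>: {...}},          # the master node only
#     "instances": {}, "networks": {}, "disks": {},
#     "serial_no": 1, "ctime": ..., "mtime": ...,
#   }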
982
983 def FinalizeClusterDestroy(master_uuid):
984 """Execute the last steps of cluster destroy
985
986 This function shuts down all the daemons, completing the destroy
987 begun in cmdlib.LUDestroyOpcode.
988
989 """
990 livelock = utils.livelock.LiveLock("bootstrap_destroy")
991 cfg = config.GetConfig(None, livelock)
992 modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
993 runner = rpc.BootstrapRunner()
994
995 master_name = cfg.GetNodeName(master_uuid)
996
997 master_params = cfg.GetMasterNetworkParameters()
998 master_params.uuid = master_uuid
999 ems = cfg.GetUseExternalMipScript()
1000 result = runner.call_node_deactivate_master_ip(master_name, master_params,
1001 ems)
1002
1003 msg = result.fail_msg
1004 if msg:
1005 logging.warning("Could not disable the master IP: %s", msg)
1006
1007 result = runner.call_node_stop_master(master_name)
1008 msg = result.fail_msg
1009 if msg:
1010 logging.warning("Could not disable the master role: %s", msg)
1011
1012 result = runner.call_node_leave_cluster(master_name, modify_ssh_setup)
1013 msg = result.fail_msg
1014 if msg:
1015 logging.warning("Could not shutdown the node daemon and cleanup"
1016 " the node: %s", msg)
1017
1018
1019 def SetupNodeDaemon(opts, cluster_name, node, ssh_port):
1020 """Add a node to the cluster.
1021
1022 This function must be called before the actual opcode, and will ssh
1023 to the remote node, copy the needed files, and start ganeti-noded,
1024 allowing the master to do the rest via normal rpc calls.
1025
1026 @param cluster_name: the cluster name
1027 @param node: the name of the new node
1028 @param ssh_port: the SSH port of the new node
1029
1030 """
1031 data = {
1032 constants.NDS_CLUSTER_NAME: cluster_name,
1033 constants.NDS_NODE_DAEMON_CERTIFICATE:
1034 utils.ReadFile(pathutils.NODED_CERT_FILE),
1035 constants.NDS_SSCONF: ssconf.SimpleStore().ReadAll(),
1036 constants.NDS_START_NODE_DAEMON: True,
1037 constants.NDS_NODE_NAME: node,
1038 }
1039
1040 RunNodeSetupCmd(cluster_name, node, pathutils.NODE_DAEMON_SETUP,
1041 opts.debug, opts.verbose,
1042 True, opts.ssh_key_check, opts.ssh_key_check,
1043 ssh_port, data)
1044
1045 _WaitForNodeDaemon(node)
1046
1047
1048 def MasterFailover(no_voting=False):
1049 """Failover the master node.
1050
1051 This checks that we are not already the master; it then causes the
1052 current master to cease being master, and the node this command is
1053 run on to become the new master.
1054
1055 @type no_voting: boolean
1056 @param no_voting: force the operation without remote nodes agreement
1057 (dangerous)
1058
1059 @returns: the pair of an exit code and warnings to display
1060 """
1061 sstore = ssconf.SimpleStore()
1062
1063 old_master, new_master = ssconf.GetMasterAndMyself(sstore)
1064 node_names = sstore.GetNodeList()
1065 mc_list = sstore.GetMasterCandidates()
1066
1067 if old_master == new_master:
1068 raise errors.OpPrereqError("This command must be run on the node"
1069 " where you want the new master to be."
1070 " %s is already the master" %
1071 old_master, errors.ECODE_INVAL)
1072
1073 if new_master not in mc_list:
1074 mc_no_master = [name for name in mc_list if name != old_master]
1075 raise errors.OpPrereqError("This node is not among the nodes marked"
1076 " as master candidates. Only these nodes"
1077 " can become masters. Current list of"
1078 " master candidates is:\n"
1079 "%s" % ("\n".join(mc_no_master)),
1080 errors.ECODE_STATE)
1081
1082 if not no_voting:
1083 vote_list = GatherMasterVotes(node_names)
1084
1085 if vote_list:
1086 voted_master = vote_list[0][0]
1087 if voted_master is None:
1088 raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
1089 " not respond.", errors.ECODE_ENVIRON)
1090 elif voted_master != old_master:
1091 raise errors.OpPrereqError("I have a wrong configuration, I believe"
1092 " the master is %s but the other nodes"
1093 " voted %s. Please resync the configuration"
1094 " of this node." %
1095 (old_master, voted_master),
1096 errors.ECODE_STATE)
1097 # end checks
1098
1099 rcode = 0
1100 warnings = []
1101
1102 logging.info("Setting master to %s, old master: %s", new_master, old_master)
1103
1104 try:
1105 # Forcefully start WConfd so that we can access the configuration
1106 result = utils.RunCmd([pathutils.DAEMON_UTIL,
1107 "start", constants.WCONFD, "--force-node",
1108 "--no-voting", "--yes-do-it"])
1109 if result.failed:
1110 raise errors.OpPrereqError("Could not start the configuration daemon,"
1111 " command %s had exitcode %s and error %s" %
1112 (result.cmd, result.exit_code, result.output),
1113 errors.ECODE_NOENT)
1114
1115 # instantiate a real config writer, as we now know we have the
1116 # configuration data
1117 livelock = utils.livelock.LiveLock("bootstrap_failover")
1118 cfg = config.GetConfig(None, livelock, accept_foreign=True)
1119
1120 old_master_node = cfg.GetNodeInfoByName(old_master)
1121 if old_master_node is None:
1122 raise errors.OpPrereqError("Could not find old master node '%s' in"
1123 " cluster configuration." % old_master,
1124 errors.ECODE_NOENT)
1125
1126 cluster_info = cfg.GetClusterInfo()
1127 new_master_node = cfg.GetNodeInfoByName(new_master)
1128 if new_master_node is None:
1129 raise errors.OpPrereqError("Could not find new master node '%s' in"
1130 " cluster configuration." % new_master,
1131 errors.ECODE_NOENT)
1132
1133 cluster_info.master_node = new_master_node.uuid
1134 # this will also regenerate the ssconf files, since we updated the
1135 # cluster info
1136 cfg.Update(cluster_info, logging.error)
1137
1138 # if cfg.Update worked, then it means the old master daemon won't be
1139 # able now to write its own config file (we rely on locking in both
1140 # backend.UploadFile() and ConfigWriter._Write(); hence the next
1141 # step is to kill the old master
1142
1143 logging.info("Stopping the master daemon on node %s", old_master)
1144
1145 runner = rpc.BootstrapRunner()
1146 master_params = cfg.GetMasterNetworkParameters()
1147 master_params.uuid = old_master_node.uuid
1148 ems = cfg.GetUseExternalMipScript()
1149 result = runner.call_node_deactivate_master_ip(old_master,
1150 master_params, ems)
1151
1152 msg = result.fail_msg
1153 if msg:
1154 warning = "Could not disable the master IP: %s" % (msg,)
1155 logging.warning("%s", warning)
1156 warnings.append(warning)
1157
1158 result = runner.call_node_stop_master(old_master)
1159 msg = result.fail_msg
1160 if msg:
1161 warning = ("Could not disable the master role on the old master"
1162 " %s, please disable manually: %s" % (old_master, msg))
1163 logging.error("%s", warning)
1164 warnings.append(warning)
1165 except errors.ConfigurationError, err:
1166 logging.error("Error while trying to set the new master: %s",
1167 str(err))
1168 return 1, warnings
1169 finally:
1170 # stop WConfd again:
1171 result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.WCONFD])
1172 if result.failed:
1173 warning = ("Could not stop the configuration daemon,"
1174 " command %s had exitcode %s and error %s"
1175 % (result.cmd, result.exit_code, result.output))
1176 logging.error("%s", warning)
1177 rcode = 1
1178
1179 logging.info("Checking master IP non-reachability...")
1180
1181 master_ip = sstore.GetMasterIP()
1182 total_timeout = 30
1183
1184 # Here we have a phase where no master should be running
1185 def _check_ip(expected):
1186 if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT) != expected:
1187 raise utils.RetryAgain()
1188
1189 try:
1190 utils.Retry(_check_ip, (1, 1.5, 5), total_timeout, args=[False])
1191 except utils.RetryTimeout:
1192 warning = ("The master IP is still reachable after %s seconds,"
1193 " continuing but activating the master IP on the current"
1194 " node will probably fail" % total_timeout)
1195 logging.warning("%s", warning)
1196 warnings.append(warning)
1197 rcode = 1
1198
1199 if jstore.CheckDrainFlag():
1200 logging.info("Undraining job queue")
1201 jstore.SetDrainFlag(False)
1202
1203 logging.info("Starting the master daemons on the new master")
1204
1205 result = rpc.BootstrapRunner().call_node_start_master_daemons(new_master,
1206 no_voting)
1207 msg = result.fail_msg
1208 if msg:
1209 logging.error("Could not start the master role on the new master"
1210 " %s, please check: %s", new_master, msg)
1211 rcode = 1
1212
1213 # Finally verify that the new master managed to set up the master IP
1214 # and warn if it didn't.
1215 try:
1216 utils.Retry(_check_ip, (1, 1.5, 5), total_timeout, args=[True])
1217 except utils.RetryTimeout:
1218 warning = ("The master IP did not come up within %s seconds; the"
1219 " cluster should still be working and reachable via %s,"
1220 " but not via the master IP address"
1221 % (total_timeout, new_master))
1222 logging.warning("%s", warning)
1223 warnings.append(warning)
1224 rcode = 1
1225
1226 logging.info("Master failed over from %s to %s", old_master, new_master)
1227 return rcode, warnings
1228
1229
1230 def GetMaster():
1231 """Returns the current master node.
1232
1233 This is a separate function in bootstrap since it's needed by
1234 gnt-cluster, and instead of importing directly ssconf, it's better
1235 to abstract it in bootstrap, where we do use ssconf in other
1236 functions too.
1237
1238 """
1239 sstore = ssconf.SimpleStore()
1240
1241 old_master, _ = ssconf.GetMasterAndMyself(sstore)
1242
1243 return old_master
1244
1245
1246 def GatherMasterVotes(node_names):
1247 """Check the agreement on who is the master.
1248
1249 This function will return a list of (node, number of votes), ordered
1250 by the number of votes. Errors will be denoted by the key 'None'.
1251
1252 Note that the sum of votes is the number of nodes this machine
1253 knows, whereas the number of entries in the list could be different
1254 (if some nodes vote for another master).
1255
1256 @type node_names: list
1257 @param node_names: the list of nodes to query for master info
1258 @rtype: list
1259 @return: list of (node, votes)
1260
1261 """
1262 if not node_names:
1263 # no nodes
1264 return []
1265 results = rpc.BootstrapRunner().call_master_node_name(node_names)
1266 if not isinstance(results, dict):
1267 # this should not happen (unless internal error in rpc)
1268 logging.critical("Can't complete rpc call, aborting master startup")
1269 return [(None, len(node_names))]
1270 votes = {}
1271 for node_name in results:
1272 nres = results[node_name]
1273 msg = nres.fail_msg
1274
1275 if msg:
1276 logging.warning("Error contacting node %s: %s", node_name, msg)
1277 node = None
1278 else:
1279 node = nres.payload
1280
1281 if node not in votes:
1282 votes[node] = 1
1283 else:
1284 votes[node] += 1
1285
1286 vote_list = list(votes.items())
1287 # sort first on number of votes then on name, since we want None
1288 # sorted later if half of the nodes are not responding while the
1289 # other half all vote for the same master
1290 vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)
1291
1292 return vote_list
1293
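# Worked example (hypothetical cluster): with five nodes where three
# report "node1" as master, one reports "node2" and one is unreachable,
# GatherMasterVotes returns
#
#   [("node1", 3), ("node2", 1), (None, 1)]
#
# i.e. candidates sorted by decreasing votes, with RPC failures collected
# under the None key.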
1294
1295 def MajorityHealthy():
1296 """Check if the majority of nodes is healthy
1297
1298 Gather master votes from all nodes known to this node;
1299 return True if a strict majority of nodes is reachable and
1300 has some opinion on which node is master. Note that this does
1301 not guarantee that any particular node will win an election, but
1302 it ensures that a standard master failover is still possible.
1303
1304 """
1305 node_names = ssconf.SimpleStore().GetNodeList()
1306 node_count = len(node_names)
1307 vote_list = GatherMasterVotes(node_names)
1308 if vote_list is None:
1309 return False
1310 total_votes = sum([count for (node, count) in vote_list if node is not None])
1311 logging.info("Total %d nodes, %d votes: %s", node_count, total_votes,
1312 vote_list)
1313 return 2 * total_votes > node_count
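

# Worked example (hypothetical numbers): with node_count == 5 and three
# reachable nodes that voted (total_votes == 3), the predicate holds since
# 2 * 3 > 5. With node_count == 4 and total_votes == 2 it fails, because
# 2 * 2 > 4 is false: exactly half of the nodes is not a strict majority.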