Init: add master client certificate to configuration
lib/bootstrap.py
#
#

# Copyright (C) 2006, 2007, 2008, 2010, 2011, 2012 Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
# TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""Functions to bootstrap a new cluster.

"""

import os
import os.path
import re
import logging
import time
import tempfile

from ganeti.cmdlib import cluster
import ganeti.rpc.node as rpc
from ganeti import ssh
from ganeti import utils
from ganeti import errors
from ganeti import config
from ganeti import constants
from ganeti import objects
from ganeti import ssconf
from ganeti import serializer
from ganeti import hypervisor
from ganeti.storage import drbd
from ganeti.storage import filestorage
from ganeti import netutils
from ganeti import luxi
from ganeti import jstore
from ganeti import pathutils
from ganeti import runtime
from ganeti import vcluster


# ec_id for InitConfig's temporary reservation manager
_INITCONF_ECID = "initconfig-ecid"

#: Maximum time in seconds within which a daemon must become responsive
_DAEMON_READY_TIMEOUT = 10.0


def _InitSSHSetup():
  """Set up the SSH configuration for the cluster.

  This generates a DSA keypair for root and adds the public key to the
  list of authorized keys, allowing passwordless SSH logins as root
  between cluster nodes.

  """
  priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.SSH_LOGIN_USER)

  for name in priv_key, pub_key:
    if os.path.exists(name):
      utils.CreateBackup(name)
    utils.RemoveFile(name)

  result = utils.RunCmd(["ssh-keygen", "-t", "dsa",
                         "-f", priv_key,
                         "-q", "-N", ""])
  if result.failed:
    raise errors.OpExecError("Could not generate ssh keypair, error %s" %
                             result.output)

  utils.AddAuthorizedKey(auth_keys, utils.ReadFile(pub_key))


def GenerateHmacKey(file_name):
  """Writes a new HMAC key.

  @type file_name: str
  @param file_name: Path to output file

  """
  utils.WriteFile(file_name, data="%s\n" % utils.GenerateSecret(), mode=0400,
                  backup=True)

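# A minimal usage sketch (illustrative; assumes sufficient privileges to
# write the standard confd key path):
#
#   GenerateHmacKey(pathutils.CONFD_HMAC_KEY)
#
# utils.GenerateSecret() produces a random hex string, so the file ends up
# holding a single line of owner-readable (mode 0400) hexadecimal data.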

# pylint: disable=R0913
def GenerateClusterCrypto(new_cluster_cert, new_rapi_cert, new_spice_cert,
                          new_confd_hmac_key, new_cds, new_client_cert,
                          master_name,
                          rapi_cert_pem=None, spice_cert_pem=None,
                          spice_cacert_pem=None, cds=None,
                          nodecert_file=pathutils.NODED_CERT_FILE,
                          clientcert_file=pathutils.NODED_CLIENT_CERT_FILE,
                          rapicert_file=pathutils.RAPI_CERT_FILE,
                          spicecert_file=pathutils.SPICE_CERT_FILE,
                          spicecacert_file=pathutils.SPICE_CACERT_FILE,
                          hmackey_file=pathutils.CONFD_HMAC_KEY,
                          cds_file=pathutils.CLUSTER_DOMAIN_SECRET_FILE):
118 """Updates the cluster certificates, keys and secrets.
119
120 @type new_cluster_cert: bool
121 @param new_cluster_cert: Whether to generate a new cluster certificate
122 @type new_rapi_cert: bool
123 @param new_rapi_cert: Whether to generate a new RAPI certificate
124 @type new_spice_cert: bool
125 @param new_spice_cert: Whether to generate a new SPICE certificate
126 @type new_confd_hmac_key: bool
127 @param new_confd_hmac_key: Whether to generate a new HMAC key
128 @type new_cds: bool
129 @param new_cds: Whether to generate a new cluster domain secret
130 @type new_client_cert: bool
131 @param new_client_cert: Whether to generate a new client certificate
132 @type master_name: string
133 @param master_name: FQDN of the master node
134 @type rapi_cert_pem: string
135 @param rapi_cert_pem: New RAPI certificate in PEM format
136 @type spice_cert_pem: string
137 @param spice_cert_pem: New SPICE certificate in PEM format
138 @type spice_cacert_pem: string
139 @param spice_cacert_pem: Certificate of the CA that signed the SPICE
140 certificate, in PEM format
141 @type cds: string
142 @param cds: New cluster domain secret
143 @type nodecert_file: string
144 @param nodecert_file: optional override of the node cert file path
145 @type rapicert_file: string
146 @param rapicert_file: optional override of the rapi cert file path
147 @type spicecert_file: string
148 @param spicecert_file: optional override of the spice cert file path
149 @type spicecacert_file: string
150 @param spicecacert_file: optional override of the spice CA cert file path
151 @type hmackey_file: string
152 @param hmackey_file: optional override of the hmac key file path
153
154 """
  # noded SSL certificate
  utils.GenerateNewSslCert(
    new_cluster_cert, nodecert_file, 1,
    "Generating new cluster certificate at %s" % nodecert_file)

  # If the cluster certificate was renewed, the client cert has to be
  # renewed and resigned.
  if new_cluster_cert or new_client_cert:
    utils.GenerateNewClientSslCert(clientcert_file, nodecert_file,
                                   master_name)

  # confd HMAC key
  if new_confd_hmac_key or not os.path.exists(hmackey_file):
    logging.debug("Writing new confd HMAC key to %s", hmackey_file)
    GenerateHmacKey(hmackey_file)

  if rapi_cert_pem:
    # Assume rapi_cert_pem contains a valid PEM-formatted certificate and key
    logging.debug("Writing RAPI certificate at %s", rapicert_file)
    utils.WriteFile(rapicert_file, data=rapi_cert_pem, backup=True)

  else:
    utils.GenerateNewSslCert(
      new_rapi_cert, rapicert_file, 1,
      "Generating new RAPI certificate at %s" % rapicert_file)

  # SPICE
  spice_cert_exists = os.path.exists(spicecert_file)
  spice_cacert_exists = os.path.exists(spicecacert_file)
  if spice_cert_pem:
    # spice_cert_pem implies also spice_cacert_pem
    logging.debug("Writing SPICE certificate at %s", spicecert_file)
    utils.WriteFile(spicecert_file, data=spice_cert_pem, backup=True)
    logging.debug("Writing SPICE CA certificate at %s", spicecacert_file)
    utils.WriteFile(spicecacert_file, data=spice_cacert_pem, backup=True)
  elif new_spice_cert or not spice_cert_exists:
    if spice_cert_exists:
      utils.CreateBackup(spicecert_file)
    if spice_cacert_exists:
      utils.CreateBackup(spicecacert_file)

    logging.debug("Generating new self-signed SPICE certificate at %s",
                  spicecert_file)
    (_, cert_pem) = utils.GenerateSelfSignedSslCert(spicecert_file, 1)

    # Self-signed certificate -> the public certificate is also the CA public
    # certificate
    logging.debug("Writing the public CA certificate to %s",
                  spicecacert_file)
    utils.io.WriteFile(spicecacert_file, mode=0400, data=cert_pem)

  # Cluster domain secret
  if cds:
    logging.debug("Writing cluster domain secret to %s", cds_file)
    utils.WriteFile(cds_file, data=cds, backup=True)

  elif new_cds or not os.path.exists(cds_file):
    logging.debug("Generating new cluster domain secret at %s", cds_file)
    GenerateHmacKey(cds_file)

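# Usage sketch: renewing only the RAPI certificate for a hypothetical master
# "master.example.com" (the boolean flags follow the signature order:
# cluster cert, RAPI cert, SPICE cert, HMAC key, CDS, client cert):
#
#   GenerateClusterCrypto(False, True, False, False, False, False,
#                         "master.example.com")
#
# Artifacts whose flag is False are generally left alone; most are
# regenerated only if they are missing on disk.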


def _InitGanetiServerSetup(master_name, cfg):
  """Setup the necessary configuration for the initial node daemon.

  This generates the cluster's initial certificates and secrets, adds the
  master's client certificate digest to the configuration, and starts the
  node daemon.

  @type master_name: str
  @param master_name: Name of the master node
  @type cfg: ConfigWriter
  @param cfg: the configuration writer

  """
  # Generate cluster secrets
  GenerateClusterCrypto(True, False, False, False, False, False, master_name)

  # Add the master's SSL certificate digest to the configuration.
  master_uuid = cfg.GetMasterNode()
  master_digest = utils.GetCertificateDigest()
  cfg.AddNodeToCandidateCerts(master_uuid, master_digest)
  cfg.Update(cfg.GetClusterInfo(), logging.error)
  ssconf.WriteSsconfFiles(cfg.GetSsconfValues())

  # set up the inter-node password and certificate
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start", constants.NODED])
  if result.failed:
    raise errors.OpExecError("Could not start the node daemon, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForNodeDaemon(master_name)


def _WaitForNodeDaemon(node_name):
  """Wait for node daemon to become responsive.

  """
  def _CheckNodeDaemon():
    # Pylint bug <http://www.logilab.org/ticket/35642>
    # pylint: disable=E1101
    result = rpc.BootstrapRunner().call_version([node_name])[node_name]
    if result.fail_msg:
      raise utils.RetryAgain()

  try:
    utils.Retry(_CheckNodeDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Node daemon on %s didn't answer queries within"
                             " %s seconds" % (node_name, _DAEMON_READY_TIMEOUT))

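# The helper above follows the utils.Retry protocol: the inner check raises
# utils.RetryAgain to request another attempt (here every 1.0 seconds) until
# _DAEMON_READY_TIMEOUT expires, at which point the resulting
# utils.RetryTimeout is turned into an OpExecError. The two helpers below
# use the same pattern for the master daemon and sshd.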

def _WaitForMasterDaemon():
  """Wait for master daemon to become responsive.

  """
  def _CheckMasterDaemon():
    try:
      cl = luxi.Client()
      (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
    except Exception:
      raise utils.RetryAgain()

    logging.debug("Received cluster name %s from master", cluster_name)

  try:
    utils.Retry(_CheckMasterDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("Master daemon didn't answer queries within"
                             " %s seconds" % _DAEMON_READY_TIMEOUT)


def _WaitForSshDaemon(hostname, port):
  """Wait for SSH daemon to become responsive.

  """
  family = ssconf.SimpleStore().GetPrimaryIPFamily()
  hostip = netutils.GetHostname(name=hostname, family=family).ip

  def _CheckSshDaemon():
    if netutils.TcpPing(hostip, port, timeout=1.0, live_port_needed=True):
      logging.debug("SSH daemon on %s:%s (IP address %s) has become"
                    " responsive", hostname, port, hostip)
    else:
      raise utils.RetryAgain()

  try:
    utils.Retry(_CheckSshDaemon, 1.0, _DAEMON_READY_TIMEOUT)
  except utils.RetryTimeout:
    raise errors.OpExecError("SSH daemon on %s:%s (IP address %s) didn't"
                             " become responsive within %s seconds" %
                             (hostname, port, hostip, _DAEMON_READY_TIMEOUT))


def RunNodeSetupCmd(cluster_name, node, basecmd, debug, verbose,
                    use_cluster_key, ask_key, strict_host_check,
                    port, data):
  """Runs a command to configure something on a remote machine.

  @type cluster_name: string
  @param cluster_name: Cluster name
  @type node: string
  @param node: Node name
  @type basecmd: string
  @param basecmd: Base command (path on the remote machine)
  @type debug: bool
  @param debug: Enable debug output
  @type verbose: bool
  @param verbose: Enable verbose output
  @type use_cluster_key: bool
  @param use_cluster_key: See L{ssh.SshRunner.BuildCmd}
  @type ask_key: bool
  @param ask_key: See L{ssh.SshRunner.BuildCmd}
  @type strict_host_check: bool
  @param strict_host_check: See L{ssh.SshRunner.BuildCmd}
  @type port: int
  @param port: The SSH port of the remote machine or None for the default
  @param data: JSON-serializable input data for script (passed to stdin)

  """
  cmd = [basecmd]

  # Pass --debug/--verbose to the external script if set on our invocation
  if debug:
    cmd.append("--debug")

  if verbose:
    cmd.append("--verbose")

  logging.debug("Node setup command: %s", cmd)

  version = constants.DIR_VERSION
  all_cmds = [["test", "-d", os.path.join(pathutils.PKGLIBDIR, version)]]
  if constants.HAS_GNU_LN:
    all_cmds.extend([["ln", "-s", "-f", "-T",
                      os.path.join(pathutils.PKGLIBDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["ln", "-s", "-f", "-T",
                      os.path.join(pathutils.SHAREDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
  else:
    all_cmds.extend([["rm", "-f",
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["ln", "-s", "-f",
                      os.path.join(pathutils.PKGLIBDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")],
                     ["rm", "-f",
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")],
                     ["ln", "-s", "-f",
                      os.path.join(pathutils.SHAREDIR, version),
                      os.path.join(pathutils.SYSCONFDIR, "ganeti/share")]])
  all_cmds.append(cmd)

  if port is None:
    port = netutils.GetDaemonPort(constants.SSH)

  srun = ssh.SshRunner(cluster_name)
  scmd = srun.BuildCmd(node, constants.SSH_LOGIN_USER,
                       utils.ShellQuoteArgs(
                           utils.ShellCombineCommands(all_cmds)),
                       batch=False, ask_key=ask_key, quiet=False,
                       strict_host_check=strict_host_check,
                       use_cluster_key=use_cluster_key,
                       port=port)

  tempfh = tempfile.TemporaryFile()
  try:
    tempfh.write(serializer.DumpJson(data))
    tempfh.seek(0)

    result = utils.RunCmd(scmd, interactive=True, input_fd=tempfh)
  finally:
    tempfh.close()

  if result.failed:
    raise errors.OpExecError("Command '%s' failed: %s" %
                             (result.cmd, result.fail_reason))

  _WaitForSshDaemon(node, port)

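# For illustration, with GNU ln available and assuming the default build
# paths, the combined remote invocation built above amounts to roughly:
#
#   test -d /usr/lib/ganeti/<version> && \
#   ln -s -f -T /usr/lib/ganeti/<version> /etc/ganeti/lib && \
#   ln -s -f -T /usr/share/ganeti/<version> /etc/ganeti/share && \
#   <basecmd> [--debug] [--verbose]
#
# with the JSON-serialized 'data' fed to the script on stdin. The exact
# paths depend on PKGLIBDIR, SHAREDIR and SYSCONFDIR at build time.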

def _InitFileStorageDir(file_storage_dir):
  """Initialize the file storage directory, if needed.

  @param file_storage_dir: the user-supplied value
  @return: the normalized path to the storage directory

  """
  file_storage_dir = os.path.normpath(file_storage_dir)

  if not os.path.isabs(file_storage_dir):
    raise errors.OpPrereqError("File storage directory '%s' is not an absolute"
                               " path" % file_storage_dir, errors.ECODE_INVAL)

  if not os.path.exists(file_storage_dir):
    try:
      os.makedirs(file_storage_dir, 0750)
    except OSError, err:
      raise errors.OpPrereqError("Cannot create file storage directory"
                                 " '%s': %s" % (file_storage_dir, err),
                                 errors.ECODE_ENVIRON)

  if not os.path.isdir(file_storage_dir):
    raise errors.OpPrereqError("The file storage directory '%s' is not"
                               " a directory." % file_storage_dir,
                               errors.ECODE_ENVIRON)

  return file_storage_dir


def _PrepareFileBasedStorage(
    enabled_disk_templates, file_storage_dir,
    default_dir, file_disk_template, _storage_path_acceptance_fn,
    init_fn=_InitFileStorageDir, acceptance_fn=None):
  """Checks if a file-based storage type is enabled and inits the dir.

  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of enabled disk templates
  @type file_storage_dir: string
  @param file_storage_dir: the file storage directory
  @type default_dir: string
  @param default_dir: default file storage directory when C{file_storage_dir}
      is 'None'
  @type file_disk_template: string
  @param file_disk_template: a disk template whose storage type is 'ST_FILE',
      'ST_SHARED_FILE' or 'ST_GLUSTER'
  @type _storage_path_acceptance_fn: function
  @param _storage_path_acceptance_fn: checks whether the given file-based
      storage directory is acceptable
  @see: C{cluster.CheckFileBasedStoragePathVsEnabledDiskTemplates} for details

  @rtype: string
  @returns: the name of the actual file storage directory

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
      constants.ST_FILE, constants.ST_SHARED_FILE, constants.ST_GLUSTER
  ))

  if file_storage_dir is None:
    file_storage_dir = default_dir
  if not acceptance_fn:
    acceptance_fn = \
        lambda path: filestorage.CheckFileStoragePathAcceptance(
            path, exact_match_ok=True)

  _storage_path_acceptance_fn(logging.warning, file_storage_dir,
                              enabled_disk_templates)

  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_enabled:
    try:
      acceptance_fn(file_storage_dir)
    except errors.FileStoragePathError as e:
      raise errors.OpPrereqError(str(e))
    result_file_storage_dir = init_fn(file_storage_dir)
  else:
    result_file_storage_dir = file_storage_dir
  return result_file_storage_dir


def _PrepareFileStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if file storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_FILE_STORAGE_DIR, constants.DT_FILE,
      cluster.CheckFileStoragePathVsEnabledDiskTemplates,
      init_fn=init_fn, acceptance_fn=acceptance_fn)


def _PrepareSharedFileStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if shared file storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_SHARED_FILE_STORAGE_DIR, constants.DT_SHARED_FILE,
      cluster.CheckSharedFileStoragePathVsEnabledDiskTemplates,
      init_fn=init_fn, acceptance_fn=acceptance_fn)


def _PrepareGlusterStorage(
    enabled_disk_templates, file_storage_dir, init_fn=_InitFileStorageDir,
    acceptance_fn=None):
  """Checks if gluster storage is enabled and inits the dir.

  @see: C{_PrepareFileBasedStorage}

  """
  return _PrepareFileBasedStorage(
      enabled_disk_templates, file_storage_dir,
      pathutils.DEFAULT_GLUSTER_STORAGE_DIR, constants.DT_GLUSTER,
      cluster.CheckGlusterStoragePathVsEnabledDiskTemplates,
      init_fn=init_fn, acceptance_fn=acceptance_fn)

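# Usage sketch (hypothetical values): with plain file storage enabled, the
# wrapper validates the path via the acceptance function and initializes
# the directory if necessary:
#
#   storage_dir = _PrepareFileStorage(
#       [constants.DT_FILE, constants.DT_PLAIN], "/srv/ganeti/file-storage")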

def _InitCheckEnabledDiskTemplates(enabled_disk_templates):
  """Checks the sanity of the enabled disk templates.

  """
  if not enabled_disk_templates:
    raise errors.OpPrereqError("Enabled disk templates list must contain at"
                               " least one member", errors.ECODE_INVAL)
  invalid_disk_templates = \
    set(enabled_disk_templates) - constants.DISK_TEMPLATES
  if invalid_disk_templates:
    raise errors.OpPrereqError("Enabled disk templates list contains invalid"
                               " entries: %s" % invalid_disk_templates,
                               errors.ECODE_INVAL)


def _RestrictIpolicyToEnabledDiskTemplates(ipolicy, enabled_disk_templates):
  """Restricts the ipolicy's disk templates to the enabled ones.

  This function removes from the ipolicy's list of allowed disk
  templates all templates that are not enabled by the cluster.

  @type ipolicy: dict
  @param ipolicy: the instance policy
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of cluster-wide enabled disk
    templates

  """
  assert constants.IPOLICY_DTS in ipolicy
  allowed_disk_templates = ipolicy[constants.IPOLICY_DTS]
  restricted_disk_templates = list(set(allowed_disk_templates)
                                   .intersection(set(enabled_disk_templates)))
  ipolicy[constants.IPOLICY_DTS] = restricted_disk_templates

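# Worked example (illustrative values only):
#
#   ipolicy = {constants.IPOLICY_DTS: ["drbd", "plain", "file"]}
#   _RestrictIpolicyToEnabledDiskTemplates(ipolicy, ["plain"])
#   # ipolicy[constants.IPOLICY_DTS] is now ["plain"]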

def _InitCheckDrbdHelper(drbd_helper, drbd_enabled):
  """Checks the DRBD usermode helper.

  @type drbd_helper: string
  @param drbd_helper: name of the DRBD usermode helper that the system should
    use
  @type drbd_enabled: bool
  @param drbd_enabled: whether the DRBD disk template is enabled

  """
  if not drbd_enabled:
    return

  if drbd_helper is not None:
    try:
      curr_helper = drbd.DRBD8.GetUsermodeHelper()
    except errors.BlockDeviceError, err:
      raise errors.OpPrereqError("Error while checking drbd helper"
                                 " (disable drbd with --enabled-disk-templates"
                                 " if you are not using drbd): %s" % str(err),
                                 errors.ECODE_ENVIRON)
    if drbd_helper != curr_helper:
      raise errors.OpPrereqError("Error: requiring %s as drbd helper but %s"
                                 " is the current helper" % (drbd_helper,
                                                             curr_helper),
                                 errors.ECODE_INVAL)


def InitCluster(cluster_name, mac_prefix,  # pylint: disable=R0913, R0914
                master_netmask, master_netdev, file_storage_dir,
                shared_file_storage_dir, gluster_storage_dir,
                candidate_pool_size, secondary_ip=None,
                vg_name=None, beparams=None, nicparams=None, ndparams=None,
                hvparams=None, diskparams=None, enabled_hypervisors=None,
                modify_etc_hosts=True, modify_ssh_setup=True,
                maintain_node_health=False, drbd_helper=None, uid_pool=None,
                default_iallocator=None, default_iallocator_params=None,
                primary_ip_version=None, ipolicy=None,
                prealloc_wipe_disks=False, use_external_mip_script=False,
                hv_state=None, disk_state=None, enabled_disk_templates=None,
                install_image=None, zeroing_image=None, compression_tools=None,
                enabled_user_shutdown=False):
  """Initialise the cluster.

  @type candidate_pool_size: int
  @param candidate_pool_size: master candidate pool size

  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: list of disk_templates to be used in this
    cluster

  @type enabled_user_shutdown: bool
  @param enabled_user_shutdown: whether user shutdown is enabled cluster-wide

  """
  # TODO: complete the docstring
  if config.ConfigWriter.IsCluster():
    raise errors.OpPrereqError("Cluster is already initialised",
                               errors.ECODE_STATE)

  data_dir = vcluster.AddNodePrefix(pathutils.DATA_DIR)
  queue_dir = vcluster.AddNodePrefix(pathutils.QUEUE_DIR)
  archive_dir = vcluster.AddNodePrefix(pathutils.JOB_QUEUE_ARCHIVE_DIR)
  for ddir in [queue_dir, data_dir, archive_dir]:
    if os.path.isdir(ddir):
      for entry in os.listdir(ddir):
        if not os.path.isdir(os.path.join(ddir, entry)):
          raise errors.OpPrereqError(
            "%s contains non-directory entries like %s. Remove left-overs of"
            " an old cluster before initialising a new one" % (ddir, entry),
            errors.ECODE_STATE)

  if not enabled_hypervisors:
    raise errors.OpPrereqError("Enabled hypervisors list must contain at"
                               " least one member", errors.ECODE_INVAL)
  invalid_hvs = set(enabled_hypervisors) - constants.HYPER_TYPES
  if invalid_hvs:
    raise errors.OpPrereqError("Enabled hypervisors contains invalid"
                               " entries: %s" % invalid_hvs,
                               errors.ECODE_INVAL)

  _InitCheckEnabledDiskTemplates(enabled_disk_templates)

  try:
    ipcls = netutils.IPAddress.GetClassFromIpVersion(primary_ip_version)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip version: %d." %
                               primary_ip_version, errors.ECODE_INVAL)

  hostname = netutils.GetHostname(family=ipcls.family)
  if not ipcls.IsValid(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) is not a valid IPv%d"
                               " address." % (hostname.ip, primary_ip_version),
                               errors.ECODE_INVAL)

  if ipcls.IsLoopback(hostname.ip):
    raise errors.OpPrereqError("This host's IP (%s) resolves to a loopback"
                               " address. Please fix DNS or %s." %
                               (hostname.ip, pathutils.ETC_HOSTS),
                               errors.ECODE_ENVIRON)

  if not ipcls.Own(hostname.ip):
    raise errors.OpPrereqError("Inconsistency: this host's name resolves"
                               " to %s,\nbut this ip address does not"
                               " belong to this host" %
                               hostname.ip, errors.ECODE_ENVIRON)

  clustername = netutils.GetHostname(name=cluster_name, family=ipcls.family)

  if netutils.TcpPing(clustername.ip, constants.DEFAULT_NODED_PORT, timeout=5):
    raise errors.OpPrereqError("Cluster IP already active",
                               errors.ECODE_NOTUNIQUE)

  if not secondary_ip:
    if primary_ip_version == constants.IP6_VERSION:
      raise errors.OpPrereqError("When using an IPv6 primary address, a valid"
                                 " IPv4 address must be given as secondary",
                                 errors.ECODE_INVAL)
    secondary_ip = hostname.ip

  if not netutils.IP4Address.IsValid(secondary_ip):
    raise errors.OpPrereqError("Secondary IP address (%s) has to be a valid"
                               " IPv4 address." % secondary_ip,
                               errors.ECODE_INVAL)

  if not netutils.IP4Address.Own(secondary_ip):
    raise errors.OpPrereqError("You gave %s as secondary IP,"
                               " but it does not belong to this host." %
                               secondary_ip, errors.ECODE_ENVIRON)

  if master_netmask is not None:
    if not ipcls.ValidateNetmask(master_netmask):
      raise errors.OpPrereqError("CIDR netmask (%s) not valid for IPv%s" %
                                 (master_netmask, primary_ip_version),
                                 errors.ECODE_INVAL)
  else:
    master_netmask = ipcls.iplen

  if vg_name:
    # Check if volume group is valid
    vgstatus = utils.CheckVolumeGroupSize(utils.ListVolumeGroups(), vg_name,
                                          constants.MIN_VG_SIZE)
    if vgstatus:
      raise errors.OpPrereqError("Error: %s" % vgstatus, errors.ECODE_INVAL)

  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
  _InitCheckDrbdHelper(drbd_helper, drbd_enabled)

  logging.debug("Stopping daemons (if any are running)")
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop-all"])
  if result.failed:
    raise errors.OpExecError("Could not stop daemons, command %s"
                             " had exitcode %s and error '%s'" %
                             (result.cmd, result.exit_code, result.output))

  file_storage_dir = _PrepareFileStorage(enabled_disk_templates,
                                         file_storage_dir)
  shared_file_storage_dir = _PrepareSharedFileStorage(enabled_disk_templates,
                                                      shared_file_storage_dir)
  gluster_storage_dir = _PrepareGlusterStorage(enabled_disk_templates,
                                               gluster_storage_dir)

  if not re.match("^[0-9a-f]{2}:[0-9a-f]{2}:[0-9a-f]{2}$", mac_prefix):
    raise errors.OpPrereqError("Invalid mac prefix given '%s'" % mac_prefix,
                               errors.ECODE_INVAL)

  if nicparams.get('mode', None) != constants.NIC_MODE_OVS:
    # Do not do this check if mode=openvswitch, since the openvswitch is not
    # created yet
    result = utils.RunCmd(["ip", "link", "show", "dev", master_netdev])
    if result.failed:
      raise errors.OpPrereqError("Invalid master netdev given (%s): '%s'" %
                                 (master_netdev,
                                  result.output.strip()), errors.ECODE_INVAL)

  dirs = [(pathutils.RUN_DIR, constants.RUN_DIRS_MODE)]
  utils.EnsureDirs(dirs)

  objects.UpgradeBeParams(beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_TYPES)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  objects.NIC.CheckParameterSyntax(nicparams)

  full_ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy)
  _RestrictIpolicyToEnabledDiskTemplates(full_ipolicy, enabled_disk_templates)

  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
  else:
    ndparams = dict(constants.NDC_DEFAULTS)

  # This is ugly, as we modify the dict itself
  # FIXME: Make utils.ForceDictType pure functional or write a wrapper
  # around it
  if hv_state:
    for hvname, hvs_data in hv_state.items():
      utils.ForceDictType(hvs_data, constants.HVSTS_PARAMETER_TYPES)
      hv_state[hvname] = objects.Cluster.SimpleFillHvState(hvs_data)
  else:
    hv_state = dict((hvname, constants.HVST_DEFAULTS)
                    for hvname in enabled_hypervisors)

  # FIXME: disk_state has no default values yet
  if disk_state:
    for storage, ds_data in disk_state.items():
      if storage not in constants.DS_VALID_TYPES:
        raise errors.OpPrereqError("Invalid storage type in disk state: %s" %
                                   storage, errors.ECODE_INVAL)
      for ds_name, state in ds_data.items():
        utils.ForceDictType(state, constants.DSS_PARAMETER_TYPES)
        ds_data[ds_name] = objects.Cluster.SimpleFillDiskState(state)

  # hvparams is a mapping of hypervisor->hvparams dict
  for hv_name, hv_params in hvparams.iteritems():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
    hv_class = hypervisor.GetHypervisor(hv_name)
    hv_class.CheckParameterSyntax(hv_params)

  # diskparams is a mapping of disk-template->diskparams dict
  for template, dt_params in diskparams.items():
    param_keys = set(dt_params.keys())
    default_param_keys = set(constants.DISK_DT_DEFAULTS[template].keys())
    if not (param_keys <= default_param_keys):
      unknown_params = param_keys - default_param_keys
      raise errors.OpPrereqError("Invalid parameters for disk template %s:"
                                 " %s" % (template,
                                          utils.CommaJoin(unknown_params)),
                                 errors.ECODE_INVAL)
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
    if template == constants.DT_DRBD8 and vg_name is not None:
      # The default METAVG value is equal to the VG name set at init time,
      # if provided
      dt_params[constants.DRBD_DEFAULT_METAVG] = vg_name

  try:
    utils.VerifyDictOptions(diskparams, constants.DISK_DT_DEFAULTS)
  except errors.OpPrereqError, err:
    raise errors.OpPrereqError("While verifying diskparam options: %s" % err,
                               errors.ECODE_INVAL)

  # set up ssh config and /etc/hosts
  rsa_sshkey = ""
  dsa_sshkey = ""
  if os.path.isfile(pathutils.SSH_HOST_RSA_PUB):
    sshline = utils.ReadFile(pathutils.SSH_HOST_RSA_PUB)
    rsa_sshkey = sshline.split(" ")[1]
  if os.path.isfile(pathutils.SSH_HOST_DSA_PUB):
    sshline = utils.ReadFile(pathutils.SSH_HOST_DSA_PUB)
    dsa_sshkey = sshline.split(" ")[1]
  if not rsa_sshkey and not dsa_sshkey:
    raise errors.OpPrereqError("Failed to find SSH public keys",
                               errors.ECODE_ENVIRON)

  if modify_etc_hosts:
    utils.AddHostToEtcHosts(hostname.name, hostname.ip)

  if modify_ssh_setup:
    _InitSSHSetup()

  if default_iallocator is not None:
    alloc_script = utils.FindFile(default_iallocator,
                                  constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                 " specified" % default_iallocator,
                                 errors.ECODE_INVAL)
  else:
    # default to htools
    if utils.FindFile(constants.IALLOC_HAIL,
                      constants.IALLOCATOR_SEARCH_PATH,
                      os.path.isfile):
      default_iallocator = constants.IALLOC_HAIL

  # check if we have all the users we need
  try:
    runtime.GetEnts()
  except errors.ConfigurationError, err:
    raise errors.OpPrereqError("Required system user/group missing: %s" %
                               err, errors.ECODE_ENVIRON)

  candidate_certs = {}

  now = time.time()

  if compression_tools is not None:
    cluster.CheckCompressionTools(compression_tools)

  # init of cluster config file
  cluster_config = objects.Cluster(
    serial_no=1,
    rsahostkeypub=rsa_sshkey,
    dsahostkeypub=dsa_sshkey,
    highest_used_port=(constants.FIRST_DRBD_PORT - 1),
    mac_prefix=mac_prefix,
    volume_group_name=vg_name,
    tcpudp_port_pool=set(),
    master_ip=clustername.ip,
    master_netmask=master_netmask,
    master_netdev=master_netdev,
    cluster_name=clustername.name,
    file_storage_dir=file_storage_dir,
    shared_file_storage_dir=shared_file_storage_dir,
    gluster_storage_dir=gluster_storage_dir,
    enabled_hypervisors=enabled_hypervisors,
    beparams={constants.PP_DEFAULT: beparams},
    nicparams={constants.PP_DEFAULT: nicparams},
    ndparams=ndparams,
    hvparams=hvparams,
    diskparams=diskparams,
    candidate_pool_size=candidate_pool_size,
    modify_etc_hosts=modify_etc_hosts,
    modify_ssh_setup=modify_ssh_setup,
    uid_pool=uid_pool,
    ctime=now,
    mtime=now,
    maintain_node_health=maintain_node_health,
    drbd_usermode_helper=drbd_helper,
    default_iallocator=default_iallocator,
    default_iallocator_params=default_iallocator_params,
    primary_ip_family=ipcls.family,
    prealloc_wipe_disks=prealloc_wipe_disks,
    use_external_mip_script=use_external_mip_script,
    ipolicy=full_ipolicy,
    hv_state_static=hv_state,
    disk_state_static=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    candidate_certs=candidate_certs,
    osparams={},
    osparams_private_cluster={},
    install_image=install_image,
    zeroing_image=zeroing_image,
    compression_tools=compression_tools,
    enabled_user_shutdown=enabled_user_shutdown,
    )
  master_node_config = objects.Node(name=hostname.name,
                                    primary_ip=hostname.ip,
                                    secondary_ip=secondary_ip,
                                    serial_no=1,
                                    master_candidate=True,
                                    offline=False, drained=False,
                                    ctime=now, mtime=now,
                                    )
  InitConfig(constants.CONFIG_VERSION, cluster_config, master_node_config)
  cfg = config.ConfigWriter(offline=True)
  ssh.WriteKnownHostsFile(cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
  cfg.Update(cfg.GetClusterInfo(), logging.error)
  ssconf.WriteSsconfFiles(cfg.GetSsconfValues())

  # set up the inter-node password and certificate
  _InitGanetiServerSetup(hostname.name, cfg)

  logging.debug("Starting daemons")
  result = utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
  if result.failed:
    raise errors.OpExecError("Could not start daemons, command %s"
                             " had exitcode %s and error %s" %
                             (result.cmd, result.exit_code, result.output))

  _WaitForMasterDaemon()


def InitConfig(version, cluster_config, master_node_config,
               cfg_file=pathutils.CLUSTER_CONF_FILE):
  """Create the initial cluster configuration.

  It will contain the current node, which will also be the master
  node, and no instances.

  @type version: int
  @param version: configuration version
  @type cluster_config: L{objects.Cluster}
  @param cluster_config: cluster configuration
  @type master_node_config: L{objects.Node}
  @param master_node_config: master node configuration
  @type cfg_file: string
  @param cfg_file: configuration file path

  """
  uuid_generator = config.TemporaryReservationManager()
  cluster_config.uuid = uuid_generator.Generate([], utils.NewUUID,
                                                _INITCONF_ECID)
  master_node_config.uuid = uuid_generator.Generate([], utils.NewUUID,
                                                    _INITCONF_ECID)
  cluster_config.master_node = master_node_config.uuid
  nodes = {
    master_node_config.uuid: master_node_config,
    }
  default_nodegroup = objects.NodeGroup(
    uuid=uuid_generator.Generate([], utils.NewUUID, _INITCONF_ECID),
    name=constants.INITIAL_NODE_GROUP_NAME,
    members=[master_node_config.uuid],
    diskparams={},
    )
  nodegroups = {
    default_nodegroup.uuid: default_nodegroup,
    }
  now = time.time()
  config_data = objects.ConfigData(version=version,
                                   cluster=cluster_config,
                                   nodegroups=nodegroups,
                                   nodes=nodes,
                                   instances={},
                                   networks={},
                                   disks={},
                                   serial_no=1,
                                   ctime=now, mtime=now)
  utils.WriteFile(cfg_file,
                  data=serializer.Dump(config_data.ToDict()),
                  mode=0600)

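# The file written above is the serialized form of objects.ConfigData; a
# much-abridged sketch of its shape (illustrative field subset only):
#
#   {"version": ..., "cluster": {...}, "nodes": {"<uuid>": {...}},
#    "nodegroups": {"<uuid>": {...}}, "instances": {}, "serial_no": 1}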

def FinalizeClusterDestroy(master_uuid):
  """Execute the last steps of cluster destroy

  This function shuts down all the daemons, completing the destroy
  begun in cmdlib.LUDestroyOpcode.

  """
  livelock = utils.livelock.LiveLock("bootstrap_destroy")
  cfg = config.GetConfig(None, livelock)
  modify_ssh_setup = cfg.GetClusterInfo().modify_ssh_setup
  runner = rpc.BootstrapRunner()

  master_name = cfg.GetNodeName(master_uuid)

  master_params = cfg.GetMasterNetworkParameters()
  master_params.uuid = master_uuid
  ems = cfg.GetUseExternalMipScript()
  result = runner.call_node_deactivate_master_ip(master_name, master_params,
                                                 ems)

  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master IP: %s", msg)

  result = runner.call_node_stop_master(master_name)
  msg = result.fail_msg
  if msg:
    logging.warning("Could not disable the master role: %s", msg)

  result = runner.call_node_leave_cluster(master_name, modify_ssh_setup)
  msg = result.fail_msg
  if msg:
    logging.warning("Could not shutdown the node daemon and cleanup"
                    " the node: %s", msg)


def SetupNodeDaemon(opts, cluster_name, node, ssh_port):
  """Add a node to the cluster.

  This function must be called before the actual opcode, and will ssh
  to the remote node, copy the needed files, and start ganeti-noded,
  allowing the master to do the rest via normal rpc calls.

  @param opts: the command-line options; debug, verbose and ssh_key_check
    are used from it
  @param cluster_name: the cluster name
  @param node: the name of the new node
  @param ssh_port: the SSH port of the new node

  """
  data = {
    constants.NDS_CLUSTER_NAME: cluster_name,
    constants.NDS_NODE_DAEMON_CERTIFICATE:
      utils.ReadFile(pathutils.NODED_CERT_FILE),
    constants.NDS_SSCONF: ssconf.SimpleStore().ReadAll(),
    constants.NDS_START_NODE_DAEMON: True,
    constants.NDS_NODE_NAME: node,
    }

  RunNodeSetupCmd(cluster_name, node, pathutils.NODE_DAEMON_SETUP,
                  opts.debug, opts.verbose,
                  True, opts.ssh_key_check, opts.ssh_key_check,
                  ssh_port, data)

  _WaitForNodeDaemon(node)


def MasterFailover(no_voting=False):
  """Failover the master node.

  This checks that we are not already the master, and will cause the
  current master to cease being master, and the non-master to become
  new master.

  @type no_voting: boolean
  @param no_voting: force the operation without remote nodes agreement
    (dangerous)

  @returns: the pair of an exit code and warnings to display

  """
  sstore = ssconf.SimpleStore()

  old_master, new_master = ssconf.GetMasterAndMyself(sstore)
  node_names = sstore.GetNodeList()
  mc_list = sstore.GetMasterCandidates()

  if old_master == new_master:
    raise errors.OpPrereqError("This command must be run on the node"
                               " where you want the new master to be."
                               " %s is already the master" %
                               old_master, errors.ECODE_INVAL)

  if new_master not in mc_list:
    mc_no_master = [name for name in mc_list if name != old_master]
    raise errors.OpPrereqError("This node is not among the nodes marked"
                               " as master candidates. Only these nodes"
                               " can become masters. Current list of"
                               " master candidates is:\n"
                               "%s" % ("\n".join(mc_no_master)),
                               errors.ECODE_STATE)

  if not no_voting:
    vote_list = GatherMasterVotes(node_names)

    if vote_list:
      voted_master = vote_list[0][0]
      if voted_master is None:
        raise errors.OpPrereqError("Cluster is inconsistent, most nodes did"
                                   " not respond.", errors.ECODE_ENVIRON)
      elif voted_master != old_master:
        raise errors.OpPrereqError("I have a wrong configuration, I believe"
                                   " the master is %s but the other nodes"
                                   " voted %s. Please resync the configuration"
                                   " of this node." %
                                   (old_master, voted_master),
                                   errors.ECODE_STATE)
  # end checks

  rcode = 0
  warnings = []

  logging.info("Setting master to %s, old master: %s", new_master, old_master)

  try:
    # Forcefully start WConfd so that we can access the configuration
    result = utils.RunCmd([pathutils.DAEMON_UTIL,
                           "start", constants.WCONFD, "--force-node",
                           "--no-voting", "--yes-do-it"])
    if result.failed:
      raise errors.OpPrereqError("Could not start the configuration daemon,"
                                 " command %s had exitcode %s and error %s" %
                                 (result.cmd, result.exit_code, result.output),
                                 errors.ECODE_NOENT)

    # instantiate a real config writer, as we now know we have the
    # configuration data
    livelock = utils.livelock.LiveLock("bootstrap_failover")
    cfg = config.GetConfig(None, livelock, accept_foreign=True)

    old_master_node = cfg.GetNodeInfoByName(old_master)
    if old_master_node is None:
      raise errors.OpPrereqError("Could not find old master node '%s' in"
                                 " cluster configuration." % old_master,
                                 errors.ECODE_NOENT)

    cluster_info = cfg.GetClusterInfo()
    new_master_node = cfg.GetNodeInfoByName(new_master)
    if new_master_node is None:
      raise errors.OpPrereqError("Could not find new master node '%s' in"
                                 " cluster configuration." % new_master,
                                 errors.ECODE_NOENT)

    cluster_info.master_node = new_master_node.uuid
    # this will also regenerate the ssconf files, since we updated the
    # cluster info
    cfg.Update(cluster_info, logging.error)

    # if cfg.Update worked, then it means the old master daemon won't be
    # able now to write its own config file (we rely on locking in both
    # backend.UploadFile() and ConfigWriter._Write()); hence the next
    # step is to kill the old master

    logging.info("Stopping the master daemon on node %s", old_master)

    runner = rpc.BootstrapRunner()
    master_params = cfg.GetMasterNetworkParameters()
    master_params.uuid = old_master_node.uuid
    ems = cfg.GetUseExternalMipScript()
    result = runner.call_node_deactivate_master_ip(old_master,
                                                   master_params, ems)

    msg = result.fail_msg
    if msg:
      warning = "Could not disable the master IP: %s" % (msg,)
      logging.warning("%s", warning)
      warnings.append(warning)

    result = runner.call_node_stop_master(old_master)
    msg = result.fail_msg
    if msg:
      warning = ("Could not disable the master role on the old master"
                 " %s, please disable manually: %s" % (old_master, msg))
      logging.error("%s", warning)
      warnings.append(warning)
  except errors.ConfigurationError, err:
    logging.error("Error while trying to set the new master: %s",
                  str(err))
    return 1, warnings
  finally:
    # stop WConfd again:
    result = utils.RunCmd([pathutils.DAEMON_UTIL, "stop", constants.WCONFD])
    if result.failed:
      warning = ("Could not stop the configuration daemon,"
                 " command %s had exitcode %s and error %s"
                 % (result.cmd, result.exit_code, result.output))
      logging.error("%s", warning)
      rcode = 1

  logging.info("Checking master IP non-reachability...")

  master_ip = sstore.GetMasterIP()
  total_timeout = 30

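  # Note: in the utils.Retry calls below, the (1, 1.5, 5) triple is a delay
  # specification (initial delay, backoff factor, maximum delay), assuming
  # the standard semantics of Ganeti's retry helper.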
  # Here we have a phase where no master should be running
  def _check_ip(expected):
    if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT) != expected:
      raise utils.RetryAgain()

  try:
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout, args=[False])
  except utils.RetryTimeout:
    warning = ("The master IP is still reachable after %s seconds,"
               " continuing but activating the master IP on the current"
               " node will probably fail" % total_timeout)
    logging.warning("%s", warning)
    warnings.append(warning)
    rcode = 1

  if jstore.CheckDrainFlag():
    logging.info("Undraining job queue")
    jstore.SetDrainFlag(False)

  logging.info("Starting the master daemons on the new master")

  result = rpc.BootstrapRunner().call_node_start_master_daemons(new_master,
                                                                no_voting)
  msg = result.fail_msg
  if msg:
    logging.error("Could not start the master role on the new master"
                  " %s, please check: %s", new_master, msg)
    rcode = 1

  # Finally verify that the new master managed to set up the master IP
  # and warn if it didn't.
  try:
    utils.Retry(_check_ip, (1, 1.5, 5), total_timeout, args=[True])
  except utils.RetryTimeout:
    warning = ("The master IP did not come up within %s seconds; the"
               " cluster should still be working and reachable via %s,"
               " but not via the master IP address"
               % (total_timeout, new_master))
    logging.warning("%s", warning)
    warnings.append(warning)
    rcode = 1

  logging.info("Master failed over from %s to %s", old_master, new_master)
  return rcode, warnings


def GetMaster():
  """Returns the current master node.

  This is a separate function in bootstrap since it's needed by
  gnt-cluster, and instead of importing directly ssconf, it's better
  to abstract it in bootstrap, where we do use ssconf in other
  functions too.

  """
  sstore = ssconf.SimpleStore()

  old_master, _ = ssconf.GetMasterAndMyself(sstore)

  return old_master


def GatherMasterVotes(node_names):
  """Check the agreement on who is the master.

  This function will return a list of (node, number of votes), ordered
  by the number of votes. Errors will be denoted by the key 'None'.

  Note that the sum of votes is the number of nodes this machine
  knows, whereas the number of entries in the list could be different
  (if some nodes vote for another master).

  @type node_names: list
  @param node_names: the list of nodes to query for master info
  @rtype: list
  @return: list of (node, votes)

  """
  if not node_names:
    # no nodes
    return []
  results = rpc.BootstrapRunner().call_master_node_name(node_names)
  if not isinstance(results, dict):
    # this should not happen (unless internal error in rpc)
    logging.critical("Can't complete rpc call, aborting master startup")
    return [(None, len(node_names))]
  votes = {}
  for node_name in results:
    nres = results[node_name]
    msg = nres.fail_msg

    if msg:
      logging.warning("Error contacting node %s: %s", node_name, msg)
      node = None
    else:
      node = nres.payload

    if node not in votes:
      votes[node] = 1
    else:
      votes[node] += 1

  vote_list = list(votes.items())
  # sort first on number of votes then on name, since we want None
  # sorted later if we have the half of the nodes not responding, and
  # half voting all for the same master
  vote_list.sort(key=lambda x: (x[1], x[0]), reverse=True)

  return vote_list
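

# Sketch of the returned shape for a hypothetical three-node cluster in
# which one node fails to answer:
#
#   GatherMasterVotes(["node1", "node2", "node3"])
#   => [("node1", 2), (None, 1)]
#
# i.e. two nodes agree that "node1" is the master, and one vote is the
# error marker None.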