Disable superfluous restarting of daemons
[ganeti-github.git] / lib / client / gnt_cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Cluster related commands"""
31
32 # pylint: disable=W0401,W0613,W0614,C0103
33 # W0401: Wildcard import ganeti.cli
34 # W0613: Unused argument, since all functions follow the same API
35 # W0614: Unused import %s from wildcard import (since we need cli)
36 # C0103: Invalid name gnt-cluster
37
38 from cStringIO import StringIO
39 import os
40 import time
41 import OpenSSL
42 import tempfile
43 import itertools
44
45 from ganeti.cli import *
46 from ganeti import bootstrap
47 from ganeti import compat
48 from ganeti import constants
49 from ganeti import config
50 from ganeti import errors
51 from ganeti import netutils
52 from ganeti import objects
53 from ganeti import opcodes
54 from ganeti import pathutils
55 from ganeti import qlang
56 from ganeti import serializer
57 from ganeti import ssconf
58 from ganeti import ssh
59 from ganeti import uidpool
60 from ganeti import utils
61 from ganeti.client import base
62
63
# Command-line options shared by several gnt-cluster sub-commands.

# Used by "gnt-cluster epo": recover from an emergency power-off.
ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

# Used by "gnt-cluster epo": treat the arguments as node group names.
GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

# Skips the interactive confirmation of a --no-voting master failover.
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

# Lets "gnt-cluster redist-conf" push the configuration even while the
# job queue is drained.
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                help="Unconditionally distribute the"
                                " configuration, even if the queue"
                                " is drained",
                                default=False, action="store_true")

# Target version for "gnt-cluster upgrade".
TO_OPT = cli_option("--to", default=None, type="string",
                    help="The Ganeti version to upgrade to")

# Resume a previously interrupted cluster upgrade.
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
                        help="Resume any pending Ganeti upgrades")

_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
91
92
def _InitEnabledDiskTemplates(opts):
  """Determine the list of enabled disk templates.

  @param opts: the command line options selected by the user
  @rtype: list of strings
  @return: the templates given via --enabled-disk-templates, or the
      cluster-wide defaults when the option was not used

  """
  chosen = opts.enabled_disk_templates
  if not chosen:
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
  return chosen.split(",")
101
102
def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @rtype: string or None
  @return: the volume group name to use, or None when LVM is not involved
  @raise errors.OpPrereqError: if the volume group is explicitly unset
      while LVM-based disk templates are enabled

  """
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
    if vg_name:
      if not utils.IsLvmEnabled(enabled_disk_templates):
        # harmless but probably unintended: warn instead of failing
        ToStdout("You specified a volume group with --vg-name, but you did not"
                 " enable any disk template that uses lvm.")
    elif utils.IsLvmEnabled(enabled_disk_templates):
      # an explicitly empty --vg-name contradicts enabled LVM templates;
      # previously this branch was unreachable (duplicated condition), so
      # the error was never raised and no default VG was ever applied
      raise errors.OpPrereqError(
          "LVM disk templates are enabled, but vg name not set.")
  elif utils.IsLvmEnabled(enabled_disk_templates):
    # no --vg-name given at all: fall back to the default volume group
    vg_name = constants.DEFAULT_VG
  return vg_name
123
124
def _InitDrbdHelper(opts, enabled_disk_templates):
  """Determine the DRBD usermode helper to use.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @rtype: string or None
  @return: the helper given on the command line, the default helper when
      DRBD is enabled but none was given, or None
  @raise errors.OpPrereqError: if DRBD is enabled while the helper is
      explicitly unset

  """
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates

  if opts.drbd_helper is not None and not drbd_enabled:
    # a helper without DRBD is pointless but not fatal
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
             " is not enabled.")

  if not drbd_enabled:
    return opts.drbd_helper

  if opts.drbd_helper is None:
    return constants.DEFAULT_DRBD_HELPER
  if opts.drbd_helper == '':
    raise errors.OpPrereqError(
        "Unsetting the drbd usermode helper while enabling DRBD is not"
        " allowed.")
  return opts.drbd_helper
144
145
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError, e:
    ToStderr(str(e))
    return 1

  # pick the master network device from the NIC mode when not given
  master_netdev = opts.master_netdev
  if master_netdev is None:
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict: fill defaults for every known hypervisor
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict: fill defaults for every known template
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  if opts.install_image:
    install_image = opts.install_image
  else:
    install_image = ""

  if opts.zeroing_image:
    zeroing_image = opts.zeroing_image
  else:
    zeroing_image = ""

  # NOTE(review): _GetCompressionTools is presumably defined elsewhere in
  # this file (not visible in this chunk)
  compression_tools = _GetCompressionTools(opts)

  default_ialloc_params = opts.default_iallocator_params

  # normalize to a strict boolean
  if opts.enabled_user_shutdown:
    enabled_user_shutdown = True
  else:
    enabled_user_shutdown = False

  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        gluster_storage_dir=opts.gluster_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        default_iallocator_params=default_ialloc_params,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        install_image=install_image,
                        zeroing_image=zeroing_image,
                        compression_tools=compression_tools,
                        enabled_user_shutdown=enabled_user_shutdown,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
336
337
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    # destroying a cluster cannot be undone, so insist on an explicit flag
    # (message previously read "really want destroy" - missing "to")
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0
360
361
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  (old_name, ) = cl.QueryConfigValues(["cluster_name"])
  new_name = args[0]

  # renaming changes the cluster IP, so ask for confirmation unless forced
  if not opts.force:
    prompt = ("This will rename the cluster from '%s' to '%s'. If you are"
              " connected over the network to the cluster name, the"
              " operation is very dangerous as the IP address will be"
              " removed from the node and the change may not go through."
              " Continue?") % (old_name, new_name)
    if not AskUser(prompt):
      return 1

  renamed = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                         opts=opts, cl=cl)
  if renamed:
    ToStdout("Cluster renamed from '%s' to '%s'", old_name, renamed)

  return 0
393
394
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
402
403
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  warning = ("This will disable the master IP. All the open connections to"
             " the master IP will be closed. To reach the master you will"
             " need to use its node IP."
             " Continue?")
  # dropping the master IP kills existing connections, so confirm first
  if not opts.confirm and not AskUser(warning):
    return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
419
420
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  if not opts.yes_do_it:
    SubmitOrSend(op, opts)
  else:
    # --yes-do-it: push the configuration even with a drained job queue
    SubmitOpCodeToDrainedQueue(op)
  return 0
437
438
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cluster_info = GetClient().QueryClusterInfo()
  # (format string, QueryClusterInfo key) pairs, printed in this order
  version_fields = [
    ("Software version: %s", "software_version"),
    ("Internode protocol: %s", "protocol_version"),
    ("Configuration format: %s", "config_version"),
    ("OS api version: %s", "os_api_version"),
    ("Export interface: %s", "export_version"),
    ("VCS version: %s", "vcs_version"),
    ]
  for (fmt, key) in version_fields:
    ToStdout(fmt, cluster_info[key])
  return 0
458
459
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  ToStdout(bootstrap.GetMaster())
  return 0
473
474
def _FormatGroupedParams(paramsdict, roman=False):
  """Format Grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: {group: {param: value, ...}, ...}
  @type roman: bool
  @param roman: whether integer leaf values should be rendered as roman
      numerals
  @rtype: dict of dicts
  @return: copy of the input dictionaries with strings as values

  """
  def _Convert(value):
    # recurse into nested groups; leaves become strings (or romans)
    if isinstance(value, dict):
      return _FormatGroupedParams(value, roman=roman)
    if roman and isinstance(value, int):
      return compat.TryToRoman(value)
    return str(value)

  return dict((key, _Convert(value)) for (key, value) in paramsdict.items())
493
494
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  # only display parameters of hypervisors that are actually enabled
  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  # nested (label, value) structure consumed by PrintGenericInfo below
  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams,
                                                   opts.roman_integers)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"], opts.roman_integers)),

    ("OS parameters", _FormatGroupedParams(result["osparams"],
                                           opts.roman_integers)),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs running simultaneously",
       compat.TryToRoman(result["max_running_jobs"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs simultaneously tracked by the scheduler",
       compat.TryToRoman(result["max_tracked_jobs"],
                         convert=opts.roman_integers)),
      ("mac prefix", result["mac_prefix"]),
      ("master netdev", result["master_netdev"]),
      # NOTE(review): roman flag passed positionally here, by keyword above
      ("master netmask", compat.TryToRoman(result["master_netmask"],
                                           opts.roman_integers)),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("gluster storage path", result["gluster_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("default instance allocator parameters",
       result["default_iallocator_params"]),
      ("primary ip version", compat.TryToRoman(result["primary_ip_version"],
                                               opts.roman_integers)),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ("install image", result["install_image"]),
      ("instance communication network",
       result["instance_communication_network"]),
      ("zeroing image", result["zeroing_image"]),
      ("compression tools", result["compression_tools"]),
      ("enabled user shutdown", result["enabled_user_shutdown"]),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True, opts.roman_integers)),
    ]

  PrintGenericInfo(info)
  return 0
611
612
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the given file does not exist

  """
  filename = args[0]
  filename = os.path.abspath(filename)

  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  qcl = GetClient()
  try:
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                             secondary_ips=opts.use_replication_network,
                             nodegroup=opts.nodegroup)
    # query SSH ports for the nodes we will actually copy to; using the
    # raw opts.nodes here would misalign the zip below whenever
    # GetOnlineNodes filters the list (offline nodes, the master, or an
    # empty selection meaning "all nodes")
    ports = GetNodesSshPorts(results, qcl)
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name)
  for (node, port) in zip(results, ports):
    if not srun.CopyFileToNode(node, port, filename):
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)

  return 0
650
651
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  qcl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
  ports = GetNodesSshPorts(nodes, qcl)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  runner = ssh.SshRunner(cluster_name=cluster_name)

  # run on the master last, so a disruptive command cannot cut us off
  # from the remaining nodes
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for (node_name, ssh_port) in zip(nodes, ports):
    result = runner.Run(node_name, constants.SSH_LOGIN_USER, command,
                        port=ssh_port)

    succeeded = (result.exit_code == constants.EXIT_SUCCESS)
    if opts.failure_only and succeeded:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", node_name, line)
    else:
      ToStdout("node: %s", node_name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
697
698
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  # the verify opcode submits per-group verification jobs and returns
  # their IDs under constants.JOB_IDS_KEY
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  # "results" is a list of (job_success, op_results) pairs; in one pass
  # count the jobs that failed outright (bad_jobs) and the jobs whose
  # single verification result was falsy (bad_results)
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
750
751
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  # the opcode submits one verification job per node group; their IDs are
  # returned under constants.JOB_IDS_KEY
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    # each successful job yields one (bad_nodes, instances, missing) triple
    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    # re-activate disks for instances that are not missing volumes
    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        # remember the failure but keep processing the other instances
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        # an instance is unverifiable when every missing volume lives on a
        # node we could not query
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
823
824
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # return an explicit exit code, consistent with the other command
  # functions in this module (previously fell off the end, returning None)
  return 0
837
838
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.no_voting and not opts.yes_do_it:
    # a --no-voting failover can split the cluster; require confirmation
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  # fixed misspelled local "rvlaue"
  rvalue, msgs = bootstrap.MasterFailover(no_voting=opts.no_voting)
  for msg in msgs:
    ToStderr(msg)
  return rvalue
866
867
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    # any failure at all (daemon down, network error, ...) means the
    # master is not reachable
    return 1
  return 0
884
885
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    # no matching tags found is reported as failure
    return 1
  # result is a list of (path, tag) pairs; print them in sorted order
  for path, tag in sorted(result):
    ToStdout("%s %s", path, tag)
  # return an explicit success code, consistent with the other commands
  # (previously fell off the end, returning None)
  return 0
904
905
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
      verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
      the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.
  @raise errors.X509CertError: if the file cannot be read, or its content
      cannot be loaded as a certificate (or private key, when requested)

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError as err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  # validate the public certificate part
  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception as err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  # optionally validate the private key bundled in the same PEM file
  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception as err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem
939
940
941 def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
942 rapi_cert_filename, new_spice_cert, spice_cert_filename,
943 spice_cacert_filename, new_confd_hmac_key, new_cds,
944 cds_filename, force, new_node_cert, verbose, debug):
945 """Renews cluster certificates, keys and secrets.
946
947 @type new_cluster_cert: bool
948 @param new_cluster_cert: Whether to generate a new cluster certificate
949 @type new_rapi_cert: bool
950 @param new_rapi_cert: Whether to generate a new RAPI certificate
951 @type rapi_cert_filename: string
952 @param rapi_cert_filename: Path to file containing new RAPI certificate
953 @type new_spice_cert: bool
954 @param new_spice_cert: Whether to generate a new SPICE certificate
955 @type spice_cert_filename: string
956 @param spice_cert_filename: Path to file containing new SPICE certificate
957 @type spice_cacert_filename: string
958 @param spice_cacert_filename: Path to file containing the certificate of the
959 CA that signed the SPICE certificate
960 @type new_confd_hmac_key: bool
961 @param new_confd_hmac_key: Whether to generate a new HMAC key
962 @type new_cds: bool
963 @param new_cds: Whether to generate a new cluster domain secret
964 @type cds_filename: string
965 @param cds_filename: Path to file containing new cluster domain secret
966 @type force: bool
967 @param force: Whether to ask user for confirmation
968 @type new_node_cert: string
969 @param new_node_cert: Whether to generate new node certificates
970 @type verbose: boolean
971 @param verbose: show verbose output
972 @type debug: boolean
973 @param debug: show debug output
974
975 """
976 if new_rapi_cert and rapi_cert_filename:
977 ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
978 " options can be specified at the same time.")
979 return 1
980
981 if new_cds and cds_filename:
982 ToStderr("Only one of the --new-cluster-domain-secret and"
983 " --cluster-domain-secret options can be specified at"
984 " the same time.")
985 return 1
986
987 if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
988 ToStderr("When using --new-spice-certificate, the --spice-certificate"
989 " and --spice-ca-certificate must not be used.")
990 return 1
991
992 if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
993 ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
994 " specified.")
995 return 1
996
997 rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
998 try:
999 if rapi_cert_filename:
1000 rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
1001 if spice_cert_filename:
1002 spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
1003 spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
1004 except errors.X509CertError, err:
1005 ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
1006 return 1
1007
1008 if cds_filename:
1009 try:
1010 cds = utils.ReadFile(cds_filename)
1011 except Exception, err: # pylint: disable=W0703
1012 ToStderr("Can't load new cluster domain secret from %s: %s" %
1013 (cds_filename, str(err)))
1014 return 1
1015 else:
1016 cds = None
1017
1018 if not force:
1019 usertext = ("This requires all daemons on all nodes to be restarted and"
1020 " may take some time. Continue?")
1021 if not AskUser(usertext):
1022 return 1
1023
  def _RenewCryptoInner(ctx):
    # Regenerate the requested non-cluster certificates/keys while all
    # daemons are stopped, then copy the resulting files to every
    # non-master node.  The cluster certificate itself and the client
    # certificates are handled separately (see _RenewServerAndClientCerts).
    ctx.feedback_fn("Updating certificates and keys")

    # First positional argument (cluster cert) and the 'client cert'
    # argument are both False here: only RAPI/SPICE/HMAC/CDS material is
    # (re)generated or replaced from the PEM data read earlier.
    bootstrap.GenerateClusterCrypto(False,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    False,
                                    None,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    # Collect only the files that were actually (re)created above, so
    # nothing is copied needlessly.
    files_to_copy = []

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    # Distribute the new files to all non-master nodes via SSH; the
    # master node already holds them since they were generated locally.
    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        port = ctx.ssh_ports[node_name]
        ctx.feedback_fn("Copying %s to %s:%d" %
                        (", ".join(files_to_copy), node_name, port))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, port, file_name)
1061
  def _RenewClientCerts(ctx):
    # Renew the per-node client SSL certificates on every node (master
    # included) by running the node-setup command remotely, then bootstrap
    # the distribution of the new digests via a temporary ssconf file.
    ctx.feedback_fn("Updating client SSL certificates.")

    cluster_name = ssconf.SimpleStore().GetClusterName()

    for node_name in ctx.nonmaster_nodes + [ctx.master_node]:
      ssh_port = ctx.ssh_ports[node_name]
      data = {
        constants.NDS_CLUSTER_NAME: cluster_name,
        constants.NDS_NODE_DAEMON_CERTIFICATE:
          utils.ReadFile(pathutils.NODED_CERT_FILE),
        constants.NDS_NODE_NAME: node_name,
        constants.NDS_ACTION: constants.CRYPTO_ACTION_CREATE,
        }

      # Runs pathutils.SSL_UPDATE on the node, feeding it the JSON 'data'
      # dict on stdin; authentication uses the cluster SSH key.
      bootstrap.RunNodeSetupCmd(
          cluster_name,
          node_name,
          pathutils.SSL_UPDATE,
          ctx.debug,
          ctx.verbose,
          True, # use cluster key
          False, # ask key
          True, # strict host check
          ssh_port,
          data)

    # Create a temporary ssconf file using the master's client cert digest
    # and the 'bootstrap' keyword to enable distribution of all nodes' digests.
    master_digest = utils.GetCertificateDigest()
    ssconf_master_candidate_certs_filename = os.path.join(
        pathutils.DATA_DIR, "%s%s" %
        (constants.SSCONF_FILEPREFIX, constants.SS_MASTER_CANDIDATES_CERTS))
    utils.WriteFile(
        ssconf_master_candidate_certs_filename,
        data="%s=%s" % (constants.CRYPTO_BOOTSTRAP, master_digest))
    # Push the bootstrap ssconf file to all non-master nodes so they can
    # verify the master's new client certificate.
    for node_name in ctx.nonmaster_nodes:
      port = ctx.ssh_ports[node_name]
      ctx.feedback_fn("Copying %s to %s:%d" %
                      (ssconf_master_candidate_certs_filename, node_name, port))
      ctx.ssh.CopyFileToNode(node_name, port,
                             ssconf_master_candidate_certs_filename)

    # Write the bootstrap entry to the config using wconfd.
    config_live_lock = utils.livelock.LiveLock("renew_crypto")
    cfg = config.GetConfig(None, config_live_lock)
    cfg.AddNodeToCandidateCerts(constants.CRYPTO_BOOTSTRAP, master_digest)
    cfg.Update(cfg.GetClusterInfo(), ctx.feedback_fn)
1110
  def _RenewServerAndClientCerts(ctx):
    # Regenerate the cluster (server) SSL certificate on the master,
    # distribute it to all other nodes, and then renew the client
    # certificates on top of it.
    ctx.feedback_fn("Updating the cluster SSL certificate.")

    master_name = ssconf.SimpleStore().GetMasterNode()
    bootstrap.GenerateClusterCrypto(True, # cluster cert
                                    False, # rapi cert
                                    False, # spice cert
                                    False, # confd hmac key
                                    False, # cds
                                    True, # client cert
                                    master_name)

    # The new server certificate must reach every node before the client
    # certificates are renewed, since the SSL_UPDATE runs depend on it.
    for node_name in ctx.nonmaster_nodes:
      port = ctx.ssh_ports[node_name]
      server_cert = pathutils.NODED_CERT_FILE
      ctx.feedback_fn("Copying %s to %s:%d" %
                      (server_cert, node_name, port))
      ctx.ssh.CopyFileToNode(node_name, port, server_cert)

    # Client certificates are always renewed together with the cluster cert.
    _RenewClientCerts(ctx)
1131
1132 if new_rapi_cert or new_spice_cert or new_confd_hmac_key or new_cds:
1133 RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
1134
1135 # If only node certficates are recreated, call _RenewClientCerts only.
1136 if new_node_cert and not new_cluster_cert:
1137 RunWhileDaemonsStopped(ToStdout, [constants.NODED, constants.WCONFD],
1138 _RenewClientCerts, verbose=verbose, debug=debug)
1139
1140 # If the cluster certificate are renewed, the client certificates need
1141 # to be renewed too.
1142 if new_cluster_cert:
1143 RunWhileDaemonsStopped(ToStdout, [constants.NODED, constants.WCONFD],
1144 _RenewServerAndClientCerts, verbose=verbose,
1145 debug=debug)
1146
1147 ToStdout("All requested certificates and keys have been replaced."
1148 " Running \"gnt-cluster verify\" now is recommended.")
1149
1150 if new_node_cert or new_cluster_cert:
1151 cl = GetClient()
1152 renew_op = opcodes.OpClusterRenewCrypto()
1153 SubmitOpCode(renew_op, cl=cl)
1154
1155 return 0
1156
1157
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Thin wrapper unpacking the relevant command line options for
  # _RenewCrypto; note that opts.debug is an integer verbosity level and
  # is converted into a boolean here.
  return _RenewCrypto(opts.new_cluster_cert, opts.new_rapi_cert,
                      opts.rapi_cert, opts.new_spice_cert,
                      opts.spice_cert, opts.spice_cacert,
                      opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret,
                      opts.force, opts.new_node_cert,
                      opts.verbose, opts.debug > 0)
1175
1176
1177 def _GetEnabledDiskTemplates(opts):
1178 """Determine the list of enabled disk templates.
1179
1180 """
1181 if opts.enabled_disk_templates:
1182 return opts.enabled_disk_templates.split(",")
1183 else:
1184 return None
1185
1186
1187 def _GetVgName(opts, enabled_disk_templates):
1188 """Determine the volume group name.
1189
1190 @type enabled_disk_templates: list of strings
1191 @param enabled_disk_templates: cluster-wide enabled disk-templates
1192
1193 """
1194 # consistency between vg name and enabled disk templates
1195 vg_name = None
1196 if opts.vg_name is not None:
1197 vg_name = opts.vg_name
1198 if enabled_disk_templates:
1199 if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1200 ToStdout("You specified a volume group with --vg-name, but you did not"
1201 " enable any of the following lvm-based disk templates: %s" %
1202 utils.CommaJoin(constants.DTS_LVM))
1203 return vg_name
1204
1205
1206 def _GetDrbdHelper(opts, enabled_disk_templates):
1207 """Determine the DRBD usermode helper.
1208
1209 """
1210 drbd_helper = opts.drbd_helper
1211 if enabled_disk_templates:
1212 drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1213 if not drbd_enabled and opts.drbd_helper:
1214 ToStdout("You specified a DRBD usermode helper with "
1215 " --drbd-usermode-helper while DRBD is not enabled.")
1216 return drbd_helper
1217
1218
1219 def _GetCompressionTools(opts):
1220 """Determine the list of custom compression tools.
1221
1222 """
1223 if opts.compression_tools:
1224 return opts.compression_tools.split(",")
1225 elif opts.compression_tools is None:
1226 return None # To note the parameter was not provided
1227 else:
1228 return constants.IEC_DEFAULT_TOOLS # Resetting to default
1229
1230
1231 def SetClusterParams(opts, args):
1232 """Modify the cluster.
1233
1234 @param opts: the command line options selected by the user
1235 @type args: list
1236 @param args: should be an empty list
1237 @rtype: int
1238 @return: the desired exit code
1239
1240 """
1241 if not (opts.vg_name is not None or
1242 opts.drbd_helper is not None or
1243 opts.enabled_hypervisors or opts.hvparams or
1244 opts.beparams or opts.nicparams or
1245 opts.ndparams or opts.diskparams or
1246 opts.candidate_pool_size is not None or
1247 opts.max_running_jobs is not None or
1248 opts.max_tracked_jobs is not None or
1249 opts.uid_pool is not None or
1250 opts.maintain_node_health is not None or
1251 opts.add_uids is not None or
1252 opts.remove_uids is not None or
1253 opts.default_iallocator is not None or
1254 opts.default_iallocator_params or
1255 opts.reserved_lvs is not None or
1256 opts.mac_prefix is not None or
1257 opts.master_netdev is not None or
1258 opts.master_netmask is not None or
1259 opts.use_external_mip_script is not None or
1260 opts.prealloc_wipe_disks is not None or
1261 opts.hv_state or
1262 opts.enabled_disk_templates or
1263 opts.disk_state or
1264 opts.ipolicy_bounds_specs is not None or
1265 opts.ipolicy_std_specs is not None or
1266 opts.ipolicy_disk_templates is not None or
1267 opts.ipolicy_vcpu_ratio is not None or
1268 opts.ipolicy_spindle_ratio is not None or
1269 opts.modify_etc_hosts is not None or
1270 opts.file_storage_dir is not None or
1271 opts.install_image is not None or
1272 opts.instance_communication_network is not None or
1273 opts.zeroing_image is not None or
1274 opts.shared_file_storage_dir is not None or
1275 opts.compression_tools is not None or
1276 opts.shared_file_storage_dir is not None or
1277 opts.enabled_user_shutdown is not None):
1278 ToStderr("Please give at least one of the parameters.")
1279 return 1
1280
1281 enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1282 vg_name = _GetVgName(opts, enabled_disk_templates)
1283
1284 try:
1285 drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1286 except errors.OpPrereqError, e:
1287 ToStderr(str(e))
1288 return 1
1289
1290 hvlist = opts.enabled_hypervisors
1291 if hvlist is not None:
1292 hvlist = hvlist.split(",")
1293
1294 # a list of (name, dict) we can pass directly to dict() (or [])
1295 hvparams = dict(opts.hvparams)
1296 for hv_params in hvparams.values():
1297 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1298
1299 diskparams = dict(opts.diskparams)
1300
1301 for dt_params in diskparams.values():
1302 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1303
1304 beparams = opts.beparams
1305 utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1306
1307 nicparams = opts.nicparams
1308 utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1309
1310 ndparams = opts.ndparams
1311 if ndparams is not None:
1312 utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1313
1314 ipolicy = CreateIPolicyFromOpts(
1315 minmax_ispecs=opts.ipolicy_bounds_specs,
1316 std_ispecs=opts.ipolicy_std_specs,
1317 ipolicy_disk_templates=opts.ipolicy_disk_templates,
1318 ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1319 ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1320 )
1321
1322 mnh = opts.maintain_node_health
1323
1324 uid_pool = opts.uid_pool
1325 if uid_pool is not None:
1326 uid_pool = uidpool.ParseUidPool(uid_pool)
1327
1328 add_uids = opts.add_uids
1329 if add_uids is not None:
1330 add_uids = uidpool.ParseUidPool(add_uids)
1331
1332 remove_uids = opts.remove_uids
1333 if remove_uids is not None:
1334 remove_uids = uidpool.ParseUidPool(remove_uids)
1335
1336 if opts.reserved_lvs is not None:
1337 if opts.reserved_lvs == "":
1338 opts.reserved_lvs = []
1339 else:
1340 opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1341
1342 if opts.master_netmask is not None:
1343 try:
1344 opts.master_netmask = int(opts.master_netmask)
1345 except ValueError:
1346 ToStderr("The --master-netmask option expects an int parameter.")
1347 return 1
1348
1349 ext_ip_script = opts.use_external_mip_script
1350
1351 if opts.disk_state:
1352 disk_state = utils.FlatToDict(opts.disk_state)
1353 else:
1354 disk_state = {}
1355
1356 hv_state = dict(opts.hv_state)
1357
1358 compression_tools = _GetCompressionTools(opts)
1359
1360 op = opcodes.OpClusterSetParams(
1361 vg_name=vg_name,
1362 drbd_helper=drbd_helper,
1363 enabled_hypervisors=hvlist,
1364 hvparams=hvparams,
1365 os_hvp=None,
1366 beparams=beparams,
1367 nicparams=nicparams,
1368 ndparams=ndparams,
1369 diskparams=diskparams,
1370 ipolicy=ipolicy,
1371 candidate_pool_size=opts.candidate_pool_size,
1372 max_running_jobs=opts.max_running_jobs,
1373 max_tracked_jobs=opts.max_tracked_jobs,
1374 maintain_node_health=mnh,
1375 modify_etc_hosts=opts.modify_etc_hosts,
1376 uid_pool=uid_pool,
1377 add_uids=add_uids,
1378 remove_uids=remove_uids,
1379 default_iallocator=opts.default_iallocator,
1380 default_iallocator_params=opts.default_iallocator_params,
1381 prealloc_wipe_disks=opts.prealloc_wipe_disks,
1382 mac_prefix=opts.mac_prefix,
1383 master_netdev=opts.master_netdev,
1384 master_netmask=opts.master_netmask,
1385 reserved_lvs=opts.reserved_lvs,
1386 use_external_mip_script=ext_ip_script,
1387 hv_state=hv_state,
1388 disk_state=disk_state,
1389 enabled_disk_templates=enabled_disk_templates,
1390 force=opts.force,
1391 file_storage_dir=opts.file_storage_dir,
1392 install_image=opts.install_image,
1393 instance_communication_network=opts.instance_communication_network,
1394 zeroing_image=opts.zeroing_image,
1395 shared_file_storage_dir=opts.shared_file_storage_dir,
1396 compression_tools=compression_tools,
1397 enabled_user_shutdown=opts.enabled_user_shutdown,
1398 )
1399 return base.GetResult(None, opts, SubmitOrSend(op, opts))
1400
1401
def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()
  if command == "drain" or command == "undrain":
    # "drain" sets the flag, "undrain" clears it
    client.SetQueueDrainFlag(command == "drain")
  elif command == "info":
    (drained, ) = client.QueryConfigValues(["drain_flag"])
    ToStdout("The drain flag is %s" % ("set" if drained else "unset"))
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
1429
1430
def _ShowWatcherPause(until):
  """Print a human-readable message about the watcher pause state.

  @param until: pause end as a Unix timestamp, or None if not paused

  """
  # A pause timestamp in the past counts as "not paused"
  if until is not None and until >= time.time():
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1436
1437
def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")
  elif command == "pause":
    # The pause duration is a mandatory second argument
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
    until = time.time() + ParseTimespec(args[1])
    _ShowWatcherPause(client.SetWatcherPause(until))
  elif command == "info":
    (watcher_pause, ) = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(watcher_pause)
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
1471
1472
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  command = constants.OOB_POWER_ON if power else constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)

  success = True
  for ((_, node_name), (data_status, _)) in result:
    if data_status != constants.RS_NORMAL:
      # ignore_status=True means "unavailable" cannot be reported here
      assert data_status != constants.RS_UNAVAIL
      success = False
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  return success
1508
1509
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    (text_submit, text_success, text_failed) = \
      ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    (text_submit, text_success, text_failed) = \
      ("shutdown", "stopped", "stopping")

  # Submit one job per instance and wait for all of them
  jex = JobExecutor(opts=opts)
  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    jex.QueueJob(inst, opcls(instance_name=inst))

  results = jex.GetResults()
  bad_cnt = sum(1 for (success, _) in results if not success)

  if bad_cnt:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  ToStdout("All instances have been %s successfully", text_success)
  return True
1548
1549
class _RunWhenNodesReachableHelper(object):
  """Helper class to make shared internal state sharing easier.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    self.success = True
    self.up = set()
    self.down = set(node_list)
    self.node2ip = node2ip
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    if not self.action_cb(self.up):
      self.success = False

    # Done only once every node has come up
    if not self.down:
      return self.success

    raise utils.RetryAgain()

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port,
                       timeout=_EPO_PING_TIMEOUT, live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # A newly available node means the action callback may succeed now,
        # so return immediately instead of sleeping out the interval
        return

    self._sleep_fn(max(0.0, start + secs - time.time()))
1612
1613
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  # Resolve node names with the address family the cluster uses
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = {}
  for node in node_list:
    node2ip[node] = netutils.GetHostname(node, family=family).ip

  helper = _RunWhenNodesReachableHelper(
    node_list, action_cb, node2ip,
    netutils.GetDaemonPort(constants.NODED), ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n - %s", " - ".join(helper.down))
    return False
1643
1644
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  # Instances whose nodes are all back online can be started now
  start_inst_list = [inst for (inst, nodes) in inst_map.items()
                     if not (nodes - nodes_online)]

  # Remove them from the map so they are not started twice
  for inst in start_inst_list:
    del inst_map[inst]

  if not start_inst_list:
    return True

  return _instance_start_fn(opts, start_inst_list, True)
1669
1670
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Power the OOB-capable nodes *on*.  The previous code passed False
  # (power off) here, contradicting this function's purpose and the
  # "get back up" message below.
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1695
1696
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Instances are stopped without remembering the state, so they come back
  # up automatically once the nodes are powered on again
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  # Only OOB-capable nodes are powered off; an empty list is a no-op
  if node_list and not _OobPower(opts, node_list, False):
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1717
1718
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  # --groups and --all are mutually exclusive, and --all takes no arguments
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  # With --groups, args are group names and are expanded to their nodes;
  # otherwise args are node names (empty args means all nodes)
  if opts.groups:
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      # Build inst -> set(nodes) for instances on online nodes.  Instances
      # touching the master node get an *empty* node set (the master is
      # never powered through EPO, so they must not wait on any node).
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      # Node has no OOB support; it can only be handled manually
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      # Online nodes, or offline-but-powered nodes, take part in the OOB run
      node_list.append(node)

  # Ask for confirmation unless --force was given
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)
1790
1791
def _GetCreateCommand(info):
  """Build the 'gnt-cluster init' command line matching the given cluster.

  @type info: dict
  @param info: result of a cluster info query
  @rtype: string

  """
  out = StringIO()
  out.write("gnt-cluster init")
  # Only the instance policy options are reproduced at the moment
  PrintIPolicyCommand(out, info["ipolicy"], False)
  out.write(" %s" % info["name"])
  return out.getvalue()
1799
1800
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cluster_info = GetClient().QueryClusterInfo()
  ToStdout(_GetCreateCommand(cluster_info))
1810
1811
def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if not result.failed:
    return True
  ToStderr("Command %s failed: %s; Output %s" %
           (cmd, result.fail_reason, result.output))
  return False
1827
1828
def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  sstore = ssconf.SimpleStore()
  nodes = sstore.GetOnlineNodeList()
  master_node = sstore.GetMasterNode()
  cluster_name = sstore.GetClusterName()

  # If master node is in 'nodes', make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  srun = ssh.SshRunner(cluster_name=cluster_name)
  failed = [name for name in nodes
            if srun.Run(name, constants.SSH_LOGIN_USER,
                        command).exit_code != 0]

  return failed
1863
1864
def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing, if this is the case, otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  # A version counts as installed if its directory below PKGLIBDIR exists
  versiondir = os.path.join(pathutils.PKGLIBDIR, versionstring)
  badnodes = _VerifyCommand(["test", "-d", versiondir])
  if not badnodes:
    return True

  ToStderr("Ganeti version %s not installed on nodes %s"
           % (versionstring, ", ".join(badnodes)))
  return False
1885
1886
def _GetRunning():
  """Determine the number of currently running jobs.

  @rtype: int
  @return: the number of jobs still running

  """
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(GetClient().Query(constants.QR_JOB, [], qfilter).data)
1898
1899
def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  libdir = os.path.join(pathutils.PKGLIBDIR, versionstring)
  sharedir = os.path.join(pathutils.SHAREDIR, versionstring)
  liblink = os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")
  sharelink = os.path.join(pathutils.SYSCONFDIR, "ganeti/share")

  failed = []
  if constants.HAS_GNU_LN:
    # GNU ln supports -T, which lets us replace the links in one call
    failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", libdir, liblink]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", sharedir,
                                  sharelink]))
  else:
    # Without GNU ln, remove the old links first and recreate them
    failed.extend(_VerifyCommand(["rm", "-f", liblink]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", libdir, liblink]))
    failed.extend(_VerifyCommand(["rm", "-f", sharelink]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", sharedir, sharelink]))

  return list(set(failed))
1930
1931
1932 def _ExecuteCommands(fns):
1933 """Execute a list of functions, in reverse order.
1934
1935 @type fns: list of functions.
1936 @param fns: the functions to be executed.
1937
1938 """
1939 for fn in reversed(fns):
1940 fn()
1941
1942
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  if "version" not in config_data:
    return None
  return utils.SplitVersion(config_data["version"])
1957
1958
def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return (None, None)

  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
  contents = utils.UnescapeAndSplit(contentstring)
  # A well-formed file holds exactly (old version, new version, pid)
  if len(contents) != 3:
    # file syntactically mal-formed
    return (None, None)

  return (contents[0], contents[1])
1976
1977
def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intent to upgrade to

  """
  # Record the running version, the target version and our pid, so a
  # later --resume can detect a matching (or conflicting) upgrade attempt.
  content = utils.EscapeAndJoin(
    [constants.RELEASE_VERSION, version, "%d" % os.getpid()])
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE, data=content)
1988
1989
def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  Every step that succeeds registers a compensating action on the
  rollback list; on failure the caller is expected to execute these
  (in reverse order, see L{_ExecuteCommands}).

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  # Wait for all running jobs to finish; the queue is drained, so no new
  # ones can start in the meantime.
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  # Keep the watcher from restarting the daemons we are about to stop.
  ToStdout("Pausing the watcher for one hour.")
  rollback.append(lambda: GetClient().SetWatcherPause(None))
  GetClient().SetWatcherPause(time.time() + 60 * 60)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  # Re-check after stopping the daemons; restart them if the new version
  # is no longer available, as no rollback action was registered yet.
  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.BACKUP_DIR, "ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["mkdir", "-p", pathutils.BACKUP_DIR]):
    return (False, rollback)

  # Create the archive in a safe manner, as it contains sensitive
  # information.
  (_, tmp_name) = tempfile.mkstemp(prefix=backuptar, dir=pathutils.BACKUP_DIR)
  if not _RunCommandAndReport(["tar", "-cf", tmp_name,
                               "--exclude=queue/archive",
                               pathutils.DATA_DIR]):
    return (False, rollback)

  # Atomically move the backup into its final (predictable) name.
  os.rename(tmp_name, backuptar)
  return (True, rollback)
2056
2057
def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @return: True upon success
  """
  ToStdout("Performing version-specific downgrade tasks.")

  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  cluster_name = ssconf.SimpleStore().GetClusterName()
  ssh_ports = ssconf.SimpleStore().GetSshPortMap()

  # Ask every online node to delete its node-certificate material, as the
  # version being downgraded to does not use it.
  for node in nodes:
    data = {
      constants.NDS_CLUSTER_NAME: cluster_name,
      constants.NDS_NODE_DAEMON_CERTIFICATE:
        utils.ReadFile(pathutils.NODED_CERT_FILE),
      constants.NDS_NODE_NAME: node,
      constants.NDS_ACTION: constants.CRYPTO_ACTION_DELETE,
      }

    try:
      bootstrap.RunNodeSetupCmd(
        cluster_name,
        node,
        pathutils.SSL_UPDATE,
        True, # debug
        True, # verbose
        True, # use cluster key
        False, # ask key
        True, # strict host check
        ssh_ports[node],
        data)
    except Exception as e: # pylint: disable=W0703
      # As downgrading can fail if a node is temporarily unreachable
      # only output the error, but do not abort the entire operation.
      ToStderr("Downgrading SSL setup of node '%s' failed: %s." %
               (node, e))

  return True
2104
2105
def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the downgrade
    # process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    # When downgrading we push through anyway (see note above); only an
    # upgrade may still be aborted here.
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)
2154
2155
def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  returnvalue = 0

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  # Restart so the daemons pick up the freshly redistributed configuration.
  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  # Best effort: remove the intent-to-upgrade marker; failure here does not
  # affect the return value.
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  ToStdout("Unpausing the watcher.")
  if not _RunCommandAndReport(["gnt-cluster", "watcher", "continue"]):
    returnvalue = 1

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue
2218
2219
def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  Exactly one of C{--to} (start a new upgrade) or C{--resume} (finish an
  interrupted one) must be given; a matching in-progress upgrade found via
  the intent-to-upgrade file is silently turned into a resume.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  # If we're not told to resume, verify there is no upgrade
  # in progress.
  if not opts.resume:
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is not None:
      # An upgrade is going on; verify whether the target matches
      if versionstring == opts.to:
        ToStderr("An upgrade is already in progress. Target version matches,"
                 " resuming.")
        opts.resume = True
        opts.to = None
      else:
        ToStderr("An upgrade from %s to %s is in progress; use --resume to"
                 " finish it first" % (oldversion, versionstring))
        return 1

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      # No upgrade pending; nothing to resume.
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  # Pre-configuration-change phase; a failure here is fully rolled back.
  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback =  \
      _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)
2312
2313
#: Dictionary mapping each gnt-cluster sub-command name to a tuple of
#: (handler function, argument spec, option list, usage synopsis,
#: description), as expected by L{GenericMain}.
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT, INSTALL_IMAGE_OPT,
     ZEROING_IMAGE_OPT, COMPRESSION_TOOLS_OPT,
     ENABLED_USER_SHUTDOWN_OPT,
    ]
    + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, MAX_TRACK_OPT, INSTALL_IMAGE_OPT,
     INSTANCE_COMMUNICATION_NETWORK_OPT, ENABLED_HV_OPT, HVLIST_OPT,
     MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT, NIC_PARAMS_OPT,
     VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT,
     REMOVE_UIDS_OPT, DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT,
     DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT,
     PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + SUBMIT_OPTS +
    [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT,
     ENABLED_USER_SHUTDOWN_OPT] +
    INSTANCE_POLICY_OPTS +
    [GLOBAL_FILEDIR_OPT, GLOBAL_SHARED_FILEDIR_OPT, ZEROING_IMAGE_OPT,
     COMPRESSION_TOOLS_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT, VERBOSE_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }
2445
2446
#: dictionary with aliases for commands; maps each alternative
#: (legacy or shorthand) name onto its canonical command name
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}
2452
2453
def Main():
  """Entry point of the gnt-cluster client script.

  Dispatches to the handlers in L{commands} via the generic CLI machinery.

  @rtype: int
  @return: the exit code as returned by L{GenericMain}

  """
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)