c56e2bb42fa03931420921d0db5c073c392b722a
[ganeti-github.git] / lib / client / gnt_cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Cluster related commands"""
31
32 # pylint: disable=W0401,W0613,W0614,C0103
33 # W0401: Wildcard import ganeti.cli
34 # W0613: Unused argument, since all functions follow the same API
35 # W0614: Unused import %s from wildcard import (since we need cli)
36 # C0103: Invalid name gnt-cluster
37
38 from cStringIO import StringIO
39 import os
40 import time
41 import OpenSSL
42 import tempfile
43 import itertools
44
45 from ganeti.cli import *
46 from ganeti import bootstrap
47 from ganeti import compat
48 from ganeti import constants
49 from ganeti import config
50 from ganeti import errors
51 from ganeti import netutils
52 from ganeti import objects
53 from ganeti import opcodes
54 from ganeti import pathutils
55 from ganeti import qlang
56 from ganeti import serializer
57 from ganeti import ssconf
58 from ganeti import ssh
59 from ganeti import uidpool
60 from ganeti import utils
61 from ganeti.client import base
62
63
64 ON_OPT = cli_option("--on", default=False,
65 action="store_true", dest="on",
66 help="Recover from an EPO")
67
68 GROUPS_OPT = cli_option("--groups", default=False,
69 action="store_true", dest="groups",
70 help="Arguments are node groups instead of nodes")
71
72 FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
73 help="Override interactive check for --no-voting",
74 default=False, action="store_true")
75
76 FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
77 help="Unconditionally distribute the"
78 " configuration, even if the queue"
79 " is drained",
80 default=False, action="store_true")
81
82 TO_OPT = cli_option("--to", default=None, type="string",
83 help="The Ganeti version to upgrade to")
84
85 RESUME_OPT = cli_option("--resume", default=False, action="store_true",
86 help="Resume any pending Ganeti upgrades")
87
88 _EPO_PING_INTERVAL = 30 # 30 seconds between pings
89 _EPO_PING_TIMEOUT = 1 # 1 second
90 _EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
91
92
93 def _InitEnabledDiskTemplates(opts):
94 """Initialize the list of enabled disk templates.
95
96 """
97 if opts.enabled_disk_templates:
98 return opts.enabled_disk_templates.split(",")
99 else:
100 return constants.DEFAULT_ENABLED_DISK_TEMPLATES
101
102
def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @rtype: string or None
  @return: the volume group name to use, or None when LVM is not involved
  @raise errors.OpPrereqError: if LVM-based templates are enabled but the
      volume group name was explicitly unset

  """
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
    if vg_name:
      # A volume group was given; warn if it will never be used
      if not utils.IsLvmEnabled(enabled_disk_templates):
        ToStdout("You specified a volume group with --vg-name, but you did not"
                 " enable any disk template that uses lvm.")
    elif utils.IsLvmEnabled(enabled_disk_templates):
      # An empty string means the user explicitly unset the volume group,
      # which conflicts with LVM-based disk templates being enabled.
      # Pass errors.ECODE_INVAL for consistency with the other
      # OpPrereqError raises in this module.
      raise errors.OpPrereqError(
          "LVM disk templates are enabled, but vg name not set.",
          errors.ECODE_INVAL)
  elif utils.IsLvmEnabled(enabled_disk_templates):
    # No --vg-name at all: fall back to the default volume group
    vg_name = constants.DEFAULT_VG
  return vg_name
123
124
def _InitDrbdHelper(opts, enabled_disk_templates):
  """Initialize the DRBD usermode helper.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @rtype: string or None
  @return: the DRBD usermode helper to use
  @raise errors.OpPrereqError: if DRBD is enabled but the helper was
      explicitly unset

  """
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates

  if not drbd_enabled and opts.drbd_helper is not None:
    ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
             " is not enabled.")

  if drbd_enabled:
    if opts.drbd_helper is None:
      # DRBD enabled but no helper given: use the default helper
      return constants.DEFAULT_DRBD_HELPER
    if opts.drbd_helper == '':
      # Empty string means explicit unset, which contradicts enabling DRBD.
      # Pass errors.ECODE_INVAL for consistency with the other
      # OpPrereqError raises in this module.
      raise errors.OpPrereqError(
          "Unsetting the drbd usermode helper while enabling DRBD is not"
          " allowed.", errors.ECODE_INVAL)

  return opts.drbd_helper
144
145
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  Normalizes and validates all the command line options, fills in
  defaults, and finally calls L{bootstrap.InitCluster} followed by the
  OpClusterPostInit opcode.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError, e:
    ToStderr(str(e))
    return 1

  # Choose the master network device based on the default NIC mode when
  # the user did not specify one explicitly
  master_netdev = opts.master_netdev
  if master_netdev is None:
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  # Take copies where the dicts are modified below
  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict: fill defaults for every known hypervisor type
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict: fill defaults for every known disk template
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  if opts.install_image:
    install_image = opts.install_image
  else:
    install_image = ""

  if opts.zeroing_image:
    zeroing_image = opts.zeroing_image
  else:
    zeroing_image = ""

  compression_tools = _GetCompressionTools(opts)

  default_ialloc_params = opts.default_iallocator_params

  if opts.enabled_user_shutdown:
    enabled_user_shutdown = True
  else:
    enabled_user_shutdown = False

  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        gluster_storage_dir=opts.gluster_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        default_iallocator_params=default_ialloc_params,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        install_image=install_image,
                        zeroing_image=zeroing_image,
                        compression_tools=compression_tools,
                        enabled_user_shutdown=enabled_user_shutdown,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
336
337
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.yes_do_it:
    master_uuid = SubmitOpCode(opcodes.OpClusterDestroy(), opts=opts)
    # The opcode did not fail if we got here, so it is now safe to shut
    # down all the daemons
    bootstrap.FinalizeClusterDestroy(master_uuid)
    return 0

  ToStderr("Destroying a cluster is irreversible. If you really want"
           " destroy this cluster, supply the --yes-do-it option.")
  return 1
360
361
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  client = GetClient()
  (old_name, ) = client.QueryConfigValues(["cluster_name"])
  new_name = args[0]

  # Renaming is dangerous when connected via the cluster IP, so ask for
  # confirmation unless --force was given
  if not opts.force:
    prompt = ("This will rename the cluster from '%s' to '%s'. If you are"
              " connected over the network to the cluster name, the"
              " operation is very dangerous as the IP address will be"
              " removed from the node and the change may not go through."
              " Continue?") % (old_name, new_name)
    if not AskUser(prompt):
      return 1

  renamed = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                         opts=opts, cl=client)
  if renamed:
    ToStdout("Cluster renamed from '%s' to '%s'", old_name, renamed)

  return 0
393
394
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterActivateMasterIp()
  # Pass opts so the generic submit options are honoured, like every
  # other SubmitOpCode call in this module
  SubmitOpCode(op, opts=opts)
  return 0
402
403
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Closing the master IP drops all connections to it, so confirm first
  # unless --yes was given
  if not opts.confirm:
    usertext = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterDeactivateMasterIp()
  # Pass opts so the generic submit options are honoured, like every
  # other SubmitOpCode call in this module
  SubmitOpCode(op, opts=opts)
  return 0
419
420
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  # With --yes-do-it the configuration is pushed even while the job
  # queue is drained; otherwise the usual submit-or-send path is used
  if opts.yes_do_it:
    SubmitOpCodeToDrainedQueue(opcodes.OpClusterRedistConf())
  else:
    SubmitOrSend(opcodes.OpClusterRedistConf(), opts)
  return 0
437
438
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Query all the version fields in one RPC and print them one per line
  info = GetClient().QueryClusterInfo()
  ToStdout("Software version: %s", info["software_version"])
  ToStdout("Internode protocol: %s", info["protocol_version"])
  ToStdout("Configuration format: %s", info["config_version"])
  ToStdout("OS api version: %s", info["os_api_version"])
  ToStdout("Export interface: %s", info["export_version"])
  ToStdout("VCS version: %s", info["vcs_version"])
  return 0
458
459
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # The master name is read locally via bootstrap, no RPC needed
  ToStdout(bootstrap.GetMaster())
  return 0
473
474
475 def _FormatGroupedParams(paramsdict, roman=False):
476 """Format Grouped parameters (be, nic, disk) by group.
477
478 @type paramsdict: dict of dicts
479 @param paramsdict: {group: {param: value, ...}, ...}
480 @rtype: dict of dicts
481 @return: copy of the input dictionaries with strings as values
482
483 """
484 ret = {}
485 for (item, val) in paramsdict.items():
486 if isinstance(val, dict):
487 ret[item] = _FormatGroupedParams(val, roman=roman)
488 elif roman and isinstance(val, int):
489 ret[item] = compat.TryToRoman(val)
490 else:
491 ret[item] = str(val)
492 return ret
493
494
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  Queries the full cluster configuration from the master daemon and
  renders it as a nested list of (name, value) pairs via
  L{PrintGenericInfo}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  # Only display parameters of hypervisors which are actually enabled
  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  # The structure of this list (tuples and nested lists) drives the
  # indentation and ordering of PrintGenericInfo's output
  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams,
                                                   opts.roman_integers)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"], opts.roman_integers)),

    ("OS parameters", _FormatGroupedParams(result["osparams"],
                                           opts.roman_integers)),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs running simultaneously",
       compat.TryToRoman(result["max_running_jobs"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs simultaneously tracked by the scheduler",
       compat.TryToRoman(result["max_tracked_jobs"],
                         convert=opts.roman_integers)),
      ("mac prefix", result["mac_prefix"]),
      ("master netdev", result["master_netdev"]),
      ("master netmask", compat.TryToRoman(result["master_netmask"],
                                           opts.roman_integers)),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("gluster storage path", result["gluster_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("default instance allocator parameters",
       result["default_iallocator_params"]),
      ("primary ip version", compat.TryToRoman(result["primary_ip_version"],
                                               opts.roman_integers)),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ("install image", result["install_image"]),
      ("instance communication network",
       result["instance_communication_network"]),
      ("zeroing image", result["zeroing_image"]),
      ("compression tools", result["compression_tools"]),
      ("enabled user shutdown", result["enabled_user_shutdown"]),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True, opts.roman_integers)),
    ]

  PrintGenericInfo(info)
  return 0
611
612
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the given file does not exist

  """
  filename = args[0]
  filename = os.path.abspath(filename)

  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  qcl = GetClient()
  try:
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                             secondary_ips=opts.use_replication_network,
                             nodegroup=opts.nodegroup)
    # Query the SSH ports of the nodes we will actually copy to. Using
    # opts.nodes here (as the original code did) yields an empty port
    # list whenever no node was given explicitly — the common case —
    # making the zip() below empty and the copy loop a silent no-op.
    ports = GetNodesSshPorts(results, qcl)
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name)
  for (node, port) in zip(results, ports):
    if not srun.CopyFileToNode(node, port, filename):
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)

  return 0
650
651
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  qcl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
  ports = GetNodesSshPorts(nodes, qcl)
  # Remember each node's SSH port before the node list is reordered
  # below; zipping the reordered list against the original port list (as
  # the original code did) would pair nodes with the wrong ports.
  node_to_port = dict(zip(nodes, ports))

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  srun = ssh.SshRunner(cluster_name=cluster_name)

  # Make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for name in nodes:
    result = srun.Run(name, constants.SSH_LOGIN_USER, command,
                      port=node_to_port[name])

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
697
698
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  # The generator below yields (job_success, op_result_ok) pairs;
  # zip(*...) transposes them into two parallel sequences, ifilterfalse
  # keeps only the falsy (failed) entries of each, and the final len()
  # counts the failures of each kind.
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
750
751
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  Submits per-group disk verification jobs, reports nodes that could not
  be queried, tries to re-activate disks of instances with inactive
  disks, and lists instances with missing logical volumes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    # Each successful job returns a single (bad_nodes, instances,
    # missing) triple for the node group it verified
    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      # Only the tail of the error text is shown, to keep output bounded
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        # Accumulate the worst error code but keep processing instances
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        # If every missing volume lives on a broken node, the instance
        # cannot be meaningfully verified
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
823
824
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # Return an explicit exit code as promised by the docstring;
  # previously the function implicitly returned None
  return 0
837
838
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Failing over without voting can corrupt the cluster, so require an
  # extra confirmation (or --yes-do-it)
  if opts.no_voting and not opts.yes_do_it:
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  # Renamed from the misspelled "rvlaue" in the original code
  rvalue, msgs = bootstrap.MasterFailover(no_voting=opts.no_voting)
  for msg in msgs:
    ToStderr(msg)
  return rvalue
866
867
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    # Any failure to reach the master daemon counts as "not alive"
    return 1
  return 0
884
885
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    # No matches found
    return 1
  result = list(result)
  result.sort()
  for path, tag in result:
    ToStdout("%s %s", path, tag)
  # Return an explicit success code as promised by the docstring;
  # previously the success path implicitly returned None
  return 0
904
905
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  try:
    cert_pem = utils.ReadFile(cert_filename)
  except IOError as exc:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(exc))

  # The certificate is always checked; the private key only on request
  checks = [("Unable to load certificate: %s",
             OpenSSL.crypto.load_certificate)]
  if verify_private_key:
    checks.append(("Unable to load private key: %s",
                   OpenSSL.crypto.load_privatekey))

  for (errmsg, load_fn) in checks:
    try:
      load_fn(OpenSSL.crypto.FILETYPE_PEM, cert_pem)
    except Exception as exc:
      raise errors.X509CertError(cert_filename, errmsg % str(exc))

  return cert_pem
939
940
941 def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
942 rapi_cert_filename, new_spice_cert, spice_cert_filename,
943 spice_cacert_filename, new_confd_hmac_key, new_cds,
944 cds_filename, force, new_node_cert, verbose, debug):
945 """Renews cluster certificates, keys and secrets.
946
947 @type new_cluster_cert: bool
948 @param new_cluster_cert: Whether to generate a new cluster certificate
949 @type new_rapi_cert: bool
950 @param new_rapi_cert: Whether to generate a new RAPI certificate
951 @type rapi_cert_filename: string
952 @param rapi_cert_filename: Path to file containing new RAPI certificate
953 @type new_spice_cert: bool
954 @param new_spice_cert: Whether to generate a new SPICE certificate
955 @type spice_cert_filename: string
956 @param spice_cert_filename: Path to file containing new SPICE certificate
957 @type spice_cacert_filename: string
958 @param spice_cacert_filename: Path to file containing the certificate of the
959 CA that signed the SPICE certificate
960 @type new_confd_hmac_key: bool
961 @param new_confd_hmac_key: Whether to generate a new HMAC key
962 @type new_cds: bool
963 @param new_cds: Whether to generate a new cluster domain secret
964 @type cds_filename: string
965 @param cds_filename: Path to file containing new cluster domain secret
966 @type force: bool
967 @param force: Whether to ask user for confirmation
968 @type new_node_cert: string
969 @param new_node_cert: Whether to generate new node certificates
970 @type verbose: boolean
971 @param verbose: show verbose output
972 @type debug: boolean
973 @param debug: show debug output
974
975 """
976 if new_rapi_cert and rapi_cert_filename:
977 ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
978 " options can be specified at the same time.")
979 return 1
980
981 if new_cds and cds_filename:
982 ToStderr("Only one of the --new-cluster-domain-secret and"
983 " --cluster-domain-secret options can be specified at"
984 " the same time.")
985 return 1
986
987 if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
988 ToStderr("When using --new-spice-certificate, the --spice-certificate"
989 " and --spice-ca-certificate must not be used.")
990 return 1
991
992 if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
993 ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
994 " specified.")
995 return 1
996
997 rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
998 try:
999 if rapi_cert_filename:
1000 rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
1001 if spice_cert_filename:
1002 spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
1003 spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
1004 except errors.X509CertError, err:
1005 ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
1006 return 1
1007
1008 if cds_filename:
1009 try:
1010 cds = utils.ReadFile(cds_filename)
1011 except Exception, err: # pylint: disable=W0703
1012 ToStderr("Can't load new cluster domain secret from %s: %s" %
1013 (cds_filename, str(err)))
1014 return 1
1015 else:
1016 cds = None
1017
1018 if not force:
1019 usertext = ("This requires all daemons on all nodes to be restarted and"
1020 " may take some time. Continue?")
1021 if not AskUser(usertext):
1022 return 1
1023
1024 def _RenewCryptoInner(ctx):
1025 ctx.feedback_fn("Updating certificates and keys")
1026
1027 bootstrap.GenerateClusterCrypto(False,
1028 new_rapi_cert,
1029 new_spice_cert,
1030 new_confd_hmac_key,
1031 new_cds,
1032 False,
1033 None,
1034 rapi_cert_pem=rapi_cert_pem,
1035 spice_cert_pem=spice_cert_pem,
1036 spice_cacert_pem=spice_cacert_pem,
1037 cds=cds)
1038
1039 files_to_copy = []
1040
1041 if new_rapi_cert or rapi_cert_pem:
1042 files_to_copy.append(pathutils.RAPI_CERT_FILE)
1043
1044 if new_spice_cert or spice_cert_pem:
1045 files_to_copy.append(pathutils.SPICE_CERT_FILE)
1046 files_to_copy.append(pathutils.SPICE_CACERT_FILE)
1047
1048 if new_confd_hmac_key:
1049 files_to_copy.append(pathutils.CONFD_HMAC_KEY)
1050
1051 if new_cds or cds:
1052 files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
1053
1054 if files_to_copy:
1055 for node_name in ctx.nonmaster_nodes:
1056 port = ctx.ssh_ports[node_name]
1057 ctx.feedback_fn("Copying %s to %s:%d" %
1058 (", ".join(files_to_copy), node_name, port))
1059 for file_name in files_to_copy:
1060 ctx.ssh.CopyFileToNode(node_name, port, file_name)
1061
1062 def _RenewClientCerts(ctx):
1063 ctx.feedback_fn("Updating client SSL certificates.")
1064
1065 cluster_name = ssconf.SimpleStore().GetClusterName()
1066
1067 for node_name in ctx.nonmaster_nodes + [ctx.master_node]:
1068 ssh_port = ctx.ssh_ports[node_name]
1069 data = {
1070 constants.NDS_CLUSTER_NAME: cluster_name,
1071 constants.NDS_NODE_DAEMON_CERTIFICATE:
1072 utils.ReadFile(pathutils.NODED_CERT_FILE),
1073 constants.NDS_NODE_NAME: node_name,
1074 constants.NDS_ACTION: constants.CRYPTO_ACTION_CREATE,
1075 }
1076
1077 bootstrap.RunNodeSetupCmd(
1078 cluster_name,
1079 node_name,
1080 pathutils.SSL_UPDATE,
1081 ctx.debug,
1082 ctx.verbose,
1083 True, # use cluster key
1084 False, # ask key
1085 True, # strict host check
1086 ssh_port,
1087 data)
1088
1089 # Create a temporary ssconf file using the master's client cert digest
1090 # and the 'bootstrap' keyword to enable distribution of all nodes' digests.
1091 master_digest = utils.GetCertificateDigest()
1092 ssconf_master_candidate_certs_filename = os.path.join(
1093 pathutils.DATA_DIR, "%s%s" %
1094 (constants.SSCONF_FILEPREFIX, constants.SS_MASTER_CANDIDATES_CERTS))
1095 utils.WriteFile(
1096 ssconf_master_candidate_certs_filename,
1097 data="%s=%s" % (constants.CRYPTO_BOOTSTRAP, master_digest))
1098 for node_name in ctx.nonmaster_nodes:
1099 port = ctx.ssh_ports[node_name]
1100 ctx.feedback_fn("Copying %s to %s:%d" %
1101 (ssconf_master_candidate_certs_filename, node_name, port))
1102 ctx.ssh.CopyFileToNode(node_name, port,
1103 ssconf_master_candidate_certs_filename)
1104
1105 # Write the boostrap entry to the config using wconfd.
1106 config_live_lock = utils.livelock.LiveLock("renew_crypto")
1107 cfg = config.GetConfig(None, config_live_lock)
1108 cfg.AddNodeToCandidateCerts(constants.CRYPTO_BOOTSTRAP, master_digest)
1109 cfg.Update(cfg.GetClusterInfo(), ctx.feedback_fn)
1110
1111 def _RenewServerAndClientCerts(ctx):
1112 ctx.feedback_fn("Updating the cluster SSL certificate.")
1113
1114 master_name = ssconf.SimpleStore().GetMasterNode()
1115 bootstrap.GenerateClusterCrypto(True, # cluster cert
1116 False, # rapi cert
1117 False, # spice cert
1118 False, # confd hmac key
1119 False, # cds
1120 True, # client cert
1121 master_name)
1122
1123 for node_name in ctx.nonmaster_nodes:
1124 port = ctx.ssh_ports[node_name]
1125 server_cert = pathutils.NODED_CERT_FILE
1126 ctx.feedback_fn("Copying %s to %s:%d" %
1127 (server_cert, node_name, port))
1128 ctx.ssh.CopyFileToNode(node_name, port, server_cert)
1129
1130 _RenewClientCerts(ctx)
1131
1132 if new_cluster_cert or new_rapi_cert or new_spice_cert \
1133 or new_confd_hmac_key or new_cds:
1134 RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
1135
1136 # If only node certficates are recreated, call _RenewClientCerts only.
1137 if new_node_cert and not new_cluster_cert:
1138 RunWhileDaemonsStopped(ToStdout, [constants.NODED, constants.WCONFD],
1139 _RenewClientCerts, verbose=verbose, debug=debug)
1140
1141 # If the cluster certificate are renewed, the client certificates need
1142 # to be renewed too.
1143 if new_cluster_cert:
1144 RunWhileDaemonsStopped(ToStdout, [constants.NODED, constants.WCONFD],
1145 _RenewServerAndClientCerts, verbose=verbose,
1146 debug=debug)
1147
1148 ToStdout("All requested certificates and keys have been replaced."
1149 " Running \"gnt-cluster verify\" now is recommended.")
1150
1151 if new_node_cert or new_cluster_cert:
1152 cl = GetClient()
1153 renew_op = opcodes.OpClusterRenewCrypto()
1154 SubmitOpCode(renew_op, cl=cl)
1155
1156 return 0
1157
1158
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  Thin CLI wrapper: collects the relevant command line options and
  forwards them to L{_RenewCrypto}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: unused

  """
  renew_args = (opts.new_cluster_cert,
                opts.new_rapi_cert,
                opts.rapi_cert,
                opts.new_spice_cert,
                opts.spice_cert,
                opts.spice_cacert,
                opts.new_confd_hmac_key,
                opts.new_cluster_domain_secret,
                opts.cluster_domain_secret,
                opts.force,
                opts.new_node_cert,
                opts.verbose,
                opts.debug > 0)
  return _RenewCrypto(*renew_args)
1176
1177
def _GetEnabledDiskTemplates(opts):
  """Determine the list of enabled disk templates.

  @rtype: list of strings or None
  @return: the comma-separated option value split into a list, or None
      if the option was not given (or empty)

  """
  templates_opt = opts.enabled_disk_templates
  if not templates_opt:
    return None
  return templates_opt.split(",")
1186
1187
def _GetVgName(opts, enabled_disk_templates):
  """Determine the volume group name.

  Warns (but still returns the name) if a volume group was given while
  no lvm-based disk template is enabled.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk-templates

  """
  vg_name = opts.vg_name
  # Warn about an inconsistency between the vg name and the enabled
  # templates; the name is passed through unchanged either way
  if vg_name and enabled_disk_templates and \
      not utils.IsLvmEnabled(enabled_disk_templates):
    ToStdout("You specified a volume group with --vg-name, but you did not"
             " enable any of the following lvm-based disk templates: %s" %
             utils.CommaJoin(constants.DTS_LVM))
  return vg_name
1205
1206
def _GetDrbdHelper(opts, enabled_disk_templates):
  """Determine the DRBD usermode helper.

  Warns (but still returns the helper) if one was given while the DRBD
  disk template is not enabled.

  """
  helper = opts.drbd_helper
  if not enabled_disk_templates:
    return helper
  if helper and constants.DT_DRBD8 not in enabled_disk_templates:
    ToStdout("You specified a DRBD usermode helper with "
             " --drbd-usermode-helper while DRBD is not enabled.")
  return helper
1218
1219
def _GetCompressionTools(opts):
  """Determine the list of custom compression tools.

  @rtype: list of strings or None
  @return: tool names from the comma-separated option value; None when
      the option was absent; the built-in defaults when an empty value
      was given (reset)

  """
  tools_opt = opts.compression_tools
  if tools_opt:
    return tools_opt.split(",")
  if tools_opt is None:
    # Parameter was not provided at all
    return None
  # Empty string: reset to the default tools
  return constants.IEC_DEFAULT_TOOLS
1230
1231
1232 def SetClusterParams(opts, args):
1233 """Modify the cluster.
1234
1235 @param opts: the command line options selected by the user
1236 @type args: list
1237 @param args: should be an empty list
1238 @rtype: int
1239 @return: the desired exit code
1240
1241 """
1242 if not (opts.vg_name is not None or
1243 opts.drbd_helper is not None or
1244 opts.enabled_hypervisors or opts.hvparams or
1245 opts.beparams or opts.nicparams or
1246 opts.ndparams or opts.diskparams or
1247 opts.candidate_pool_size is not None or
1248 opts.max_running_jobs is not None or
1249 opts.max_tracked_jobs is not None or
1250 opts.uid_pool is not None or
1251 opts.maintain_node_health is not None or
1252 opts.add_uids is not None or
1253 opts.remove_uids is not None or
1254 opts.default_iallocator is not None or
1255 opts.default_iallocator_params or
1256 opts.reserved_lvs is not None or
1257 opts.mac_prefix is not None or
1258 opts.master_netdev is not None or
1259 opts.master_netmask is not None or
1260 opts.use_external_mip_script is not None or
1261 opts.prealloc_wipe_disks is not None or
1262 opts.hv_state or
1263 opts.enabled_disk_templates or
1264 opts.disk_state or
1265 opts.ipolicy_bounds_specs is not None or
1266 opts.ipolicy_std_specs is not None or
1267 opts.ipolicy_disk_templates is not None or
1268 opts.ipolicy_vcpu_ratio is not None or
1269 opts.ipolicy_spindle_ratio is not None or
1270 opts.modify_etc_hosts is not None or
1271 opts.file_storage_dir is not None or
1272 opts.install_image is not None or
1273 opts.instance_communication_network is not None or
1274 opts.zeroing_image is not None or
1275 opts.shared_file_storage_dir is not None or
1276 opts.compression_tools is not None or
1277 opts.shared_file_storage_dir is not None or
1278 opts.enabled_user_shutdown is not None):
1279 ToStderr("Please give at least one of the parameters.")
1280 return 1
1281
1282 enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1283 vg_name = _GetVgName(opts, enabled_disk_templates)
1284
1285 try:
1286 drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1287 except errors.OpPrereqError, e:
1288 ToStderr(str(e))
1289 return 1
1290
1291 hvlist = opts.enabled_hypervisors
1292 if hvlist is not None:
1293 hvlist = hvlist.split(",")
1294
1295 # a list of (name, dict) we can pass directly to dict() (or [])
1296 hvparams = dict(opts.hvparams)
1297 for hv_params in hvparams.values():
1298 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1299
1300 diskparams = dict(opts.diskparams)
1301
1302 for dt_params in diskparams.values():
1303 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1304
1305 beparams = opts.beparams
1306 utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1307
1308 nicparams = opts.nicparams
1309 utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1310
1311 ndparams = opts.ndparams
1312 if ndparams is not None:
1313 utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1314
1315 ipolicy = CreateIPolicyFromOpts(
1316 minmax_ispecs=opts.ipolicy_bounds_specs,
1317 std_ispecs=opts.ipolicy_std_specs,
1318 ipolicy_disk_templates=opts.ipolicy_disk_templates,
1319 ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1320 ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1321 )
1322
1323 mnh = opts.maintain_node_health
1324
1325 uid_pool = opts.uid_pool
1326 if uid_pool is not None:
1327 uid_pool = uidpool.ParseUidPool(uid_pool)
1328
1329 add_uids = opts.add_uids
1330 if add_uids is not None:
1331 add_uids = uidpool.ParseUidPool(add_uids)
1332
1333 remove_uids = opts.remove_uids
1334 if remove_uids is not None:
1335 remove_uids = uidpool.ParseUidPool(remove_uids)
1336
1337 if opts.reserved_lvs is not None:
1338 if opts.reserved_lvs == "":
1339 opts.reserved_lvs = []
1340 else:
1341 opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1342
1343 if opts.master_netmask is not None:
1344 try:
1345 opts.master_netmask = int(opts.master_netmask)
1346 except ValueError:
1347 ToStderr("The --master-netmask option expects an int parameter.")
1348 return 1
1349
1350 ext_ip_script = opts.use_external_mip_script
1351
1352 if opts.disk_state:
1353 disk_state = utils.FlatToDict(opts.disk_state)
1354 else:
1355 disk_state = {}
1356
1357 hv_state = dict(opts.hv_state)
1358
1359 compression_tools = _GetCompressionTools(opts)
1360
1361 op = opcodes.OpClusterSetParams(
1362 vg_name=vg_name,
1363 drbd_helper=drbd_helper,
1364 enabled_hypervisors=hvlist,
1365 hvparams=hvparams,
1366 os_hvp=None,
1367 beparams=beparams,
1368 nicparams=nicparams,
1369 ndparams=ndparams,
1370 diskparams=diskparams,
1371 ipolicy=ipolicy,
1372 candidate_pool_size=opts.candidate_pool_size,
1373 max_running_jobs=opts.max_running_jobs,
1374 max_tracked_jobs=opts.max_tracked_jobs,
1375 maintain_node_health=mnh,
1376 modify_etc_hosts=opts.modify_etc_hosts,
1377 uid_pool=uid_pool,
1378 add_uids=add_uids,
1379 remove_uids=remove_uids,
1380 default_iallocator=opts.default_iallocator,
1381 default_iallocator_params=opts.default_iallocator_params,
1382 prealloc_wipe_disks=opts.prealloc_wipe_disks,
1383 mac_prefix=opts.mac_prefix,
1384 master_netdev=opts.master_netdev,
1385 master_netmask=opts.master_netmask,
1386 reserved_lvs=opts.reserved_lvs,
1387 use_external_mip_script=ext_ip_script,
1388 hv_state=hv_state,
1389 disk_state=disk_state,
1390 enabled_disk_templates=enabled_disk_templates,
1391 force=opts.force,
1392 file_storage_dir=opts.file_storage_dir,
1393 install_image=opts.install_image,
1394 instance_communication_network=opts.instance_communication_network,
1395 zeroing_image=opts.zeroing_image,
1396 shared_file_storage_dir=opts.shared_file_storage_dir,
1397 compression_tools=compression_tools,
1398 enabled_user_shutdown=opts.enabled_user_shutdown,
1399 )
1400 return base.GetResult(None, opts, SubmitOrSend(op, opts))
1401
1402
def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "drain":
    client.SetQueueDrainFlag(True)
  elif command == "undrain":
    client.SetQueueDrainFlag(False)
  elif command == "info":
    # QueryConfigValues returns one value per requested field
    drained = client.QueryConfigValues(["drain_flag"])[0]
    ToStdout("The drain flag is %s" % ("set" if drained else "unset"))
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
1430
1431
def _ShowWatcherPause(until):
  """Print the current watcher pause state.

  @param until: timestamp until which the watcher is paused, or None

  """
  if until is not None and until >= time.time():
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1437
1438
def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")
  elif command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
    # Pause duration is given relative to now
    duration = ParseTimespec(args[1])
    _ShowWatcherPause(client.SetWatcherPause(time.time() + duration))
  elif command == "info":
    _ShowWatcherPause(client.QueryConfigValues(["watcher_pause"])[0])
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
1472
1473
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  op = opcodes.OpOobCommand(
    node_names=node_list,
    command=(constants.OOB_POWER_ON if power else constants.OOB_POWER_OFF),
    ignore_status=True,
    timeout=opts.oob_timeout,
    power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)

  errs = 0
  for ((_, node_name), (data_status, _)) in result:
    if data_status != constants.RS_NORMAL:
      # ignore_status=True means "unavailable" cannot occur here
      assert data_status != constants.RS_UNAVAIL
      errs += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  return errs == 0
1509
1510
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    (text_submit, text_success, text_failed) = \
      ("startup", "started", "starting")
  else:
    # Bind the shutdown-specific parameters once, up front
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    (text_submit, text_success, text_failed) = \
      ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)
  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    jex.QueueJob(inst, opcls(instance_name=inst))

  results = jex.GetResults()
  bad_cnt = sum(1 for (success, _) in results if not success)

  if bad_cnt:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  ToStdout("All instances have been %s successfully", text_success)
  return True
1549
1550
class _RunWhenNodesReachableHelper(object):
  """Helper class to make shared internal state sharing easier.

  Used by L{_RunWhenNodesReachable} as both the retry function and the
  wait function of a L{utils.Retry} loop: L{__call__} runs the action
  callback and signals (via L{utils.RetryAgain}) whether to keep
  retrying, while L{Wait} polls the nodes that are still down.

  @ivar success: Indicates if all action_cb calls were successful
  @ivar down: set of node names not yet reachable
  @ivar up: set of node names that have become reachable

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachability (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    # A single callback failure makes the overall result False, even if
    # later invocations succeed
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    # No node came up; sleep whatever remains of the requested interval
    self._sleep_fn(max(0.0, start + secs - time.time()))
1613
1614
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  cluster_info = GetClient().QueryClusterInfo()
  # Resolve node names with the cluster's primary IP family
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = {}
  for node in node_list:
    node2ip[node] = netutils.GetHostname(node, family=family).ip

  helper = _RunWhenNodesReachableHelper(
    node_list, action_cb, node2ip, netutils.GetDaemonPort(constants.NODED),
    ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n - %s", " - ".join(helper.down))
    return False
1644
1645
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  # An instance can be started once all the nodes it lives on are online
  startable = [inst for (inst, nodes) in inst_map.items()
               if not (nodes - nodes_online)]

  # Forget about started instances so they are not submitted twice
  for inst in startable:
    del inst_map[inst]

  if not startable:
    return True

  return _instance_start_fn(opts, startable, True)
1670
1671
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Power the OOB-capable nodes back on (power=True selects OOB_POWER_ON
  # in _OobPower; this previously passed False, powering the nodes off)
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1696
1697
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Stop all instances first; no_remember=True so they come back
  # automatically once the nodes are powered on again
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  return constants.EXIT_FAILURE
1718
1719
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  Depending on C{opts.on}, powers the selected nodes (and the instances
  on them) on or off, delegating the actual work to C{_on_fn} /
  C{_off_fn}.  The injectable C{_*_fn} and C{qcl} parameters exist for
  unittest use only.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the names of the nodes (or, with --groups, node groups)
      to operate on; may be empty when --all is given
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  if opts.groups:
    # Resolve the given group names into their member node lists
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      # Build inst -> set(nodes), excluding the master node from the
      # sets: an instance (partly) hosted on the master ends up with an
      # empty node set
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      # Node has no OOB support; it still gets instances handled above,
      # but power state cannot be changed automatically
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)
1791
1792
def _GetCreateCommand(info):
  """Build the "gnt-cluster init" command line for the given cluster.

  @param info: cluster info dictionary; only the "ipolicy" and "name"
      keys are used
  @rtype: string
  @return: the command line reproducing the cluster's ipolicy settings

  """
  out = StringIO()
  out.write("gnt-cluster init")
  PrintIPolicyCommand(out, info["ipolicy"], False)
  out.write(" ")
  out.write(info["name"])
  return out.getvalue()
1800
1801
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cluster_info = GetClient().QueryClusterInfo()
  ToStdout(_GetCreateCommand(cluster_info))
1811
1812
def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if not result.failed:
    return True

  ToStderr("Command %s failed: %s; Output %s" %
           (cmd, result.fail_reason, result.output))
  return False
1828
1829
def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  quoted_cmd = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  master_node = ssconf.SimpleStore().GetMasterNode()
  cluster_name = ssconf.SimpleStore().GetClusterName()

  # Make sure the master node, if present, is handled last
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  runner = ssh.SshRunner(cluster_name=cluster_name)
  return [name for name in nodes
          if runner.Run(name, constants.SSH_LOGIN_USER,
                        quoted_cmd).exit_code != 0]
1864
1865
def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing, if this is the case, otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  # A version is considered installed if its directory exists on the node
  version_dir = os.path.join(pathutils.PKGLIBDIR, versionstring)
  badnodes = _VerifyCommand(["test", "-d", version_dir])
  if not badnodes:
    return True

  ToStderr("Ganeti version %s not installed on nodes %s"
           % (versionstring, ", ".join(badnodes)))
  return False
1886
1887
def _GetRunning():
  """Determine the number of currently running jobs.

  @rtype: int
  @return: the number of jobs still running

  """
  cl = GetClient()
  # Count jobs whose status is "running"
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
1899
1900
def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  # The active version is selected through two symlinks under SYSCONFDIR
  link_map = [
    (os.path.join(pathutils.PKGLIBDIR, versionstring),
     os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")),
    (os.path.join(pathutils.SHAREDIR, versionstring),
     os.path.join(pathutils.SYSCONFDIR, "ganeti/share")),
    ]

  failed = []
  if constants.HAS_GNU_LN:
    # GNU ln's -T flag lets us replace the link in a single command
    # instead of removing it first
    for (target, link) in link_map:
      failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", target, link]))
  else:
    for (target, link) in link_map:
      failed.extend(_VerifyCommand(["rm", "-f", link]))
      failed.extend(_VerifyCommand(["ln", "-s", "-f", target, link]))

  return list(set(failed))
1931
1932
def _ExecuteCommands(fns):
  """Execute a list of functions, in reverse order.

  @type fns: list of functions.
  @param fns: the functions to be executed.

  """
  # Last-registered function runs first (rollback semantics)
  for fn in fns[::-1]:
    fn()
1942
1943
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  if "version" not in config_data:
    return None
  return utils.SplitVersion(config_data["version"])
1958
1959
def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return (None, None)

  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
  contents = utils.UnescapeAndSplit(contentstring)
  # A well-formed file holds exactly three fields: old version, target
  # version and the writing process' pid (cf. _WriteIntentToUpgrade)
  if len(contents) != 3:
    return (None, None)

  (old_version, target_version) = contents[:2]
  return (old_version, target_version)
1977
1978
def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  The file records the current release, the target version and the pid
  of the process performing the upgrade.

  @type version: string
  @param version: the version we intent to upgrade to

  """
  content = utils.EscapeAndJoin(
    [constants.RELEASE_VERSION, version, "%d" % os.getpid()])
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE, data=content)
1989
1990
def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  Each successfully completed step appends an undo action to the
  rollback list, so a caller can revert everything done so far if a
  later step fails.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  # Refuse to start unless the target version is installed everywhere.
  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  # Stop accepting new jobs before touching anything else.
  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  # Wait for all running jobs to finish; give up after the drain timeout.
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  # Keep the watcher from interfering while the upgrade runs.
  ToStdout("Pausing the watcher for one hour.")
  rollback.append(lambda: GetClient().SetWatcherPause(None))
  GetClient().SetWatcherPause(time.time() + 60 * 60)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  # Re-check the installation; on failure restart the local daemons by
  # hand, as no rollback action has been registered for them yet.
  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.BACKUP_DIR, "ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["mkdir", "-p", pathutils.BACKUP_DIR]):
    return (False, rollback)

  # Create the archive in a safe manner, as it contains sensitive
  # information: write to a mkstemp-created temporary file first and only
  # rename it to its final name once the archive is complete.
  (_, tmp_name) = tempfile.mkstemp(prefix=backuptar, dir=pathutils.BACKUP_DIR)
  if not _RunCommandAndReport(["tar", "-cf", tmp_name,
                               "--exclude=queue/archive",
                               pathutils.DATA_DIR]):
    return (False, rollback)

  os.rename(tmp_name, backuptar)
  return (True, rollback)
2057
2058
def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @return: True upon success
  """
  ToStdout("Performing version-specific downgrade tasks.")

  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  cluster_name = ssconf.SimpleStore().GetClusterName()
  ssh_ports = ssconf.SimpleStore().GetSshPortMap()

  for node in nodes:
    # Ask each online node to delete SSL material via the node-setup
    # helper (CRYPTO_ACTION_DELETE); presumably the downgrade target
    # does not know about these certificates -- verify per release.
    data = {
      constants.NDS_CLUSTER_NAME: cluster_name,
      constants.NDS_NODE_DAEMON_CERTIFICATE:
        utils.ReadFile(pathutils.NODED_CERT_FILE),
      constants.NDS_NODE_NAME: node,
      constants.NDS_ACTION: constants.CRYPTO_ACTION_DELETE,
      }

    try:
      bootstrap.RunNodeSetupCmd(
        cluster_name,
        node,
        pathutils.SSL_UPDATE,
        True,  # debug
        True,  # verbose,
        True,  # use cluster key
        False,  # ask key
        True,  # strict host check
        ssh_ports[node],
        data)
    except Exception as e:  # pylint: disable=W0703
      # As downgrading can fail if a node is temporarily unreachable
      # only output the error, but do not abort the entire operation.
      ToStderr("Downgrading SSL setup of node '%s' failed: %s." %
               (node, e))

  return True
2105
2106
def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the downgrade
    # process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    # When downgrading, push on despite failed nodes (the configuration
    # has already been changed); only abort on an upgrade.
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)
2155
2156
def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  # Accumulate failures; never return early, every step is attempted.
  returnvalue = 0

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  # Full stop/start cycle so all daemons pick up the redistributed
  # configuration.
  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  # The upgrade is essentially done; remove the intent-to-upgrade marker.
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  ToStdout("Unpausing the watcher.")
  if not _RunCommandAndReport(["gnt-cluster", "watcher", "continue"]):
    returnvalue = 1

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue
2219
2220
def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --to and --resume are mutually exclusive, and exactly one is needed.
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  # If we're not told to resume, verify there is no upgrade
  # in progress.
  if not opts.resume:
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is not None:
      # An upgrade is going on; verify whether the target matches
      if versionstring == opts.to:
        ToStderr("An upgrade is already in progress. Target version matches,"
                 " resuming.")
        opts.resume = True
        opts.to = None
      else:
        ToStderr("An upgrade from %s to %s is in progress; use --resume to"
                 " finish it first" % (oldversion, versionstring))
        return 1

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      # No intent file left; nothing to resume.
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  # Reject targets outside the supported upgrade range.
  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback = \
    _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)
2313
2314
#: the sub-commands of the gnt-cluster script; each entry maps the
#: sub-command name to (handler function, argument specification,
#: supported options, usage synopsis, help text)
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT, INSTALL_IMAGE_OPT,
     ZEROING_IMAGE_OPT, COMPRESSION_TOOLS_OPT,
     ENABLED_USER_SHUTDOWN_OPT,
     ]
    + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, MAX_TRACK_OPT, INSTALL_IMAGE_OPT,
     INSTANCE_COMMUNICATION_NETWORK_OPT, ENABLED_HV_OPT, HVLIST_OPT,
     MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT, NIC_PARAMS_OPT,
     VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT,
     REMOVE_UIDS_OPT, DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT,
     DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT,
     PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + SUBMIT_OPTS +
    [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT,
     ENABLED_USER_SHUTDOWN_OPT] +
    INSTANCE_POLICY_OPTS +
    [GLOBAL_FILEDIR_OPT, GLOBAL_SHARED_FILEDIR_OPT, ZEROING_IMAGE_OPT,
     COMPRESSION_TOOLS_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT, VERBOSE_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }
2446
2447
#: dictionary with aliases for commands; maps each alias to the canonical
#: sub-command name in L{commands}
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}
2453
2454
def Main():
  """Entry point of the gnt-cluster tool.

  @rtype: int
  @return: the exit code of the generic CLI dispatcher

  """
  overrides = {"tag_type": constants.TAG_CLUSTER}
  return GenericMain(commands, override=overrides, aliases=aliases)