bffc163dac0a0533ef03a1e3f711ef2a479e68c0
[ganeti-github.git] / lib / client / gnt_cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Cluster related commands"""
31
32 # pylint: disable=W0401,W0613,W0614,C0103
33 # W0401: Wildcard import ganeti.cli
34 # W0613: Unused argument, since all functions follow the same API
35 # W0614: Unused import %s from wildcard import (since we need cli)
36 # C0103: Invalid name gnt-cluster
37
38 from cStringIO import StringIO
39 import os
40 import time
41 import OpenSSL
42 import tempfile
43 import itertools
44
45 from ganeti.cli import *
46 from ganeti import bootstrap
47 from ganeti import compat
48 from ganeti import constants
49 from ganeti import config
50 from ganeti import errors
51 from ganeti import netutils
52 from ganeti import objects
53 from ganeti import opcodes
54 from ganeti import pathutils
55 from ganeti import qlang
56 from ganeti import serializer
57 from ganeti import ssconf
58 from ganeti import ssh
59 from ganeti import uidpool
60 from ganeti import utils
61 from ganeti.client import base
62
63
# Command line options specific to gnt-cluster commands.

ON_OPT = cli_option("--on", dest="on", action="store_true", default=False,
                    help="Recover from an EPO")

GROUPS_OPT = cli_option("--groups", dest="groups", action="store_true",
                        default=False,
                        help="Arguments are node groups instead of nodes")

# Both of the following reuse "--yes-do-it"; they are meant for different
# sub-commands (master-failover --no-voting / redistribute-config).
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            action="store_true", default=False,
                            help="Override interactive check for --no-voting")

FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                action="store_true", default=False,
                                help=("Unconditionally distribute the"
                                      " configuration, even if the queue"
                                      " is drained"))

TO_OPT = cli_option("--to", type="string", default=None,
                    help="The Ganeti version to upgrade to")

RESUME_OPT = cli_option("--resume", action="store_true", default=False,
                        help="Resume any pending Ganeti upgrades")

# EPO timing parameters, all in seconds
_EPO_PING_INTERVAL = 30  # 30 seconds between pings
_EPO_PING_TIMEOUT = 1  # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60  # 15 minutes
91
92
93 def _InitEnabledDiskTemplates(opts):
94 """Initialize the list of enabled disk templates.
95
96 """
97 if opts.enabled_disk_templates:
98 return opts.enabled_disk_templates.split(",")
99 else:
100 return constants.DEFAULT_ENABLED_DISK_TEMPLATES
101
102
def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  Combines the C{--vg-name} option with the knowledge of whether any enabled
  disk template uses LVM.

  @param opts: the command line options (C{opts.vg_name} is read)
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @return: the name given by the user (possibly empty, in which case LVM
      must not be enabled), C{constants.DEFAULT_VG} if no name was given but
      LVM is enabled, or C{None} if neither applies
  @raise errors.OpPrereqError: if LVM disk templates are enabled but the
      volume group name was explicitly set to an empty value

  """
  requested = opts.vg_name
  lvm_enabled = utils.IsLvmEnabled(enabled_disk_templates)

  if requested is None:
    # nothing given: only fall back to the default when LVM is in use
    if lvm_enabled:
      return constants.DEFAULT_VG
    return None

  if requested:
    if not lvm_enabled:
      # only a note; the given name is still returned
      ToStdout("You specified a volume group with --vg-name, but you did not"
               " enable any disk template that uses lvm.")
  elif lvm_enabled:
    raise errors.OpPrereqError(
        "LVM disk templates are enabled, but vg name not set.")

  return requested
123
124
def _InitDrbdHelper(opts, enabled_disk_templates):
  """Initialize the DRBD usermode helper.

  @param opts: the command line options (C{opts.drbd_helper} is read)
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @return: C{constants.DEFAULT_DRBD_HELPER} if DRBD is enabled and no helper
      was given, otherwise the helper given on the command line (which may
      be C{None} when DRBD is disabled)
  @raise errors.OpPrereqError: if DRBD is enabled but the helper was set to
      the empty string

  """
  helper = opts.drbd_helper

  if constants.DT_DRBD8 not in enabled_disk_templates:
    if helper is not None:
      ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
               " is not enabled.")
    return helper

  if helper is None:
    return constants.DEFAULT_DRBD_HELPER
  if helper == "":
    raise errors.OpPrereqError(
        "Unsetting the drbd usermode helper while enabling DRBD is not"
        " allowed.")
  return helper
144
145
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  Validates the command line parameters and fills in their defaults. Then
  calls L{bootstrap.InitCluster} and submits L{opcodes.OpClusterPostInit}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError as err:
    ToStderr(str(err))
    return 1

  master_netdev = opts.master_netdev
  if master_netdev is None:
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge; the link is set in
      # opts.nicparams itself, which is what the nicparams below start from
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  hv_names = opts.enabled_hypervisors
  if hv_names is None:
    hv_names = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hv_names.split(",")

  hvparams = dict(opts.hvparams)
  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check
  # done by the opcode parameter types
  unknown_templates = set(diskparams.keys()) - constants.DISK_TEMPLATES
  if unknown_templates:
    ToStderr("Disk templates unknown: %s" %
             utils.CommaJoin(utils.NiceSort(unknown_templates)))
    return 1

  # backend parameters
  beparams = objects.FillDict(constants.BEC_DEFAULTS, opts.beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # NIC parameters
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, opts.nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # node parameters
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
  utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # hypervisor parameters, filled in for every known hypervisor
  for hv in constants.HYPER_TYPES:
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv],
                                    hvparams.get(hv, {}))
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # disk parameters, filled in for every known disk template
  for templ in constants.DISK_TEMPLATES:
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams.get(templ, {}))
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # instance policy
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError) as err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  if master_netmask is not None:
    try:
      master_netmask = int(master_netmask)
    except (ValueError, TypeError) as err:
      ToStderr("Invalid master netmask value: %s" % str(err))
      return 1

  disk_state = utils.FlatToDict(opts.disk_state) if opts.disk_state else {}
  hv_state = dict(opts.hv_state)

  # unset images are passed on as empty strings
  install_image = opts.install_image or ""
  zeroing_image = opts.zeroing_image or ""

  compression_tools = _GetCompressionTools(opts)
  default_ialloc_params = opts.default_iallocator_params
  enabled_user_shutdown = bool(opts.enabled_user_shutdown)

  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        gluster_storage_dir=opts.gluster_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        default_iallocator_params=default_ialloc_params,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        install_image=install_image,
                        zeroing_image=zeroing_image,
                        compression_tools=compression_tools,
                        enabled_user_shutdown=enabled_user_shutdown,
                        )
  SubmitOpCode(opcodes.OpClusterPostInit(), opts=opts)
  return 0
336
337
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  Does nothing unless C{--yes-do-it} was given. After the destroy opcode has
  been processed, its result is passed to
  L{bootstrap.FinalizeClusterDestroy}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.yes_do_it:
    master_uuid = SubmitOpCode(opcodes.OpClusterDestroy(), opts=opts)
    # reaching this point means the opcode didn't fail; we can proceed to
    # shutdown all the daemons
    bootstrap.FinalizeClusterDestroy(master_uuid)
    return 0

  ToStderr("Destroying a cluster is irreversible. If you really want"
           " destroy this cluster, supply the --yes-do-it option.")
  return 1
360
361
def RenameCluster(opts, args):
  """Rename the cluster.

  Unless C{opts.force} is set, the user is asked for confirmation first.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  client = GetClient()
  (old_name, ) = client.QueryConfigValues(["cluster_name"])
  new_name = args[0]

  if not opts.force:
    question = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (old_name, new_name)
    if not AskUser(question):
      return 1

  renamed_to = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                            opts=opts, cl=client)
  if renamed_to:
    ToStdout("Cluster renamed from '%s' to '%s'", old_name, renamed_to)

  return 0
393
394
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  @rtype: int
  @return: 0 once the activation opcode has been submitted

  """
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
402
403
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  Unless C{opts.confirm} is set, the user is asked for confirmation first.

  @rtype: int
  @return: 1 if the user declined, 0 once the opcode has been submitted

  """
  if not opts.confirm:
    question = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(question):
      return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
419
420
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  With C{opts.yes_do_it} the opcode is submitted via
  L{SubmitOpCodeToDrainedQueue}, otherwise via L{SubmitOrSend}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  if not opts.yes_do_it:
    SubmitOrSend(op, opts)
  else:
    SubmitOpCodeToDrainedQueue(op)
  return 0
437
438
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # (output format, key in the cluster info)
  fields = [
    ("Software version: %s", "software_version"),
    ("Internode protocol: %s", "protocol_version"),
    ("Configuration format: %s", "config_version"),
    ("OS api version: %s", "os_api_version"),
    ("Export interface: %s", "export_version"),
    ("VCS version: %s", "vcs_version"),
    ]

  cinfo = GetClient().QueryClusterInfo()
  for (fmt, key) in fields:
    ToStdout(fmt, cinfo[key])
  return 0
458
459
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  The name is obtained from L{bootstrap.GetMaster}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  ToStdout(bootstrap.GetMaster())
  return 0
473
474
475 def _FormatGroupedParams(paramsdict, roman=False):
476 """Format Grouped parameters (be, nic, disk) by group.
477
478 @type paramsdict: dict of dicts
479 @param paramsdict: {group: {param: value, ...}, ...}
480 @rtype: dict of dicts
481 @return: copy of the input dictionaries with strings as values
482
483 """
484 ret = {}
485 for (item, val) in paramsdict.items():
486 if isinstance(val, dict):
487 ret[item] = _FormatGroupedParams(val, roman=roman)
488 elif roman and isinstance(val, int):
489 ret[item] = compat.TryToRoman(val)
490 else:
491 ret[item] = str(val)
492 return ret
493
494
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  Queries the cluster information and prints it via L{PrintGenericInfo}.
  Hypervisor parameters are only shown for enabled hypervisors. Integers
  are passed through L{compat.TryToRoman} according to
  C{opts.roman_integers}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cinfo = GetClient().QueryClusterInfo()
  roman = opts.roman_integers

  def _Roman(value):
    """Optionally convert a number to roman numerals."""
    return compat.TryToRoman(value, convert=roman)

  def _Grouped(params):
    """Format a (possibly nested) parameter dictionary."""
    return _FormatGroupedParams(params, roman=roman)

  tags = (utils.CommaJoin(utils.NiceSort(cinfo["tags"]))
          if cinfo["tags"] else "(none)")
  reserved_lvs = (utils.CommaJoin(cinfo["reserved_lvs"])
                  if cinfo["reserved_lvs"] else "(none)")

  enabled_hv = cinfo["enabled_hypervisors"]
  hvparams = dict((hv_name, hv_params)
                  for (hv_name, hv_params) in cinfo["hvparams"].items()
                  if hv_name in enabled_hv)

  info = [
    ("Cluster name", cinfo["name"]),
    ("Cluster UUID", cinfo["uuid"]),

    ("Creation time", utils.FormatTime(cinfo["ctime"])),
    ("Modification time", utils.FormatTime(cinfo["mtime"])),

    ("Master node", cinfo["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (cinfo["architecture"][0], cinfo["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", cinfo["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _Grouped(hvparams)),
    ("OS-specific hypervisor parameters", _Grouped(cinfo["os_hvp"])),
    ("OS parameters", _Grouped(cinfo["osparams"])),

    ("Hidden OSes", utils.CommaJoin(cinfo["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(cinfo["blacklisted_os"])),
    ]

  info.append(("Cluster parameters", [
    ("candidate pool size", _Roman(cinfo["candidate_pool_size"])),
    ("maximal number of jobs running simultaneously",
     _Roman(cinfo["max_running_jobs"])),
    ("maximal number of jobs simultaneously tracked by the scheduler",
     _Roman(cinfo["max_tracked_jobs"])),
    ("mac prefix", cinfo["mac_prefix"]),
    ("master netdev", cinfo["master_netdev"]),
    ("master netmask", _Roman(cinfo["master_netmask"])),
    ("use external master IP address setup script",
     cinfo["use_external_mip_script"]),
    ("lvm volume group", cinfo["volume_group_name"]),
    ("lvm reserved volumes", reserved_lvs),
    ("drbd usermode helper", cinfo["drbd_usermode_helper"]),
    ("file storage path", cinfo["file_storage_dir"]),
    ("shared file storage path", cinfo["shared_file_storage_dir"]),
    ("gluster storage path", cinfo["gluster_storage_dir"]),
    ("maintenance of node health", cinfo["maintain_node_health"]),
    ("uid pool", uidpool.FormatUidPool(cinfo["uid_pool"])),
    ("default instance allocator", cinfo["default_iallocator"]),
    ("default instance allocator parameters",
     cinfo["default_iallocator_params"]),
    ("primary ip version", _Roman(cinfo["primary_ip_version"])),
    ("preallocation wipe disks", cinfo["prealloc_wipe_disks"]),
    ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
    ("ExtStorage Providers search path",
     utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
    ("enabled disk templates",
     utils.CommaJoin(cinfo["enabled_disk_templates"])),
    ("install image", cinfo["install_image"]),
    ("instance communication network",
     cinfo["instance_communication_network"]),
    ("zeroing image", cinfo["zeroing_image"]),
    ("compression tools", cinfo["compression_tools"]),
    ("enabled user shutdown", cinfo["enabled_user_shutdown"]),
    ]))

  info.extend([
    ("Default node parameters", _Grouped(cinfo["ndparams"])),
    ("Default instance parameters", _Grouped(cinfo["beparams"])),
    ("Default nic parameters", _Grouped(cinfo["nicparams"])),
    ("Default disk parameters", _Grouped(cinfo["diskparams"])),
    ("Instance policy - limits for instances",
     FormatPolicyInfo(cinfo["ipolicy"], None, True, roman)),
    ])

  PrintGenericInfo(info)
  return 0
611
612
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  The target nodes are the online nodes selected by C{opts.nodes} and
  C{opts.nodegroup}, excluding the master. When
  C{opts.use_replication_network} is set, their secondary IPs are used as
  copy targets.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the file does not exist

  """
  filename = os.path.abspath(args[0])

  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  qcl = GetClient()
  try:
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

    # SSH ports must be looked up for the *resolved* node names. The raw
    # opts.nodes may be empty (meaning "all nodes") or may contain the
    # master/offline nodes filtered out here, so it would not line up with
    # the target list in the zip() below.
    node_names = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                                nodegroup=opts.nodegroup)
    ports = GetNodesSshPorts(node_names, qcl)

    if opts.use_replication_network:
      # Same selection again, returning secondary IPs instead of names.
      # NOTE(review): relies on both calls returning nodes in the same order
      # (identical filter); nowarn avoids repeating warnings from above.
      targets = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                               secondary_ips=True, nodegroup=opts.nodegroup,
                               nowarn=True)
      if len(targets) != len(node_names):
        raise errors.OpExecError("The set of online nodes changed while"
                                 " preparing the copy, please retry")
    else:
      targets = node_names
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name)
  for (node, port) in zip(targets, ports):
    if not srun.CopyFileToNode(node, port, filename):
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)

  return 0
650
651
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  The command is run via SSH on the online nodes selected by C{opts.nodes}
  and C{opts.nodegroup}. If the master is among them, it is handled last.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  command = " ".join(args)

  cl = GetClient()
  qcl = GetClient()
  try:
    nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)

    cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                      "master_node"])

    # Make sure master node is at list end
    if master_node in nodes:
      nodes.remove(master_node)
      nodes.append(master_node)

    # Ports are fetched only *after* reordering; otherwise they would no
    # longer line up with the node list in the zip() below
    ports = GetNodesSshPorts(nodes, qcl)
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name=cluster_name)

  for (name, port) in zip(nodes, ports):
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
697
698
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  Submits L{opcodes.OpClusterVerify}. That opcode returns further job IDs,
  which are waited for. The command fails if any of those jobs failed, or
  if any job did not produce exactly one true opcode result.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  # Count failures explicitly. The previous nested map/zip construct raised
  # ValueError on unpacking when no job results were returned at all.
  bad_jobs = 0
  bad_results = 0
  for (job_success, op_results) in jex.GetResults():
    if not job_success:
      bad_jobs += 1
    # a job's result is only good if it has exactly one, true, opcode result
    if not (len(op_results) == 1 and op_results[0]):
      bad_results += 1

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
750
751
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  Submits L{opcodes.OpClusterVerifyDisks} and processes the results of the
  jobs it spawns:
    - nodes that could not be queried are reported
    - disks are activated for reported instances that are not in the
      "missing" set
    - instances with missing logical volumes are listed

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  submitted = SubmitOpCode(opcodes.OpClusterVerifyDisks(), cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)
  for (status, job_id) in submitted[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (job_ok, job_result) in jex.GetResults():
    if not job_ok:
      ToStdout("Job failed: %s", job_result)
      continue

    ((bad_nodes, instances, missing), ) = job_result

    for (node, text) in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        continue
      activate_op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(activate_op, opts=opts, cl=cl)
      except errors.GenericError as err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if not missing:
      if not instances:
        ToStdout("No disks need to be activated.")
      continue

    for (iname, ival) in missing.items():
      # ival holds (node, volume) pairs
      if compat.all(entry[0] in bad_nodes for entry in ival):
        ToStdout("Instance %s cannot be verified as it lives on"
                 " broken nodes", iname)
        continue

      ToStdout("Instance %s has missing logical volumes:", iname)
      ival.sort()
      for (node, vol) in ival:
        if node in bad_nodes:
          ToStdout("\tbroken node %s /dev/%s", node, vol)
        else:
          ToStdout("\t%s /dev/%s", node, vol)

    ToStdout("You need to replace or recreate disks for all the above"
             " instances if this message persists after fixing broken nodes.")
    retcode = constants.EXIT_FAILURE

  return retcode
823
824
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # previously fell off the end and returned None, contrary to the
  # documented exit code and to every other command in this module
  return 0
837
838
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  Without C{--no-voting}, L{bootstrap.MajorityHealthy} must succeed first.
  With C{--no-voting}, the user must confirm unless C{opts.yes_do_it} is
  set.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.no_voting:
    if not opts.yes_do_it:
      question = ("This will perform the failover even if most other nodes"
                  " are down, or if this node is outdated. This is dangerous"
                  " as it can lead to a non-consistent cluster. Check the"
                  " gnt-cluster(8) man page before proceeding. Continue?")
      if not AskUser(question):
        return 1
  elif not bootstrap.MajorityHealthy():
    ToStderr("Master-failover with voting is only possible if the majority"
             " of nodes is still healthy; use the --no-voting option after"
             " ensuring by other means that you won't end up in a dual-master"
             " scenario.")
    return 1

  (rvalue, msgs) = bootstrap.MasterFailover(no_voting=opts.no_voting)
  for msg in msgs:
    ToStderr(msg)
  return rvalue
874
875
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: 0 if a client could be created and the cluster info query
      succeeded, 1 if anything raised

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    # deliberately broad: any failure means "master not reachable"
    return 1
  else:
    return 0
892
893
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  Prints one "path tag" line per match, sorted.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code (1 if nothing matched, 0 otherwise)

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1

  for (path, tag) in sorted(result):
    ToStdout("%s %s", path, tag)

  # previously fell off the end (returning None) despite the documented
  # exit code
  return 0
912
913
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  The file contents are read, then parsed as a PEM certificate and, if
  requested, as a PEM private key as well.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
      verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
      the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.
  @raise errors.X509CertError: if the file cannot be read or parsed

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError as err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  # (loader, error message) pairs, applied in order
  checks = [
    (OpenSSL.crypto.load_certificate, "Unable to load certificate: %s"),
    ]
  if verify_private_key:
    checks.append((OpenSSL.crypto.load_privatekey,
                   "Unable to load private key: %s"))

  for (loader, errmsg) in checks:
    try:
      loader(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception as err: # pylint: disable=W0703
      raise errors.X509CertError(cert_filename, errmsg % str(err))

  return pem
947
948
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force, new_node_cert, verbose, debug):
  """Renews cluster certificates, keys and secrets.

  The option combination is validated and any user-supplied certificate or
  secret files are loaded first. Then, depending on the flags:
    - RAPI/SPICE certificates, the confd HMAC key and the cluster domain
      secret are regenerated (or installed from the given files) while the
      whole cluster is stopped, and copied to all non-master nodes;
    - node (client) certificates and/or the cluster certificate are renewed
      while the NODED and WCONFD daemons are stopped.
  Finally, if node or cluster certificates were renewed, an
  C{OpClusterRenewCrypto} opcode is submitted.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: If true, do not ask the user for confirmation
  @type new_node_cert: bool
  @param new_node_cert: Whether to generate new node certificates
  @type verbose: boolean
  @param verbose: show verbose output
  @type debug: boolean
  @param debug: show debug output
  @rtype: int
  @return: 0 on success; 1 if conflicting options were given, a certificate
      or the cluster domain secret could not be loaded, or the user declined
      the confirmation prompt

  """
  # Reject mutually exclusive option combinations up front.
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  # The SPICE certificate and its CA certificate must be given together
  # (error if exactly one of them is set).
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  # Load and verify user-supplied certificates before touching the cluster.
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    # X509CertError is raised with (filename, message) arguments.
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    """Regenerate/install RAPI, SPICE, HMAC and CDS material and copy it.

    The cluster and client certificates are not touched here (the first and
    the two trailing positional flags of GenerateClusterCrypto are False/None).

    """
    ctx.feedback_fn("Updating certificates and keys")

    bootstrap.GenerateClusterCrypto(False,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    False,
                                    None,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    # Collect only the files that were actually (re)created above.
    files_to_copy = []

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        port = ctx.ssh_ports[node_name]
        ctx.feedback_fn("Copying %s to %s:%d" %
                        (", ".join(files_to_copy), node_name, port))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, port, file_name)

  def _RenewClientCerts(ctx):
    """Create new client certificates on every node (master last).

    Afterwards a temporary master-candidate-certs ssconf file holding only the
    master's digest under the bootstrap keyword is distributed, and the same
    entry is added to the configuration.

    """
    ctx.feedback_fn("Updating client SSL certificates.")

    cluster_name = ssconf.SimpleStore().GetClusterName()

    for node_name in ctx.nonmaster_nodes + [ctx.master_node]:
      ssh_port = ctx.ssh_ports[node_name]
      # Payload for the SSL_UPDATE setup command; it carries the current
      # node daemon certificate of this (master) node.
      data = {
        constants.NDS_CLUSTER_NAME: cluster_name,
        constants.NDS_NODE_DAEMON_CERTIFICATE:
          utils.ReadFile(pathutils.NODED_CERT_FILE),
        constants.NDS_NODE_NAME: node_name,
        constants.NDS_ACTION: constants.CRYPTO_ACTION_CREATE,
        }

      bootstrap.RunNodeSetupCmd(
          cluster_name,
          node_name,
          pathutils.SSL_UPDATE,
          ctx.debug,
          ctx.verbose,
          True, # use cluster key
          False, # ask key
          True, # strict host check
          ssh_port,
          data)

    # Create a temporary ssconf file using the master's client cert digest
    # and the 'bootstrap' keyword to enable distribution of all nodes' digests.
    master_digest = utils.GetCertificateDigest()
    ssconf_master_candidate_certs_filename = os.path.join(
        pathutils.DATA_DIR, "%s%s" %
        (constants.SSCONF_FILEPREFIX, constants.SS_MASTER_CANDIDATES_CERTS))
    utils.WriteFile(
        ssconf_master_candidate_certs_filename,
        data="%s=%s" % (constants.CRYPTO_BOOTSTRAP, master_digest))
    for node_name in ctx.nonmaster_nodes:
      port = ctx.ssh_ports[node_name]
      ctx.feedback_fn("Copying %s to %s:%d" %
                      (ssconf_master_candidate_certs_filename, node_name, port))
      ctx.ssh.CopyFileToNode(node_name, port,
                             ssconf_master_candidate_certs_filename)

    # Write the bootstrap entry to the config using wconfd.
    config_live_lock = utils.livelock.LiveLock("renew_crypto")
    cfg = config.GetConfig(None, config_live_lock)
    cfg.AddNodeToCandidateCerts(constants.CRYPTO_BOOTSTRAP, master_digest)
    cfg.Update(cfg.GetClusterInfo(), ctx.feedback_fn)

  def _RenewServerAndClientCerts(ctx):
    """Regenerate the cluster certificate, copy it out, then renew clients."""
    ctx.feedback_fn("Updating the cluster SSL certificate.")

    master_name = ssconf.SimpleStore().GetMasterNode()
    bootstrap.GenerateClusterCrypto(True, # cluster cert
                                    False, # rapi cert
                                    False, # spice cert
                                    False, # confd hmac key
                                    False, # cds
                                    True, # client cert
                                    master_name)

    for node_name in ctx.nonmaster_nodes:
      port = ctx.ssh_ports[node_name]
      server_cert = pathutils.NODED_CERT_FILE
      ctx.feedback_fn("Copying %s to %s:%d" %
                      (server_cert, node_name, port))
      ctx.ssh.CopyFileToNode(node_name, port, server_cert)

    _RenewClientCerts(ctx)

  if new_rapi_cert or new_spice_cert or new_confd_hmac_key or new_cds:
    RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  # If only node certificates are recreated, call _RenewClientCerts only.
  if new_node_cert and not new_cluster_cert:
    RunWhileDaemonsStopped(ToStdout, [constants.NODED, constants.WCONFD],
                           _RenewClientCerts, verbose=verbose, debug=debug)

  # If the cluster certificate is renewed, the client certificates need
  # to be renewed too.
  if new_cluster_cert:
    RunWhileDaemonsStopped(ToStdout, [constants.NODED, constants.WCONFD],
                           _RenewServerAndClientCerts, verbose=verbose,
                           debug=debug)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  if new_node_cert or new_cluster_cert:
    cl = GetClient()
    renew_op = opcodes.OpClusterRenewCrypto()
    SubmitOpCode(renew_op, cl=cl)

  return 0
1164
1165
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  Command-line entry point: forwards the parsed options to L{_RenewCrypto}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: positional arguments (not used)
  @rtype: int
  @return: the desired exit code

  """
  return _RenewCrypto(new_cluster_cert=opts.new_cluster_cert,
                      new_rapi_cert=opts.new_rapi_cert,
                      rapi_cert_filename=opts.rapi_cert,
                      new_spice_cert=opts.new_spice_cert,
                      spice_cert_filename=opts.spice_cert,
                      spice_cacert_filename=opts.spice_cacert,
                      new_confd_hmac_key=opts.new_confd_hmac_key,
                      new_cds=opts.new_cluster_domain_secret,
                      cds_filename=opts.cluster_domain_secret,
                      force=opts.force,
                      new_node_cert=opts.new_node_cert,
                      verbose=opts.verbose,
                      debug=opts.debug > 0)
1183
1184
def _GetEnabledDiskTemplates(opts):
  """Determine the list of enabled disk templates.

  @param opts: the command line options selected by the user
  @rtype: list of strings or None
  @return: the comma-separated C{--enabled-disk-templates} value split into
      a list, or None if the option was not given (or was empty)

  """
  templates = opts.enabled_disk_templates
  return templates.split(",") if templates else None
1193
1194
def _GetVgName(opts, enabled_disk_templates):
  """Determine the volume group name.

  The name given with C{--vg-name} is returned as-is (None if not given).
  If disk templates are being set and none of them is LVM-based, a warning is
  printed, but the name is still returned.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk-templates
  @return: the value of C{opts.vg_name}

  """
  vg_name = opts.vg_name
  # consistency between vg name and enabled disk templates (warning only)
  useless_vg = (enabled_disk_templates and vg_name and
                not utils.IsLvmEnabled(enabled_disk_templates))
  if useless_vg:
    ToStdout("You specified a volume group with --vg-name, but you did not"
             " enable any of the following lvm-based disk templates: %s" %
             utils.CommaJoin(constants.DTS_LVM))
  return vg_name
1212
1213
def _GetDrbdHelper(opts, enabled_disk_templates):
  """Determine the DRBD usermode helper.

  The helper given on the command line is returned unchanged. If disk
  templates are being set and DRBD is not among them, a warning is printed,
  but the value is still returned.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings or None
  @param enabled_disk_templates: the disk templates being enabled, or None if
      they are not being changed
  @return: the value of C{opts.drbd_helper}

  """
  drbd_helper = opts.drbd_helper
  if enabled_disk_templates:
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    if not drbd_enabled and drbd_helper:
      # Only a warning; the helper is still returned to the caller.
      ToStdout("You specified a DRBD usermode helper with"
               " --drbd-usermode-helper while DRBD is not enabled.")
  return drbd_helper
1225
1226
def _GetCompressionTools(opts):
  """Determine the list of custom compression tools.

  @param opts: the command line options selected by the user
  @return: None if the option was not given; the default tool list if it was
      given as an empty string (reset); otherwise the comma-separated value
      split into a list

  """
  tools = opts.compression_tools
  if tools is None:
    return None # To note the parameter was not provided
  if not tools:
    return constants.IEC_DEFAULT_TOOLS # Resetting to default
  return tools.split(",")
1237
1238
def SetClusterParams(opts, args):
  """Modify the cluster.

  Converts the command line options into an C{OpClusterSetParams} opcode and
  submits it (or sends it, depending on the submit options).

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # At least one modification must be requested.
  if not (opts.vg_name is not None or
          opts.drbd_helper is not None or
          opts.enabled_hypervisors or opts.hvparams or
          opts.beparams or opts.nicparams or
          opts.ndparams or opts.diskparams or
          opts.candidate_pool_size is not None or
          opts.max_running_jobs is not None or
          opts.max_tracked_jobs is not None or
          opts.uid_pool is not None or
          opts.maintain_node_health is not None or
          opts.add_uids is not None or
          opts.remove_uids is not None or
          opts.default_iallocator is not None or
          opts.default_iallocator_params is not None or
          opts.reserved_lvs is not None or
          opts.mac_prefix is not None or
          opts.master_netdev is not None or
          opts.master_netmask is not None or
          opts.use_external_mip_script is not None or
          opts.prealloc_wipe_disks is not None or
          opts.hv_state or
          opts.enabled_disk_templates or
          opts.disk_state or
          opts.ipolicy_bounds_specs is not None or
          opts.ipolicy_std_specs is not None or
          opts.ipolicy_disk_templates is not None or
          opts.ipolicy_vcpu_ratio is not None or
          opts.ipolicy_spindle_ratio is not None or
          opts.modify_etc_hosts is not None or
          opts.file_storage_dir is not None or
          opts.install_image is not None or
          opts.instance_communication_network is not None or
          opts.zeroing_image is not None or
          opts.shared_file_storage_dir is not None or
          opts.compression_tools is not None or
          opts.enabled_user_shutdown is not None):
    ToStderr("Please give at least one of the parameters.")
    return 1

  enabled_disk_templates = _GetEnabledDiskTemplates(opts)
  vg_name = _GetVgName(opts, enabled_disk_templates)

  try:
    drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError as e:
    ToStderr(str(e))
    return 1

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  # a list of (name, dict) we can pass directly to dict() (or [])
  hvparams = dict(opts.hvparams)
  for hv_params in hvparams.values():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)

  for dt_params in diskparams.values():
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = CreateIPolicyFromOpts(
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    )

  mnh = opts.maintain_node_health

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  # An empty string explicitly clears the reserved LV list.
  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  ext_ip_script = opts.use_external_mip_script

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  compression_tools = _GetCompressionTools(opts)

  op = opcodes.OpClusterSetParams(
    vg_name=vg_name,
    drbd_helper=drbd_helper,
    enabled_hypervisors=hvlist,
    hvparams=hvparams,
    os_hvp=None,
    beparams=beparams,
    nicparams=nicparams,
    ndparams=ndparams,
    diskparams=diskparams,
    ipolicy=ipolicy,
    candidate_pool_size=opts.candidate_pool_size,
    max_running_jobs=opts.max_running_jobs,
    max_tracked_jobs=opts.max_tracked_jobs,
    maintain_node_health=mnh,
    modify_etc_hosts=opts.modify_etc_hosts,
    uid_pool=uid_pool,
    add_uids=add_uids,
    remove_uids=remove_uids,
    default_iallocator=opts.default_iallocator,
    default_iallocator_params=opts.default_iallocator_params,
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
    mac_prefix=opts.mac_prefix,
    master_netdev=opts.master_netdev,
    master_netmask=opts.master_netmask,
    reserved_lvs=opts.reserved_lvs,
    use_external_mip_script=ext_ip_script,
    hv_state=hv_state,
    disk_state=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    force=opts.force,
    file_storage_dir=opts.file_storage_dir,
    install_image=opts.install_image,
    instance_communication_network=opts.instance_communication_network,
    zeroing_image=opts.zeroing_image,
    shared_file_storage_dir=opts.shared_file_storage_dir,
    compression_tools=compression_tools,
    enabled_user_shutdown=opts.enabled_user_shutdown,
    )
  return base.GetResult(None, opts, SubmitOrSend(op, opts))
1408
1409
def QueueOps(opts, args):
  """Queue operations.

  Supported subcommands: C{drain} and C{undrain} set or clear the job queue
  drain flag; C{info} prints its current state.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the subcommand is unknown

  """
  subcommand = args[0]
  client = GetClient()

  if subcommand in ("drain", "undrain"):
    client.SetQueueDrainFlag(subcommand == "drain")
  elif subcommand == "info":
    flag_is_set = client.QueryConfigValues(["drain_flag"])[0]
    ToStdout("The drain flag is %s" % ("set" if flag_is_set else "unset"))
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % subcommand,
                               errors.ECODE_INVAL)

  return 0
1437
1438
def _ShowWatcherPause(until):
  """Print whether the watcher is currently paused.

  @param until: timestamp (in C{time.time()} units) until which the watcher
      is paused, or None if no pause is set

  """
  if until is not None and until >= time.time():
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1444
1445
def WatcherOps(opts, args):
  """Watcher operations.

  Supported subcommands: C{continue} clears a pause, C{pause DURATION}
  pauses the watcher for the given timespec, C{info} shows the pause state.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the subcommand is unknown or the pause
      duration is missing

  """
  action = args[0]
  client = GetClient()

  if action == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")
    return 0

  if action == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
    pause_until = time.time() + ParseTimespec(args[1])
    _ShowWatcherPause(client.SetWatcherPause(pause_until))
    return 0

  if action == "info":
    _ShowWatcherPause(client.QueryConfigValues(["watcher_pause"])[0])
    return 0

  raise errors.OpPrereqError("Command '%s' is not valid." % action,
                             errors.ECODE_INVAL)
1479
1480
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  Submits a single C{OpOobCommand} for all nodes and reports every node whose
  result status is not C{RS_NORMAL}.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  command = constants.OOB_POWER_ON if power else constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)

  failures = 0
  for ((_, node_name), (data_status, _)) in result:
    if data_status == constants.RS_NORMAL:
      continue
    assert data_status != constants.RS_UNAVAIL
    failures += 1
    ToStderr("There was a problem changing power for %s, please investigate",
             node_name)

  return failures == 0
1516
1517
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  One job per instance is queued through a L{JobExecutor}; the function then
  waits for all results.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: for shutdown only; passed as C{no_remember} to
      C{OpInstanceShutdown} (if true, the state change is not remembered)
  @return: The success of the operation (none failed)

  """
  if start:
    make_op = opcodes.OpInstanceStartup
    (action, done, doing) = ("startup", "started", "starting")
  else:
    make_op = compat.partial(opcodes.OpInstanceShutdown,
                             timeout=opts.shutdown_timeout,
                             no_remember=no_remember)
    (action, done, doing) = ("shutdown", "stopped", "stopping")

  executor = JobExecutor(opts=opts)

  for inst_name in inst_list:
    ToStdout("Submit %s of instance %s", action, inst_name)
    executor.QueueJob(inst_name, make_op(instance_name=inst_name))

  job_results = executor.GetResults()
  failures = sum(1 for (succeeded, _) in job_results if not succeeded)

  if failures:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", doing, failures,
             len(job_results))
    return False

  ToStdout("All instances have been %s successfully", done)
  return True
1556
1557
class _RunWhenNodesReachableHelper(object):
  """Helper class to make shared internal state sharing easier.

  An instance is called repeatedly as the retried function: each call runs
  C{action_cb} on the set of nodes found reachable so far. L{Wait} is the
  matching wait function, which pings the remaining nodes.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    # Reachability bookkeeping: every node starts out as "down".
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.port = port

    # Callbacks and injectable helpers.
    self.action_cb = action_cb
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

    self.success = True

  def __call__(self):
    """Run action_cb on the nodes that are up so far.

    @return: the accumulated success flag, once no node is down anymore
    @raises utils.RetryAgain: When there are still down nodes

    """
    all_ok = self.action_cb(self.up)
    if not all_ok:
      self.success = False

    if not self.down:
      return self.success
    raise utils.RetryAgain()

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    started = time.time()
    for candidate in self.down:
      alive = self._ping_fn(self.node2ip[candidate], self.port,
                            timeout=_EPO_PING_TIMEOUT, live_port_needed=True)
      if alive:
        self.feedback_fn("Node %s became available" % candidate)
        self.up.add(candidate)
        self.down -= self.up
        # A newly available node may let the action callback make progress,
        # so return immediately instead of sleeping.
        return

    self._sleep_fn(max(0.0, started + secs - time.time()))
1620
1621
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  Node names are resolved in the cluster's primary IP family. The node daemon
  port of each node is then TCP-pinged until all nodes answer or
  C{_EPO_REACHABLE_TIMEOUT} expires. C{action_cb} is invoked with the set of
  nodes that are up so far.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry
  @return: True if every C{action_cb} call succeeded and all nodes became
      reachable; False otherwise

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  # Resolve names in the cluster's primary address family explicitly, so an
  # IPv4 cluster does not end up pinging an IPv6 address of a node.
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IP4Address.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
                 for node in node_list)

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    # One node per line, matching the " - " bullet started in the format.
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n - %s", "\n - ".join(sorted(helper.down)))
    return False
1651
1652
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  Every instance all of whose nodes are in C{nodes_online} is removed from
  C{inst_map} (which is modified in place) and started. A later call, with
  more nodes online, therefore does not try to start it again.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  ready = [inst for (inst, nodes) in inst_map.items()
           if not (nodes - nodes_online)]

  for inst in ready:
    del inst_map[inst]

  if not ready:
    return True
  return _instance_start_fn(opts, ready, True)
1677
1678
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  The out-of-band capable nodes are powered on first. The function then waits
  for all nodes to become reachable again, starting each instance as soon as
  all of its nodes are up.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # power=True makes _OobPower issue OOB_POWER_ON; passing False here would
  # power the nodes off instead.
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up. The callback gets its own copy of
  # inst_map and removes instances from it as it submits their startup.
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1703
1704
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  All instances are shut down first (without remembering the state change).
  Only if that succeeds are the out-of-band capable nodes powered off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  instances_down = _InstanceStart(opts, inst_map.keys(), False,
                                  no_remember=True)
  if not instances_down:
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if node_list and not _OobPower(opts, node_list, False):
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1725
1726
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  Collects the affected nodes (given by name, by group with --groups, or all
  nodes with --all) and the instances on them. After confirmation (unless
  --force), it delegates to C{_on_fn} (power on, with --on) or C{_off_fn}
  (power off).

  @param opts: the command line options selected by the user
  @type args: list
  @param args: node names, or group names with --groups; must be empty
      with --all
  @param qcl: query client to use (a new one is created if None)
  @param _on_fn: power-on implementation (unittest use only)
  @param _off_fn: power-off implementation (unittest use only)
  @param _confirm_fn: confirmation function (unittest use only)
  @param _stdout_fn: stdout output function (unittest use only)
  @param _stderr_fn: stderr output function (unittest use only)
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  if opts.groups:
    # NOTE(review): presumably each QueryGroups row is the list of requested
    # field values, i.e. [node_list]; if so, chaining the rows yields the
    # node_list values themselves rather than individual node names -- TODO
    # confirm QueryNodes accepts this shape.
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  # Names of all queried nodes (first field of each row)
  all_nodes = map(compat.fst, result)
  # Nodes whose power state will be changed via OOB
  node_list = []
  # instance name -> set of non-master nodes the instance lives on
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      # The master is never recorded in an instance's node set: if it is
      # seen first the instance gets an empty set, and it is not added later.
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      # Online nodes, or offline nodes that still report being powered
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)
1798
1799
def _GetCreateCommand(info):
  """Build a C{gnt-cluster init} command line from cluster information.

  Only the instance policy and the cluster name are reproduced.

  @type info: dict
  @param info: cluster information; the C{ipolicy} and C{name} keys are used
  @rtype: string
  @return: the command line

  """
  out = StringIO()
  out.write("gnt-cluster init")
  PrintIPolicyCommand(out, info["ipolicy"], False)
  for piece in (" ", info["name"]):
    out.write(piece)
  return out.getvalue()
1807
1808
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: positional arguments (not used)

  """
  cluster_info = GetClient().QueryClusterInfo()
  ToStdout(_GetCreateCommand(cluster_info))
1818
1819
def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: True if the command succeeded, False otherwise (in which case
      the failure reason and output are printed to stderr)

  """
  outcome = utils.RunCmd(cmd)
  if not outcome.failed:
    return True

  ToStderr("Command %s failed: %s; Output %s" %
           (cmd, outcome.fail_reason, outcome.output))
  return False
1835
1836
def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down: node information comes from ssconf and the command
  is run over SSH.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  # A single ssconf store is enough for all three lookups.
  store = ssconf.SimpleStore()
  nodes = store.GetOnlineNodeList()
  master_node = store.GetMasterNode()
  cluster_name = store.GetClusterName()

  # If master node is in 'nodes', make sure master node is at list end,
  # so it is handled after all other nodes.
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  failed = []

  srun = ssh.SshRunner(cluster_name=cluster_name)
  for name in nodes:
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
    if result.exit_code != 0:
      failed.append(name)

  return failed
1871
1872
def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  The check is that the version's directory below C{PKGLIBDIR} exists on every
  online node. If it does not, the offending nodes are reported on stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  version_dir = os.path.join(pathutils.PKGLIBDIR, versionstring)
  missing_on = _VerifyCommand(["test", "-d", version_dir])
  if not missing_on:
    return True

  ToStderr("Ganeti version %s not installed on nodes %s"
           % (versionstring, ", ".join(missing_on)))
  return False
1893
1894
def _GetRunning():
  """Determine the number of running jobs.

  @rtype: int
  @return: the number of jobs currently in status C{JOB_STATUS_RUNNING}

  """
  cl = GetClient()
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(cl.Query(constants.QR_JOB, [], qfilter).data)
1906
1907
def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  On every online node, the C{ganeti/lib} and C{ganeti/share} symlinks below
  C{SYSCONFDIR} are pointed at the version's directories. GNU C{ln -T} is
  used when available; otherwise each old link is removed first.

  @type versionstring: string
  @param versionstring: the version to activate
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  link_specs = [(pathutils.PKGLIBDIR, "ganeti/lib"),
                (pathutils.SHAREDIR, "ganeti/share")]

  failed = []
  for (base_dir, link_suffix) in link_specs:
    target = os.path.join(base_dir, versionstring)
    link = os.path.join(pathutils.SYSCONFDIR, link_suffix)
    if constants.HAS_GNU_LN:
      failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", target, link]))
    else:
      failed.extend(_VerifyCommand(["rm", "-f", link]))
      failed.extend(_VerifyCommand(["ln", "-s", "-f", target, link]))

  return list(set(failed))
1938
1939
def _ExecuteCommands(fns):
  """Execute a list of functions, in reverse order.

  Used for rollback lists, where the most recently registered action has to
  be undone first.

  @type fns: list of functions.
  @param fns: the functions to be executed.

  """
  pending = list(fns)
  while pending:
    pending.pop()()
1949
1950
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise (the configuration has no C{version} key)

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  try:
    raw_version = config_data["version"]
  except KeyError:
    return None
  else:
    return utils.SplitVersion(raw_version)
1965
1966
def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  The file is expected to hold exactly three escaped fields; only the first
  two (the versions) are returned.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise (also if it is syntactically malformed).

  """
  no_intent = (None, None)
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return no_intent

  fields = utils.UnescapeAndSplit(utils.ReadFile(pathutils.INTENT_TO_UPGRADE))
  if len(fields) == 3:
    return (fields[0], fields[1])
  # file syntactically mal-formed
  return no_intent
1984
1985
def _WriteIntentToUpgrade(version):
  """Record on disk that an upgrade to C{version} has been started.

  The file receives three fields, escaped and joined with
  C{utils.EscapeAndJoin}: the currently running release version, the
  target version and the PID of this process.

  @type version: string
  @param version: the version we intend to upgrade to

  """
  fields = [constants.RELEASE_VERSION, version, "%d" % os.getpid()]
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
                  data=utils.EscapeAndJoin(fields))
1996
1997
def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  The steps run in this order:
    - verify that the target version is installed
    - record the intent to upgrade
    - drain the job queue and wait for it to become empty
    - pause the watcher for one hour
    - stop the daemons on the master node, then re-check the installed
      version
    - stop the daemons on all nodes
    - back up the data directory, excluding the job queue archive, into a
      tar file in the backup directory

  Steps that need undoing register a rollback task.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks,
      meant to be executed in reverse order (as C{_ExecuteCommands} does)

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  # A non-zero result means the queue could not be emptied within the
  # drain timeout.
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdout("Pausing the watcher for one hour.")
  rollback.append(lambda: GetClient().SetWatcherPause(None))
  GetClient().SetWatcherPause(time.time() + 60 * 60)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  # Re-check with the daemons stopped. On failure the master daemons are
  # started again, presumably so that the rollback tasks (which talk to the
  # daemons via GetClient) can still run.
  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.BACKUP_DIR, "ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["mkdir", "-p", pathutils.BACKUP_DIR]):
    return (False, rollback)

  # Create the archive in a safe manner, as it contains sensitive
  # information: mkstemp creates a file only its owner can access, and the
  # archive gets its final name only once tar has succeeded.
  (tmp_fd, tmp_name) = tempfile.mkstemp(prefix=backuptar,
                                        dir=pathutils.BACKUP_DIR)
  # tar writes to the file by name, so the descriptor opened by mkstemp is
  # not needed; close it instead of leaking it.
  os.close(tmp_fd)
  if not _RunCommandAndReport(["tar", "-cf", tmp_name,
                               "--exclude=queue/archive",
                               pathutils.DATA_DIR]):
    # Do not leave a partial copy of the sensitive data lying around. This
    # is best-effort cleanup; the upgrade is aborted either way.
    try:
      os.remove(tmp_name)
    except OSError:
      pass
    return (False, rollback)

  os.rename(tmp_name, backuptar)
  return (True, rollback)
2064
2065
def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  Currently this runs the SSL update tool (C{pathutils.SSL_UPDATE}) on every
  online node through C{bootstrap.RunNodeSetupCmd}, requesting the
  C{CRYPTO_ACTION_DELETE} action. Failures on individual nodes are reported
  on stderr but do not abort the downgrade.

  @rtype: bool
  @return: True; per-node failures are only reported, never returned

  """
  ToStdout("Performing version-specific downgrade tasks.")

  # Query all values from a single ssconf store instead of creating a new
  # store for every lookup.
  store = ssconf.SimpleStore()
  nodes = store.GetOnlineNodeList()
  cluster_name = store.GetClusterName()
  ssh_ports = store.GetSshPortMap()

  if not nodes:
    return True

  # The node daemon certificate is the same for every node, so read it once
  # instead of once per node.
  noded_cert = utils.ReadFile(pathutils.NODED_CERT_FILE)

  for node in nodes:
    data = {
      constants.NDS_CLUSTER_NAME: cluster_name,
      constants.NDS_NODE_DAEMON_CERTIFICATE: noded_cert,
      constants.NDS_NODE_NAME: node,
      constants.NDS_ACTION: constants.CRYPTO_ACTION_DELETE,
      }

    try:
      bootstrap.RunNodeSetupCmd(
        cluster_name,
        node,
        pathutils.SSL_UPDATE,
        True, # debug
        True, # verbose,
        True, # use cluster key
        False, # ask key
        True, # strict host check
        ssh_ports[node],
        data)
    except Exception as e: # pylint: disable=W0703
      # As downgrading can fail if a node is temporarily unreachable
      # only output the error, but do not abort the entire operation.
      ToStderr("Downgrading SSL setup of node '%s' failed: %s." %
               (node, e))

  return True
2112
2113
def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch all nodes to the given Ganeti version and adapt the configuration,
  in the order required by the direction of the change.

  For a downgrade, the configuration (and any version-specific state) is
  downgraded first, while the current binaries are still active. A failure
  to switch the version on some nodes afterwards is reported but not fatal.
  For an upgrade, the new version is activated first; a failure to switch
  the version aborts, and otherwise the configuration is upgraded.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []

  if downgrade:
    ToStdout("Downgrading configuration")
    # Version specific downgrades have to run before the binaries are
    # switched, so that the knowledgeable binary is still available should
    # the downgrade process be interrupted at this point.
    downgraded = (_RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade",
                                        "-f"]) and
                  _VersionSpecificDowngrade())
    if not downgraded:
      return (False, rollback)

  # Changing the configuration is the point of no return: from then on it is
  # safer to push the up/downgrade through than to try to roll it back.
  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  failed_nodes = _SetGanetiVersion(versionstring)
  if failed_nodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(failed_nodes)))
    if not downgrade:
      # The configuration is still untouched, so the upgrade can be aborted.
      return (False, rollback)

  if downgrade:
    # The configuration was already downgraded above.
    return (True, rollback)

  # The new version is active now and luxi may have changed in incompatible
  # ways, so only the canonical (version independent) tools are invoked.
  ToStdout("Upgrading configuration")
  if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
    return (False, rollback)
  return (True, rollback)
2162
2163
def _UpgradeAfterConfigurationChange(oldversion):
  """
  Finish an upgrade once the new Ganeti version is active and the
  configuration has been converted.

  The new version is already running at this point, so nothing talks luxi
  (not a stable interface); only commands are executed. As the
  configuration change is the point of no return, every step is attempted
  even if earlier ones fail. A failure only changes the final exit code.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value (0 if every step succeeded, 1 otherwise)

  """
  def _AnnounceAndRun(message, cmd):
    """Print C{message}, run C{cmd} and return whether it succeeded."""
    ToStdout(message)
    return _RunCommandAndReport(cmd)

  succeeded = []

  ToStdout("Ensuring directories everywhere.")
  failed_nodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if failed_nodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(failed_nodes),))
  succeeded.append(not failed_nodes)

  ToStdout("Starting daemons everywhere.")
  failed_nodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if failed_nodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(failed_nodes),))
  succeeded.append(not failed_nodes)

  succeeded.append(_AnnounceAndRun("Redistributing the configuration.",
                                   ["gnt-cluster", "redist-conf",
                                    "--yes-do-it"]))

  ToStdout("Restarting daemons everywhere.")
  # stop-all runs before start-all; failures of either are collected.
  failed_nodes = (_VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"]) +
                  _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if failed_nodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(failed_nodes))),))
  succeeded.append(not failed_nodes)

  succeeded.append(_AnnounceAndRun("Undraining the queue.",
                                   ["gnt-cluster", "queue", "undrain"]))

  # Removing the intent file is best effort and does not affect the result.
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  succeeded.append(_AnnounceAndRun("Running post-upgrade hooks",
                                   [pathutils.POST_UPGRADE, oldversion]))
  succeeded.append(_AnnounceAndRun("Unpausing the watcher.",
                                   ["gnt-cluster", "watcher", "continue"]))
  succeeded.append(_AnnounceAndRun("Verifying cluster.",
                                   ["gnt-cluster", "verify"]))

  if all(succeeded):
    return 0
  return 1
2226
2227
def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  Either starts a new upgrade (C{--to}) or resumes an interrupted one
  (C{--resume}). If C{--to} names the target of an upgrade that is already
  in progress, that upgrade is resumed instead. Every failure exit is
  explained on stderr.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  # If we're not told to resume, verify there is no upgrade
  # in progress.
  if not opts.resume:
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is not None:
      # An upgrade is going on; verify whether the target matches
      if versionstring == opts.to:
        ToStderr("An upgrade is already in progress. Target version matches,"
                 " resuming.")
        opts.resume = True
        opts.to = None
      else:
        ToStderr("An upgrade from %s to %s is in progress; use --resume to"
                 " finish it first" % (oldversion, versionstring))
        return 1

  utils.SetupLogging(pathutils.LOG_COMMANDS, 'gnt-cluster upgrade', debug=1)

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      # No (well-formed) intent file: there is nothing to resume.
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      ToStderr("Could not determine the version of the configuration file %s"
               % pathutils.CLUSTER_CONF_FILE)
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback = \
      _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)
2322
2323
#: Sub-commands of gnt-cluster, handed to C{GenericMain} by L{Main}.
#: Each value is a tuple of (handler function, positional-argument
#: specification, list of accepted options, usage synopsis, help text).
#: All handlers share the same C{(opts, args)} signature.
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT, INSTALL_IMAGE_OPT,
     ZEROING_IMAGE_OPT, COMPRESSION_TOOLS_OPT,
     ENABLED_USER_SHUTDOWN_OPT,
     ]
    + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, MAX_TRACK_OPT, INSTALL_IMAGE_OPT,
     INSTANCE_COMMUNICATION_NETWORK_OPT, ENABLED_HV_OPT, HVLIST_OPT,
     MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT, NIC_PARAMS_OPT,
     VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT,
     REMOVE_UIDS_OPT, DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT,
     DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT,
     PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + SUBMIT_OPTS +
    [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT,
     ENABLED_USER_SHUTDOWN_OPT] +
    INSTANCE_POLICY_OPTS +
    [GLOBAL_FILEDIR_OPT, GLOBAL_SHARED_FILEDIR_OPT, ZEROING_IMAGE_OPT,
     COMPRESSION_TOOLS_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT, VERBOSE_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }
2455
2456
#: dictionary with aliases for commands, mapping each alternative name to
#: the name of the command in L{commands} it stands for
aliases = dict([
  ("masterfailover", "master-failover"),
  ("show", "info"),
  ])
2462
2463
def Main():
  """Entry point of gnt-cluster: dispatch to a sub-command of L{commands}.

  C{tag_type} is overridden to C{constants.TAG_CLUSTER}, presumably so that
  the generic tag commands operate on the cluster object.

  @return: the result of C{GenericMain}

  """
  cluster_override = {"tag_type": constants.TAG_CLUSTER}
  return GenericMain(commands, override=cluster_override, aliases=aliases)