e938b559cb68b4b046f9b6d8afb44f20498647b5
[ganeti-github.git] / lib / client / gnt_cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Cluster related commands"""
31
32 # pylint: disable=W0401,W0613,W0614,C0103
33 # W0401: Wildcard import ganeti.cli
34 # W0613: Unused argument, since all functions follow the same API
35 # W0614: Unused import %s from wildcard import (since we need cli)
36 # C0103: Invalid name gnt-cluster
37
38 from cStringIO import StringIO
39 import os
40 import time
41 import OpenSSL
42 import tempfile
43 import itertools
44
45 from ganeti.cli import *
46 from ganeti import bootstrap
47 from ganeti import compat
48 from ganeti import constants
49 from ganeti import errors
50 from ganeti import netutils
51 from ganeti import objects
52 from ganeti import opcodes
53 from ganeti import pathutils
54 from ganeti import qlang
55 from ganeti import serializer
56 from ganeti import ssconf
57 from ganeti import ssh
58 from ganeti import uidpool
59 from ganeti import utils
60 from ganeti.client import base
61
62
# Command-line options specific to gnt-cluster commands; generic options
# come from ganeti.cli (wildcard import above).

# Used by "epo": recover from an emergency power-off instead of performing one
ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

# Used by "epo": interpret the arguments as node group names
GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

# NOTE(review): FORCE_FAILOVER and FORCE_DISTRIBUTION deliberately share the
# same flag name and dest ("--yes-do-it"/yes_do_it) with different help
# texts; presumably they are attached to different commands — verify in the
# command table before renaming either.
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                help="Unconditionally distribute the"
                                " configuration, even if the queue"
                                " is drained",
                                default=False, action="store_true")

# Used by "upgrade": target version
TO_OPT = cli_option("--to", default=None, type="string",
                    help="The Ganeti version to upgrade to")

# Used by "upgrade": continue a previously interrupted upgrade
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
                        help="Resume any pending Ganeti upgrades")

DATA_COLLECTOR_INTERVAL_OPT = cli_option(
  "--data-collector-interval", default={}, type="keyval",
  help="Set collection intervals in seconds of data collectors.")


# Timing constants used when waiting for nodes during an EPO
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
95
96
def _InitEnabledDiskTemplates(opts):
  """Determine the cluster-wide enabled disk templates.

  @return: the comma-separated templates from --enabled-disk-templates,
      or the built-in default list when the option is unset/empty

  """
  templates_opt = opts.enabled_disk_templates
  if not templates_opt:
    return constants.DEFAULT_ENABLED_DISK_TEMPLATES
  return templates_opt.split(",")
105
106
def _InitVgName(opts, enabled_disk_templates):
  """Determine the volume group name to use.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  # The original evaluated IsLvmEnabled exactly once on every path, so
  # hoisting the call preserves behavior.
  lvm_needed = utils.IsLvmEnabled(enabled_disk_templates)

  if opts.vg_name is None:
    # No --vg-name given: use the default vg only when lvm is actually used
    return constants.DEFAULT_VG if lvm_needed else None

  vg_name = opts.vg_name
  if vg_name:
    if not lvm_needed:
      ToStdout("You specified a volume group with --vg-name, but you did not"
               " enable any disk template that uses lvm.")
  elif lvm_needed:
    # Explicitly empty vg name while lvm templates are enabled is an error
    raise errors.OpPrereqError(
      "LVM disk templates are enabled, but vg name not set.")
  return vg_name
127
128
def _InitDrbdHelper(opts, enabled_disk_templates):
  """Determine the DRBD usermode helper to use.

  """
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates

  if not drbd_enabled:
    # A helper without DRBD is pointless but harmless; just warn
    if opts.drbd_helper is not None:
      ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
               " is not enabled.")
    return opts.drbd_helper

  # DRBD is enabled: default the helper when unset, reject an explicitly
  # empty one
  if opts.drbd_helper is None:
    return constants.DEFAULT_DRBD_HELPER
  if opts.drbd_helper == '':
    raise errors.OpPrereqError(
      "Unsetting the drbd usermode helper while enabling DRBD is not"
      " allowed.")

  return opts.drbd_helper
148
149
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  # These helpers validate the storage options against the enabled disk
  # templates and may raise OpPrereqError
  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError, e:
    ToStderr(str(e))
    return 1

  # Pick the master network device: default bridge, unless the default NIC
  # mode is OpenVSwitch, in which case the default OVS switch is used (and
  # also set as the default NIC link)
  master_netdev = opts.master_netdev
  if master_netdev is None:
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict: fill each enabled-or-not hypervisor's parameters
  # from its built-in defaults
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  # Fall back to built-in defaults for options the user did not give
  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  # Normalize tri-state options (None/True/False) to plain booleans
  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  if opts.install_image:
    install_image = opts.install_image
  else:
    install_image = ""

  if opts.zeroing_image:
    zeroing_image = opts.zeroing_image
  else:
    zeroing_image = ""

  compression_tools = _GetCompressionTools(opts)

  default_ialloc_params = opts.default_iallocator_params

  if opts.enabled_user_shutdown:
    enabled_user_shutdown = True
  else:
    enabled_user_shutdown = False

  # Create the cluster configuration locally, then let the master run the
  # post-init opcode
  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        gluster_storage_dir=opts.gluster_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        default_iallocator_params=default_ialloc_params,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        install_image=install_image,
                        zeroing_image=zeroing_image,
                        compression_tools=compression_tools,
                        enabled_user_shutdown=enabled_user_shutdown,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
340
341
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    # FIX: message previously read "really want destroy" (missing "to")
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0
364
365
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  client = GetClient()

  (old_name, ) = client.QueryConfigValues(["cluster_name"])
  new_name = args[0]

  if not opts.force:
    # Renaming drops the old cluster IP, so warn interactively first
    warning = ("This will rename the cluster from '%s' to '%s'. If you are"
               " connected over the network to the cluster name, the"
               " operation is very dangerous as the IP address will be"
               " removed from the node and the change may not go through."
               " Continue?") % (old_name, new_name)
    if not AskUser(warning):
      return 1

  rename_op = opcodes.OpClusterRename(name=new_name)
  result = SubmitOpCode(rename_op, opts=opts, cl=client)

  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", old_name, result)

  return 0
397
398
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
406
407
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  # Ask for confirmation unless --confirm was given (short-circuit keeps
  # AskUser from being called in that case, as before)
  confirmed = opts.confirm or AskUser(
    "This will disable the master IP. All the open connections to"
    " the master IP will be closed. To reach the master you will"
    " need to use its node IP."
    " Continue?")
  if not confirmed:
    return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
423
424
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  redist_op = opcodes.OpClusterRedistConf()
  if not opts.yes_do_it:
    SubmitOrSend(redist_op, opts)
  else:
    # --yes-do-it: distribute even while the job queue is drained
    SubmitOpCodeToDrainedQueue(redist_op)
  return 0
441
442
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  info = GetClient().QueryClusterInfo()
  # One output line per version field, in the original order
  for fmt, key in [("Software version: %s", "software_version"),
                   ("Internode protocol: %s", "protocol_version"),
                   ("Configuration format: %s", "config_version"),
                   ("OS api version: %s", "os_api_version"),
                   ("Export interface: %s", "export_version"),
                   ("VCS version: %s", "vcs_version")]:
    ToStdout(fmt, info[key])
  return 0
462
463
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  ToStdout(bootstrap.GetMaster())
  return 0
477
478
def _FormatGroupedParams(paramsdict, roman=False):
  """Format Grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: {group: {param: value, ...}, ...}
  @rtype: dict of dicts
  @return: copy of the input dictionaries with strings as values

  """
  def _FormatValue(value):
    # Nested groups are formatted recursively; integers optionally become
    # roman numerals, everything else is stringified
    if isinstance(value, dict):
      return _FormatGroupedParams(value, roman=roman)
    if roman and isinstance(value, int):
      return compat.TryToRoman(value)
    return str(value)

  return dict((key, _FormatValue(value))
              for (key, value) in paramsdict.items())
497
498
def _FormatDataCollectors(paramsdict):
  """Format the per-data-collector status (enabled flag and interval).

  @type paramsdict: dict of dicts
  @param paramsdict: response of QueryClusterInfo
  @rtype: dict of dicts
  @return: parameter grouped by data collector

  """

  enabled = paramsdict[constants.DATA_COLLECTORS_ENABLED_NAME]
  interval = paramsdict[constants.DATA_COLLECTORS_INTERVAL_NAME]

  ret = {}
  for key in enabled:
    # interval values are divided by 1e6 for display in seconds
    # (presumably stored in microseconds — confirm against the collectors)
    ret[key] = dict(active=enabled[key],
                    interval="%.3fs" % (interval[key] / 1e6))
  return ret
517
518
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  # Only show parameters of hypervisors that are actually enabled
  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  # (label, value) pairs, rendered at the end by PrintGenericInfo; nested
  # lists/dicts become indented sub-sections
  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams,
                                                   opts.roman_integers)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"], opts.roman_integers)),

    ("OS parameters", _FormatGroupedParams(result["osparams"],
                                           opts.roman_integers)),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs running simultaneously",
       compat.TryToRoman(result["max_running_jobs"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs simultaneously tracked by the scheduler",
       compat.TryToRoman(result["max_tracked_jobs"],
                         convert=opts.roman_integers)),
      ("mac prefix", result["mac_prefix"]),
      ("master netdev", result["master_netdev"]),
      ("master netmask", compat.TryToRoman(result["master_netmask"],
                                           opts.roman_integers)),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("gluster storage path", result["gluster_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("default instance allocator parameters",
       result["default_iallocator_params"]),
      ("primary ip version", compat.TryToRoman(result["primary_ip_version"],
                                               opts.roman_integers)),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ("install image", result["install_image"]),
      ("instance communication network",
       result["instance_communication_network"]),
      ("zeroing image", result["zeroing_image"]),
      ("compression tools", result["compression_tools"]),
      ("enabled user shutdown", result["enabled_user_shutdown"]),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True, opts.roman_integers)),
    ("Data collectors", _FormatDataCollectors(result)),
    ]

  PrintGenericInfo(info)
  return 0
636
637
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  filename = os.path.abspath(filename)

  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  qcl = GetClient()
  try:
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                             secondary_ips=opts.use_replication_network,
                             nodegroup=opts.nodegroup)
    # FIX: query the SSH ports for the node list actually returned by
    # GetOnlineNodes (master filtered out, node group expanded) instead of
    # the raw opts.nodes, otherwise the zip below pairs nodes with the
    # wrong (or no) ports.
    ports = GetNodesSshPorts(results, qcl)
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name)
  for (node, port) in zip(results, ports):
    if not srun.CopyFileToNode(node, port, filename):
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)

  return 0
675
676
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  qcl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  # Make sure master node is at list end, so the command runs there last.
  # FIX: reorder *before* querying the SSH ports; previously the ports were
  # fetched first, so zip() below paired the reordered node list with ports
  # still in the original order.
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  ports = GetNodesSshPorts(nodes, qcl)

  srun = ssh.SshRunner(cluster_name=cluster_name)

  for (name, port) in zip(nodes, ports):
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
722
723
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  # FIX: count failures with a plain loop. The previous
  # map/ifilterfalse/zip(*...) pipeline was hard to read, relied on the
  # Python-2-only itertools.ifilterfalse, and raised ValueError when no
  # jobs had been submitted (unpacking an empty zip result).
  bad_jobs = 0
  bad_results = 0
  for (job_success, op_results) in jex.GetResults():
    if not job_success:
      bad_jobs += 1
    # A fully successful job carries exactly one truthy opcode result
    if not (len(op_results) == 1 and op_results[0]):
      bad_results += 1

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
775
776
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    # Each successful job carries exactly one opcode result of the form
    # (bad_nodes, instances, missing)
    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      # only the tail of the node's error text is shown, to keep output short
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        # instances with missing volumes are reported separately below
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        # keep going for the remaining instances, but remember the failure
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        # if every missing volume sits on a broken node, the instance
        # cannot be judged at all
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
848
849
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # FIX: return an explicit exit code, as the docstring promises and as the
  # other command functions in this module do (the implicit None happened
  # to exit 0 as well, but only by accident of sys.exit(None) semantics)
  return 0
862
863
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --no-voting without --yes-do-it requires interactive confirmation
  if opts.no_voting and not opts.yes_do_it:
    confirmation = ("This will perform the failover even if most other nodes"
                    " are down, or if this node is outdated. This is dangerous"
                    " as it can lead to a non-consistent cluster. Check the"
                    " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(confirmation):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)
888
889
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    # any failure to reach/query the master counts as "not alive"
    return 1
  return 0
906
907
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1
  # sorted() replaces the former list()+sort() pair; same ordering
  for path, tag in sorted(result):
    ToStdout("%s %s", path, tag)
  # FIX: return an explicit success code as the docstring promises; the
  # function used to fall off the end and return None
  return 0
926
927
928 def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
929 """Reads and verifies an X509 certificate.
930
931 @type cert_filename: string
932 @param cert_filename: the path of the file containing the certificate to
933 verify encoded in PEM format
934 @type verify_private_key: bool
935 @param verify_private_key: whether to verify the private key in addition to
936 the public certificate
937 @rtype: string
938 @return: a string containing the PEM-encoded certificate.
939
940 """
941 try:
942 pem = utils.ReadFile(cert_filename)
943 except IOError, err:
944 raise errors.X509CertError(cert_filename,
945 "Unable to read certificate: %s" % str(err))
946
947 try:
948 OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
949 except Exception, err:
950 raise errors.X509CertError(cert_filename,
951 "Unable to load certificate: %s" % str(err))
952
953 if verify_private_key:
954 try:
955 OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
956 except Exception, err:
957 raise errors.X509CertError(cert_filename,
958 "Unable to load private key: %s" % str(err))
959
960 return pem
961
962
def _RenewCrypto(new_cluster_cert, new_rapi_cert,  # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force, new_node_cert):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to ask user for confirmation
  @type new_node_cert: bool
  @param new_node_cert: Whether to generate new node certificates

  """
  # For each artifact, generating a fresh one and supplying one in a file
  # are mutually exclusive options.
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  # A SPICE certificate is only usable together with its CA certificate;
  # the XOR catches the case where exactly one of the two was given.
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    # Load and syntax-check any user-supplied certificates before touching
    # the cluster; RAPI and SPICE certs must also contain the private key.
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    # X509CertError carries (filename, message) as its exception arguments
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err:  # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    # Executed by RunWhileClusterStopped while all daemons are down; ctx
    # provides feedback_fn, the list of non-master nodes, SSH ports and an
    # SSH runner.
    ctx.feedback_fn("Updating certificates and keys")
    # Note: the node certificate will be generated in the LU
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    files_to_copy = []

    # Distribute only the artifacts that were actually (re)generated or
    # supplied by the user.
    if new_cluster_cert:
      files_to_copy.append(pathutils.NODED_CERT_FILE)

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        port = ctx.ssh_ports[node_name]
        ctx.feedback_fn("Copying %s to %s:%d" %
                        (", ".join(files_to_copy), node_name, port))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, port, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  if new_node_cert:
    # Node certificates are renewed through an opcode once the daemons are
    # running again.
    cl = GetClient()
    renew_op = opcodes.OpClusterRenewCrypto(node_certificates=new_node_cert)
    SubmitOpCode(renew_op, cl=cl)

  return 0
1092
1093
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  Thin CLI entry point; all work is done by L{_RenewCrypto}, which receives
  the parsed command line options positionally.

  """
  renew_args = (opts.new_cluster_cert,
                opts.new_rapi_cert,
                opts.rapi_cert,
                opts.new_spice_cert,
                opts.spice_cert,
                opts.spice_cacert,
                opts.new_confd_hmac_key,
                opts.new_cluster_domain_secret,
                opts.cluster_domain_secret,
                opts.force,
                opts.new_node_cert)
  return _RenewCrypto(*renew_args)
1109
1110
1111 def _GetEnabledDiskTemplates(opts):
1112 """Determine the list of enabled disk templates.
1113
1114 """
1115 if opts.enabled_disk_templates:
1116 return opts.enabled_disk_templates.split(",")
1117 else:
1118 return None
1119
1120
1121 def _GetVgName(opts, enabled_disk_templates):
1122 """Determine the volume group name.
1123
1124 @type enabled_disk_templates: list of strings
1125 @param enabled_disk_templates: cluster-wide enabled disk-templates
1126
1127 """
1128 # consistency between vg name and enabled disk templates
1129 vg_name = None
1130 if opts.vg_name is not None:
1131 vg_name = opts.vg_name
1132 if enabled_disk_templates:
1133 if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1134 ToStdout("You specified a volume group with --vg-name, but you did not"
1135 " enable any of the following lvm-based disk templates: %s" %
1136 utils.CommaJoin(constants.DTS_LVM))
1137 return vg_name
1138
1139
1140 def _GetDrbdHelper(opts, enabled_disk_templates):
1141 """Determine the DRBD usermode helper.
1142
1143 """
1144 drbd_helper = opts.drbd_helper
1145 if enabled_disk_templates:
1146 drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1147 if not drbd_enabled and opts.drbd_helper:
1148 ToStdout("You specified a DRBD usermode helper with "
1149 " --drbd-usermode-helper while DRBD is not enabled.")
1150 return drbd_helper
1151
1152
1153 def _GetCompressionTools(opts):
1154 """Determine the list of custom compression tools.
1155
1156 """
1157 if opts.compression_tools:
1158 return opts.compression_tools.split(",")
1159 elif opts.compression_tools is None:
1160 return None # To note the parameter was not provided
1161 else:
1162 return constants.IEC_DEFAULT_TOOLS # Resetting to default
1163
1164
1165 def SetClusterParams(opts, args):
1166 """Modify the cluster.
1167
1168 @param opts: the command line options selected by the user
1169 @type args: list
1170 @param args: should be an empty list
1171 @rtype: int
1172 @return: the desired exit code
1173
1174 """
1175 if not (opts.vg_name is not None or
1176 opts.drbd_helper is not None or
1177 opts.enabled_hypervisors or opts.hvparams or
1178 opts.beparams or opts.nicparams or
1179 opts.ndparams or opts.diskparams or
1180 opts.candidate_pool_size is not None or
1181 opts.max_running_jobs is not None or
1182 opts.max_tracked_jobs is not None or
1183 opts.uid_pool is not None or
1184 opts.maintain_node_health is not None or
1185 opts.add_uids is not None or
1186 opts.remove_uids is not None or
1187 opts.default_iallocator is not None or
1188 opts.default_iallocator_params or
1189 opts.reserved_lvs is not None or
1190 opts.mac_prefix is not None or
1191 opts.master_netdev is not None or
1192 opts.master_netmask is not None or
1193 opts.use_external_mip_script is not None or
1194 opts.prealloc_wipe_disks is not None or
1195 opts.hv_state or
1196 opts.enabled_disk_templates or
1197 opts.disk_state or
1198 opts.ipolicy_bounds_specs is not None or
1199 opts.ipolicy_std_specs is not None or
1200 opts.ipolicy_disk_templates is not None or
1201 opts.ipolicy_vcpu_ratio is not None or
1202 opts.ipolicy_spindle_ratio is not None or
1203 opts.modify_etc_hosts is not None or
1204 opts.file_storage_dir is not None or
1205 opts.install_image is not None or
1206 opts.instance_communication_network is not None or
1207 opts.zeroing_image is not None or
1208 opts.shared_file_storage_dir is not None or
1209 opts.compression_tools is not None or
1210 opts.shared_file_storage_dir is not None or
1211 opts.enabled_user_shutdown is not None or
1212 opts.data_collector_interval or
1213 opts.enabled_data_collectors):
1214 ToStderr("Please give at least one of the parameters.")
1215 return 1
1216
1217 enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1218 vg_name = _GetVgName(opts, enabled_disk_templates)
1219
1220 try:
1221 drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1222 except errors.OpPrereqError, e:
1223 ToStderr(str(e))
1224 return 1
1225
1226 hvlist = opts.enabled_hypervisors
1227 if hvlist is not None:
1228 hvlist = hvlist.split(",")
1229
1230 # a list of (name, dict) we can pass directly to dict() (or [])
1231 hvparams = dict(opts.hvparams)
1232 for hv_params in hvparams.values():
1233 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1234
1235 diskparams = dict(opts.diskparams)
1236
1237 for dt_params in diskparams.values():
1238 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1239
1240 beparams = opts.beparams
1241 utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1242
1243 nicparams = opts.nicparams
1244 utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1245
1246 ndparams = opts.ndparams
1247 if ndparams is not None:
1248 utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1249
1250 ipolicy = CreateIPolicyFromOpts(
1251 minmax_ispecs=opts.ipolicy_bounds_specs,
1252 std_ispecs=opts.ipolicy_std_specs,
1253 ipolicy_disk_templates=opts.ipolicy_disk_templates,
1254 ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1255 ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1256 )
1257
1258 mnh = opts.maintain_node_health
1259
1260 uid_pool = opts.uid_pool
1261 if uid_pool is not None:
1262 uid_pool = uidpool.ParseUidPool(uid_pool)
1263
1264 add_uids = opts.add_uids
1265 if add_uids is not None:
1266 add_uids = uidpool.ParseUidPool(add_uids)
1267
1268 remove_uids = opts.remove_uids
1269 if remove_uids is not None:
1270 remove_uids = uidpool.ParseUidPool(remove_uids)
1271
1272 if opts.reserved_lvs is not None:
1273 if opts.reserved_lvs == "":
1274 opts.reserved_lvs = []
1275 else:
1276 opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1277
1278 if opts.master_netmask is not None:
1279 try:
1280 opts.master_netmask = int(opts.master_netmask)
1281 except ValueError:
1282 ToStderr("The --master-netmask option expects an int parameter.")
1283 return 1
1284
1285 ext_ip_script = opts.use_external_mip_script
1286
1287 if opts.disk_state:
1288 disk_state = utils.FlatToDict(opts.disk_state)
1289 else:
1290 disk_state = {}
1291
1292 hv_state = dict(opts.hv_state)
1293
1294 compression_tools = _GetCompressionTools(opts)
1295
1296 enabled_data_collectors = dict(
1297 (k, v.lower().startswith("t"))
1298 for k, v in opts.enabled_data_collectors.items())
1299
1300 unrecognized_data_collectors = [
1301 k for k in enabled_data_collectors.keys()
1302 if k not in constants.DATA_COLLECTOR_NAMES]
1303 if unrecognized_data_collectors:
1304 ToStderr("Data collector names not recognized: %s" %
1305 ", ".join(unrecognized_data_collectors))
1306
1307 try:
1308 data_collector_interval = dict(
1309 (k, long(1e6 * float(v)))
1310 for (k, v) in opts.data_collector_interval.items())
1311 except ValueError:
1312 ToStderr("Can't transform all values to integers: {}".format(
1313 opts.data_collector_interval))
1314 return 1
1315 if any(v <= 0 for v in data_collector_interval):
1316 ToStderr("Some interval times where not above zero.")
1317 return 1
1318
1319 op = opcodes.OpClusterSetParams(
1320 vg_name=vg_name,
1321 drbd_helper=drbd_helper,
1322 enabled_hypervisors=hvlist,
1323 hvparams=hvparams,
1324 os_hvp=None,
1325 beparams=beparams,
1326 nicparams=nicparams,
1327 ndparams=ndparams,
1328 diskparams=diskparams,
1329 ipolicy=ipolicy,
1330 candidate_pool_size=opts.candidate_pool_size,
1331 max_running_jobs=opts.max_running_jobs,
1332 max_tracked_jobs=opts.max_tracked_jobs,
1333 maintain_node_health=mnh,
1334 modify_etc_hosts=opts.modify_etc_hosts,
1335 uid_pool=uid_pool,
1336 add_uids=add_uids,
1337 remove_uids=remove_uids,
1338 default_iallocator=opts.default_iallocator,
1339 default_iallocator_params=opts.default_iallocator_params,
1340 prealloc_wipe_disks=opts.prealloc_wipe_disks,
1341 mac_prefix=opts.mac_prefix,
1342 master_netdev=opts.master_netdev,
1343 master_netmask=opts.master_netmask,
1344 reserved_lvs=opts.reserved_lvs,
1345 use_external_mip_script=ext_ip_script,
1346 hv_state=hv_state,
1347 disk_state=disk_state,
1348 enabled_disk_templates=enabled_disk_templates,
1349 force=opts.force,
1350 file_storage_dir=opts.file_storage_dir,
1351 install_image=opts.install_image,
1352 instance_communication_network=opts.instance_communication_network,
1353 zeroing_image=opts.zeroing_image,
1354 shared_file_storage_dir=opts.shared_file_storage_dir,
1355 compression_tools=compression_tools,
1356 enabled_user_shutdown=opts.enabled_user_shutdown,
1357 enabled_data_collectors=enabled_data_collectors,
1358 data_collector_interval=data_collector_interval,
1359 )
1360 return base.GetResult(None, opts, SubmitOrSend(op, opts))
1361
1362
def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  subcommand = args[0]
  cl = GetClient()
  if subcommand == "drain" or subcommand == "undrain":
    cl.SetQueueDrainFlag(subcommand == "drain")
  elif subcommand == "info":
    drained = cl.QueryConfigValues(["drain_flag"])[0]
    val = "set" if drained else "unset"
    ToStdout("The drain flag is %s" % val)
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % subcommand,
                               errors.ECODE_INVAL)

  return 0
1390
1391
def _ShowWatcherPause(until):
  """Print the current watcher pause state.

  @param until: Unix timestamp until which the watcher is paused, or None

  """
  # A timestamp in the past counts as "not paused"
  if until is not None and until >= time.time():
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1397
1398
def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  subcommand = args[0]
  cl = GetClient()

  if subcommand == "continue":
    cl.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")
  elif subcommand == "pause":
    # "pause" requires a duration as its second argument
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
    until = time.time() + ParseTimespec(args[1])
    _ShowWatcherPause(cl.SetWatcherPause(until))
  elif subcommand == "info":
    _ShowWatcherPause(cl.QueryConfigValues(["watcher_pause"])[0])
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % subcommand,
                               errors.ECODE_INVAL)

  return 0
1432
1433
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  if power:
    oob_command = constants.OOB_POWER_ON
  else:
    oob_command = constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=oob_command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)
  success = True
  for ((_, node_name), (data_status, _)) in result:
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      success = False
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  return success
1469
1470
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    (text_submit, text_success, text_failed) = \
      ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    (text_submit, text_success, text_failed) = \
      ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)

  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    jex.QueueJob(inst, opcls(instance_name=inst))

  results = jex.GetResults()
  bad_cnt = sum(1 for (success, _) in results if not success)

  if bad_cnt:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  ToStdout("All instances have been %s successfully", text_success)
  return True
1509
1510
class _RunWhenNodesReachableHelper(object):
  """Helper class to make shared internal state sharing easier.

  Intended for use with L{utils.Retry}: the instance itself is the retry
  function (via C{__call__}) and C{Wait} is the wait function, so both
  share the same up/down node sets.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    # Nodes not yet seen reachable
    self.down = set(node_list)
    # Nodes that have answered the TCP ping at least once
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    # A single failed callback makes the whole run unsuccessful, but we keep
    # retrying until every node has come up
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        # NOTE: this mutates self.down during iteration; it is only safe
        # because we return immediately afterwards
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    # Sleep only for whatever remains of the slot after the pings above
    self._sleep_fn(max(0.0, start + secs - time.time()))
1573
1574
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  # Resolve node names using the same address family the cluster uses
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = {}
  for node in node_list:
    node2ip[node] = netutils.GetHostname(node, family=family).ip

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n - %s", " - ".join(helper.down))
    return False
1604
1605
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  # An instance may only be started once every node it lives on is back
  startable = [inst for (inst, nodes) in inst_map.items()
               if not (nodes - nodes_online)]

  # Remove started instances from the map so they are not retried
  for inst in startable:
    del inst_map[inst]

  if not startable:
    return True

  return _instance_start_fn(opts, startable, True)
1630
1631
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
    OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # power=True requests OOB_POWER_ON: this function brings nodes back up
  # (passing False here would ask for a power *off* instead)
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1656
1657
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Stop all instances first; no_remember keeps their configured run state
  # so they come back automatically after power on
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  # power=False requests OOB_POWER_OFF for all OOB-capable nodes
  if node_list and not _OobPower(opts, node_list, False):
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1678
1679
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  The C{_on_fn}/C{_off_fn}/C{_confirm_fn}/C{_stdout_fn}/C{_stderr_fn}
  parameters exist for unittest injection only.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  # --groups and --all are mutually exclusive, and --all also forbids
  # explicit node/group arguments
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  if opts.groups:
    # With --groups the arguments name node groups; expand them into the
    # union of their node lists
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      # inst_map maps each instance to the set of its non-master nodes; an
      # instance touching the master node gets an empty set so it is never
      # started automatically (the master is handled separately)
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      # Only nodes that are online, or offline but still powered, can be
      # acted upon via OOB
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)
1751
1752
def _GetCreateCommand(info):
  """Build a 'gnt-cluster init' command line recreating the given cluster.

  @param info: result of a cluster info query
  @rtype: string

  """
  out = StringIO()
  out.write("gnt-cluster init")
  # Only the instance policy settings are reproduced on the command line
  PrintIPolicyCommand(out, info["ipolicy"], False)
  out.write(" ")
  out.write(info["name"])
  return out.getvalue()
1760
1761
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  info = GetClient().QueryClusterInfo()
  ToStdout(_GetCreateCommand(info))
1771
1772
def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if not result.failed:
    return True
  ToStderr("Command %s failed: %s; Output %s" %
           (cmd, result.fail_reason, result.output))
  return False
1788
1789
def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  master_node = ssconf.SimpleStore().GetMasterNode()
  cluster_name = ssconf.SimpleStore().GetClusterName()

  # Run the command on the master node last, so it stays functional for as
  # long as possible during an upgrade
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  srun = ssh.SshRunner(cluster_name=cluster_name)
  return [name for name in nodes
          if srun.Run(name, constants.SSH_LOGIN_USER, command).exit_code != 0]
1824
1825
def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing, if this is the case, otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  # A version counts as installed iff its directory exists below PKGLIBDIR
  version_dir = os.path.join(pathutils.PKGLIBDIR, versionstring)
  badnodes = _VerifyCommand(["test", "-d", version_dir])
  if not badnodes:
    return True

  ToStderr("Ganeti version %s not installed on nodes %s"
           % (versionstring, ", ".join(badnodes)))
  return False
1846
1847
def _GetRunning():
  """Determine the number of running jobs.

  @rtype: int
  @return: the number of jobs still running

  """
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(GetClient().Query(constants.QR_JOB, [], qfilter).data)
1859
1860
def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  lib_src = os.path.join(pathutils.PKGLIBDIR, versionstring)
  lib_link = os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")
  share_src = os.path.join(pathutils.SHAREDIR, versionstring)
  share_link = os.path.join(pathutils.SYSCONFDIR, "ganeti/share")

  failed = []
  if constants.HAS_GNU_LN:
    # GNU ln supports -T, which replaces the symlink atomically
    failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", lib_src, lib_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", share_src,
                                  share_link]))
  else:
    # Without GNU ln, remove and recreate each link in two steps
    failed.extend(_VerifyCommand(["rm", "-f", lib_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", lib_src, lib_link]))
    failed.extend(_VerifyCommand(["rm", "-f", share_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", share_src, share_link]))
  return list(set(failed))
1891
1892
1893 def _ExecuteCommands(fns):
1894 """Execute a list of functions, in reverse order.
1895
1896 @type fns: list of functions.
1897 @param fns: the functions to be executed.
1898
1899 """
1900 for fn in reversed(fns):
1901 fn()
1902
1903
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  if "version" not in config_data:
    return None
  return utils.SplitVersion(config_data["version"])
1918
1919
def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return (None, None)

  contents = utils.UnescapeAndSplit(utils.ReadFile(pathutils.INTENT_TO_UPGRADE))
  # The file must contain exactly three fields: old version, target
  # version and the writer's pid; anything else is mal-formed
  if len(contents) != 3:
    return (None, None)
  return (contents[0], contents[1])
1937
1938
def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intent to upgrade to

  """
  # Record the running version, the target version and our pid
  content = utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
                                 "%d" % os.getpid()])
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE, data=content)
1949
1950
def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  The returned rollback tasks are recorded in forward order and are meant
  to be executed in reverse (see L{_ExecuteCommands}) if a later upgrade
  step fails.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  # Wait until no jobs are running any more; _GetRunning returns the number
  # of running jobs, and SimpleRetry gives up after the drain timeout
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdout("Pausing the watcher for one hour.")
  rollback.append(lambda: GetClient().SetWatcherPause(None))
  GetClient().SetWatcherPause(time.time() + 60 * 60)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  # Re-check the target version now that the local daemons are down; if it
  # went away, restart the master daemons before giving up
  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.BACKUP_DIR, "ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["mkdir", "-p", pathutils.BACKUP_DIR]):
    return (False, rollback)

  # Create the archive in a safe manner, as it contains sensitive
  # information.
  (_, tmp_name) = tempfile.mkstemp(prefix=backuptar, dir=pathutils.BACKUP_DIR)
  if not _RunCommandAndReport(["tar", "-cf", tmp_name,
                               "--exclude=queue/archive",
                               pathutils.DATA_DIR]):
    return (False, rollback)

  # os.rename is atomic, so a partially written archive never appears under
  # the final backup name
  os.rename(tmp_name, backuptar)
  return (True, rollback)
2017
2018
def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @return: True upon success
  """
  ToStdout("Performing version-specific downgrade tasks.")
  # Nothing to do for the current version pair.
  return True
2033
2034
def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the downgrade
    # process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    # When downgrading, push through despite bad nodes (see note above);
    # only an upgrade is aborted here.
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)
2083
2084
def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  returnvalue = 0

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  # Best effort: the upgrade is finished either way, so a failure to remove
  # the intent file does not affect the return value.
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  # FIX: message previously read "Unpasuing the watcher."
  ToStdout("Unpausing the watcher.")
  if not _RunCommandAndReport(["gnt-cluster", "watcher", "continue"]):
    returnvalue = 1

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue
2147
2148
def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --to and --resume are mutually exclusive, and exactly one is required.
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  # If we're not told to resume, verify there is no upgrade
  # in progress.
  if not opts.resume:
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is not None:
      # An upgrade is going on; verify whether the target matches
      if versionstring == opts.to:
        ToStderr("An upgrade is already in progress. Target version matches,"
                 " resuming.")
        opts.resume = True
        opts.to = None
      else:
        ToStderr("An upgrade from %s to %s is in progress; use --resume to"
                 " finish it first" % (oldversion, versionstring))
        return 1

  # For a fresh upgrade the starting point is the currently installed version;
  # when resuming, it is overwritten from the intent file below.
  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      # No intent file: nothing to resume.
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  # Check that the target version lies within the supported upgrade range.
  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback =  \
      _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)
2241
2242
#: dictionary with the gnt-cluster commands; each value is a tuple of
#: (handler function, positional argument spec, option list, usage synopsis,
#: one-line description)
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT, INSTALL_IMAGE_OPT,
     ZEROING_IMAGE_OPT, COMPRESSION_TOOLS_OPT,
     ENABLED_USER_SHUTDOWN_OPT,
     ]
    + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, MAX_TRACK_OPT, INSTALL_IMAGE_OPT,
     INSTANCE_COMMUNICATION_NETWORK_OPT, ENABLED_HV_OPT, HVLIST_OPT,
     MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT, NIC_PARAMS_OPT,
     VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT,
     REMOVE_UIDS_OPT, DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT,
     DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT,
     PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + SUBMIT_OPTS +
    [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT,
     ENABLED_USER_SHUTDOWN_OPT] +
    INSTANCE_POLICY_OPTS +
    [GLOBAL_FILEDIR_OPT, GLOBAL_SHARED_FILEDIR_OPT, ZEROING_IMAGE_OPT,
     COMPRESSION_TOOLS_OPT] +
    [ENABLED_DATA_COLLECTORS_OPT, DATA_COLLECTOR_INTERVAL_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }
2375
2376
#: dictionary with aliases for commands
aliases = {
  "show": "info",
  "masterfailover": "master-failover",
}
2382
2383
def Main():
  """Entry point for the gnt-cluster script."""
  overrides = {"tag_type": constants.TAG_CLUSTER}
  return GenericMain(commands, override=overrides, aliases=aliases)