4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 """Cluster related commands"""
32 # pylint: disable=W0401,W0613,W0614,C0103
33 # W0401: Wildcard import ganeti.cli
34 # W0613: Unused argument, since all functions follow the same API
35 # W0614: Unused import %s from wildcard import (since we need cli)
36 # C0103: Invalid name gnt-cluster
38 from cStringIO
import StringIO
45 from ganeti
.cli
import *
46 from ganeti
import bootstrap
47 from ganeti
import compat
48 from ganeti
import constants
49 from ganeti
import config
50 from ganeti
import errors
51 from ganeti
import netutils
52 from ganeti
import objects
53 from ganeti
import opcodes
54 from ganeti
import pathutils
55 from ganeti
import qlang
56 from ganeti
import serializer
57 from ganeti
import ssconf
58 from ganeti
import ssh
59 from ganeti
import uidpool
60 from ganeti
import utils
61 from ganeti
import wconfd
62 from ganeti
.client
import base
# Command-line options shared by the gnt-cluster sub-commands below.
ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                help="Unconditionally distribute the"
                                " configuration, even if the queue"
                                " is drained",
                                default=False, action="store_true")

TO_OPT = cli_option("--to", default=None, type="string",
                    help="The Ganeti version to upgrade to")

RESUME_OPT = cli_option("--resume", default=False, action="store_true",
                        help="Resume any pending Ganeti upgrades")

DATA_COLLECTOR_INTERVAL_OPT = cli_option(
    "--data-collector-interval", default={}, type="keyval",
    help="Set collection intervals in seconds of data collectors.")

# Timing constants used by the EPO (emergency power off) handling code.
_EPO_PING_INTERVAL = 30  # 30 seconds between pings
_EPO_PING_TIMEOUT = 1  # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60  # 15 minutes
98 def _InitEnabledDiskTemplates(opts
):
99 """Initialize the list of enabled disk templates.
102 if opts
.enabled_disk_templates
:
103 return opts
.enabled_disk_templates
.split(",")
105 return constants
.DEFAULT_ENABLED_DISK_TEMPLATES
def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
    if vg_name:
      # A volume group was requested explicitly; warn if it is useless.
      if not utils.IsLvmEnabled(enabled_disk_templates):
        ToStdout("You specified a volume group with --vg-name, but you did not"
                 " enable any disk template that uses lvm.")
    elif utils.IsLvmEnabled(enabled_disk_templates):
      # An empty --vg-name unsets the volume group, which clashes with
      # having lvm-based disk templates enabled.
      raise errors.OpPrereqError(
          "LVM disk templates are enabled, but vg name not set.")
  elif utils.IsLvmEnabled(enabled_disk_templates):
    # No --vg-name given at all: fall back to the default volume group.
    vg_name = constants.DEFAULT_VG
  return vg_name
def _InitDrbdHelper(opts, enabled_disk_templates, feedback_fn=ToStdout):
  """Initialize the DRBD usermode helper.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @param feedback_fn: function used to emit informational messages

  """
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates

  if not drbd_enabled and opts.drbd_helper is not None:
    feedback_fn("Note: You specified a DRBD usermode helper, while DRBD storage"
                " is not enabled.")

  if drbd_enabled:
    if opts.drbd_helper is None:
      return constants.DEFAULT_DRBD_HELPER
    if opts.drbd_helper == '':
      # An empty string means "unset the helper", which is not allowed
      # while DRBD is among the enabled templates.
      raise errors.OpPrereqError(
          "Unsetting the drbd usermode helper while enabling DRBD is not"
          " allowed.")

  return opts.drbd_helper
152 def InitCluster(opts
, args
):
153 """Initialize the cluster.
155 @param opts: the command line options selected by the user
157 @param args: should contain only one element, the desired
160 @return: the desired exit code
163 enabled_disk_templates
= _InitEnabledDiskTemplates(opts
)
166 vg_name
= _InitVgName(opts
, enabled_disk_templates
)
167 drbd_helper
= _InitDrbdHelper(opts
, enabled_disk_templates
)
168 except errors
.OpPrereqError
, e
:
172 master_netdev
= opts
.master_netdev
173 if master_netdev
is None:
174 nic_mode
= opts
.nicparams
.get(constants
.NIC_MODE
, None)
176 # default case, use bridging
177 master_netdev
= constants
.DEFAULT_BRIDGE
178 elif nic_mode
== constants
.NIC_MODE_OVS
:
179 # default ovs is different from default bridge
180 master_netdev
= constants
.DEFAULT_OVS
181 opts
.nicparams
[constants
.NIC_LINK
] = constants
.DEFAULT_OVS
183 hvlist
= opts
.enabled_hypervisors
185 hvlist
= constants
.DEFAULT_ENABLED_HYPERVISOR
186 hvlist
= hvlist
.split(",")
188 hvparams
= dict(opts
.hvparams
)
189 beparams
= opts
.beparams
190 nicparams
= opts
.nicparams
192 diskparams
= dict(opts
.diskparams
)
194 # check the disk template types here, as we cannot rely on the type check done
195 # by the opcode parameter types
196 diskparams_keys
= set(diskparams
.keys())
197 if not (diskparams_keys
<= constants
.DISK_TEMPLATES
):
198 unknown
= utils
.NiceSort(diskparams_keys
- constants
.DISK_TEMPLATES
)
199 ToStderr("Disk templates unknown: %s" % utils
.CommaJoin(unknown
))
202 # prepare beparams dict
203 beparams
= objects
.FillDict(constants
.BEC_DEFAULTS
, beparams
)
204 utils
.ForceDictType(beparams
, constants
.BES_PARAMETER_COMPAT
)
206 # prepare nicparams dict
207 nicparams
= objects
.FillDict(constants
.NICC_DEFAULTS
, nicparams
)
208 utils
.ForceDictType(nicparams
, constants
.NICS_PARAMETER_TYPES
)
210 # prepare ndparams dict
211 if opts
.ndparams
is None:
212 ndparams
= dict(constants
.NDC_DEFAULTS
)
214 ndparams
= objects
.FillDict(constants
.NDC_DEFAULTS
, opts
.ndparams
)
215 utils
.ForceDictType(ndparams
, constants
.NDS_PARAMETER_TYPES
)
217 # prepare hvparams dict
218 for hv
in constants
.HYPER_TYPES
:
219 if hv
not in hvparams
:
221 hvparams
[hv
] = objects
.FillDict(constants
.HVC_DEFAULTS
[hv
], hvparams
[hv
])
222 utils
.ForceDictType(hvparams
[hv
], constants
.HVS_PARAMETER_TYPES
)
224 # prepare diskparams dict
225 for templ
in constants
.DISK_TEMPLATES
:
226 if templ
not in diskparams
:
227 diskparams
[templ
] = {}
228 diskparams
[templ
] = objects
.FillDict(constants
.DISK_DT_DEFAULTS
[templ
],
230 utils
.ForceDictType(diskparams
[templ
], constants
.DISK_DT_TYPES
)
232 # prepare ipolicy dict
233 ipolicy
= CreateIPolicyFromOpts(
234 ispecs_mem_size
=opts
.ispecs_mem_size
,
235 ispecs_cpu_count
=opts
.ispecs_cpu_count
,
236 ispecs_disk_count
=opts
.ispecs_disk_count
,
237 ispecs_disk_size
=opts
.ispecs_disk_size
,
238 ispecs_nic_count
=opts
.ispecs_nic_count
,
239 minmax_ispecs
=opts
.ipolicy_bounds_specs
,
240 std_ispecs
=opts
.ipolicy_std_specs
,
241 ipolicy_disk_templates
=opts
.ipolicy_disk_templates
,
242 ipolicy_vcpu_ratio
=opts
.ipolicy_vcpu_ratio
,
243 ipolicy_spindle_ratio
=opts
.ipolicy_spindle_ratio
,
244 ipolicy_memory_ratio
=opts
.ipolicy_memory_ratio
,
247 if opts
.candidate_pool_size
is None:
248 opts
.candidate_pool_size
= constants
.MASTER_POOL_SIZE_DEFAULT
250 if opts
.mac_prefix
is None:
251 opts
.mac_prefix
= constants
.DEFAULT_MAC_PREFIX
253 uid_pool
= opts
.uid_pool
254 if uid_pool
is not None:
255 uid_pool
= uidpool
.ParseUidPool(uid_pool
)
257 if opts
.prealloc_wipe_disks
is None:
258 opts
.prealloc_wipe_disks
= False
260 external_ip_setup_script
= opts
.use_external_mip_script
261 if external_ip_setup_script
is None:
262 external_ip_setup_script
= False
265 primary_ip_version
= int(opts
.primary_ip_version
)
266 except (ValueError, TypeError), err
:
267 ToStderr("Invalid primary ip version value: %s" % str(err
))
270 master_netmask
= opts
.master_netmask
272 if master_netmask
is not None:
273 master_netmask
= int(master_netmask
)
274 except (ValueError, TypeError), err
:
275 ToStderr("Invalid master netmask value: %s" % str(err
))
279 disk_state
= utils
.FlatToDict(opts
.disk_state
)
283 hv_state
= dict(opts
.hv_state
)
285 if opts
.install_image
:
286 install_image
= opts
.install_image
290 if opts
.zeroing_image
:
291 zeroing_image
= opts
.zeroing_image
295 compression_tools
= _GetCompressionTools(opts
)
297 default_ialloc_params
= opts
.default_iallocator_params
299 if opts
.enabled_user_shutdown
:
300 enabled_user_shutdown
= True
302 enabled_user_shutdown
= False
304 if opts
.ssh_key_type
:
305 ssh_key_type
= opts
.ssh_key_type
307 ssh_key_type
= constants
.SSH_DEFAULT_KEY_TYPE
309 ssh_key_bits
= ssh
.DetermineKeyBits(ssh_key_type
, opts
.ssh_key_bits
, None,
312 bootstrap
.InitCluster(cluster_name
=args
[0],
313 secondary_ip
=opts
.secondary_ip
,
315 mac_prefix
=opts
.mac_prefix
,
316 master_netmask
=master_netmask
,
317 master_netdev
=master_netdev
,
318 file_storage_dir
=opts
.file_storage_dir
,
319 shared_file_storage_dir
=opts
.shared_file_storage_dir
,
320 gluster_storage_dir
=opts
.gluster_storage_dir
,
321 enabled_hypervisors
=hvlist
,
326 diskparams
=diskparams
,
328 candidate_pool_size
=opts
.candidate_pool_size
,
329 modify_etc_hosts
=opts
.modify_etc_hosts
,
330 modify_ssh_setup
=opts
.modify_ssh_setup
,
331 maintain_node_health
=opts
.maintain_node_health
,
332 drbd_helper
=drbd_helper
,
334 default_iallocator
=opts
.default_iallocator
,
335 default_iallocator_params
=default_ialloc_params
,
336 primary_ip_version
=primary_ip_version
,
337 prealloc_wipe_disks
=opts
.prealloc_wipe_disks
,
338 use_external_mip_script
=external_ip_setup_script
,
340 disk_state
=disk_state
,
341 enabled_disk_templates
=enabled_disk_templates
,
342 install_image
=install_image
,
343 zeroing_image
=zeroing_image
,
344 compression_tools
=compression_tools
,
345 enabled_user_shutdown
=enabled_user_shutdown
,
346 ssh_key_type
=ssh_key_type
,
347 ssh_key_bits
=ssh_key_bits
,
349 op
= opcodes
.OpClusterPostInit()
350 SubmitOpCode(op
, opts
=opts
)
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Destruction is guarded by an explicit confirmation flag.
  if not opts.yes_do_it:
    ToStderr("Destroying a cluster is irreversible. If you really want"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)

  return 0
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  new_name = args[0]
  if not opts.force:
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterRename(name=new_name)
  result = SubmitOpCode(op, opts=opts, cl=cl)

  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  op = opcodes.OpClusterActivateMasterIp()
  SubmitOpCode(op, opts=opts)
  return 0
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  if not opts.confirm:
    usertext = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterDeactivateMasterIp()
  SubmitOpCode(op, opts=opts)
  return 0
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  if opts.yes_do_it:
    # --yes-do-it forces the distribution even with a drained queue.
    SubmitOpCodeToDrainedQueue(op)
  else:
    SubmitOrSend(op, opts)
  return 0
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()
  ToStdout("Software version: %s", result["software_version"])
  ToStdout("Internode protocol: %s", result["protocol_version"])
  ToStdout("Configuration format: %s", result["config_version"])
  ToStdout("OS api version: %s", result["os_api_version"])
  ToStdout("Export interface: %s", result["export_version"])
  ToStdout("VCS version: %s", result["vcs_version"])
  return 0
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  master = bootstrap.GetMaster()
  ToStdout(master)
  return 0
491 def _FormatGroupedParams(paramsdict
, roman
=False):
492 """Format Grouped parameters (be, nic, disk) by group.
494 @type paramsdict: dict of dicts
495 @param paramsdict: {group: {param: value, ...}, ...}
496 @rtype: dict of dicts
497 @return: copy of the input dictionaries with strings as values
501 for (item
, val
) in paramsdict
.items():
502 if isinstance(val
, dict):
503 ret
[item
] = _FormatGroupedParams(val
, roman
=roman
)
504 elif roman
and isinstance(val
, int):
505 ret
[item
] = compat
.TryToRoman(val
)
def _FormatDataCollectors(paramsdict):
  """Format the data-collector settings of the cluster.

  @type paramsdict: dict of dicts
  @param paramsdict: response of QueryClusterInfo
  @rtype: dict of dicts
  @return: parameter grouped by data collector

  """
  enabled = paramsdict[constants.DATA_COLLECTORS_ENABLED_NAME]
  interval = paramsdict[constants.DATA_COLLECTORS_INTERVAL_NAME]

  ret = {}
  for key in enabled:
    # Intervals are stored in microseconds; display them in seconds.
    ret[key] = dict(active=enabled[key],
                    interval="%.3fs" % (interval[key] / 1e6))
  return ret
531 def ShowClusterConfig(opts
, args
):
532 """Shows cluster information.
534 @param opts: the command line options selected by the user
536 @param args: should be an empty list
538 @return: the desired exit code
542 result
= cl
.QueryClusterInfo()
545 tags
= utils
.CommaJoin(utils
.NiceSort(result
["tags"]))
548 if result
["reserved_lvs"]:
549 reserved_lvs
= utils
.CommaJoin(result
["reserved_lvs"])
551 reserved_lvs
= "(none)"
553 enabled_hv
= result
["enabled_hypervisors"]
554 hvparams
= dict((k
, v
) for k
, v
in result
["hvparams"].iteritems()
558 ("Cluster name", result
["name"]),
559 ("Cluster UUID", result
["uuid"]),
561 ("Creation time", utils
.FormatTime(result
["ctime"])),
562 ("Modification time", utils
.FormatTime(result
["mtime"])),
564 ("Master node", result
["master"]),
566 ("Architecture (this node)",
567 "%s (%s)" % (result
["architecture"][0], result
["architecture"][1])),
571 ("Default hypervisor", result
["default_hypervisor"]),
572 ("Enabled hypervisors", utils
.CommaJoin(enabled_hv
)),
574 ("Hypervisor parameters", _FormatGroupedParams(hvparams
,
575 opts
.roman_integers
)),
577 ("OS-specific hypervisor parameters",
578 _FormatGroupedParams(result
["os_hvp"], opts
.roman_integers
)),
580 ("OS parameters", _FormatGroupedParams(result
["osparams"],
581 opts
.roman_integers
)),
583 ("Hidden OSes", utils
.CommaJoin(result
["hidden_os"])),
584 ("Blacklisted OSes", utils
.CommaJoin(result
["blacklisted_os"])),
586 ("Cluster parameters", [
587 ("candidate pool size",
588 compat
.TryToRoman(result
["candidate_pool_size"],
589 convert
=opts
.roman_integers
)),
590 ("maximal number of jobs running simultaneously",
591 compat
.TryToRoman(result
["max_running_jobs"],
592 convert
=opts
.roman_integers
)),
593 ("maximal number of jobs simultaneously tracked by the scheduler",
594 compat
.TryToRoman(result
["max_tracked_jobs"],
595 convert
=opts
.roman_integers
)),
596 ("mac prefix", result
["mac_prefix"]),
597 ("master netdev", result
["master_netdev"]),
598 ("master netmask", compat
.TryToRoman(result
["master_netmask"],
599 opts
.roman_integers
)),
600 ("use external master IP address setup script",
601 result
["use_external_mip_script"]),
602 ("lvm volume group", result
["volume_group_name"]),
603 ("lvm reserved volumes", reserved_lvs
),
604 ("drbd usermode helper", result
["drbd_usermode_helper"]),
605 ("file storage path", result
["file_storage_dir"]),
606 ("shared file storage path", result
["shared_file_storage_dir"]),
607 ("gluster storage path", result
["gluster_storage_dir"]),
608 ("maintenance of node health", result
["maintain_node_health"]),
609 ("uid pool", uidpool
.FormatUidPool(result
["uid_pool"])),
610 ("default instance allocator", result
["default_iallocator"]),
611 ("default instance allocator parameters",
612 result
["default_iallocator_params"]),
613 ("primary ip version", compat
.TryToRoman(result
["primary_ip_version"],
614 opts
.roman_integers
)),
615 ("preallocation wipe disks", result
["prealloc_wipe_disks"]),
616 ("OS search path", utils
.CommaJoin(pathutils
.OS_SEARCH_PATH
)),
617 ("ExtStorage Providers search path",
618 utils
.CommaJoin(pathutils
.ES_SEARCH_PATH
)),
619 ("enabled disk templates",
620 utils
.CommaJoin(result
["enabled_disk_templates"])),
621 ("install image", result
["install_image"]),
622 ("instance communication network",
623 result
["instance_communication_network"]),
624 ("zeroing image", result
["zeroing_image"]),
625 ("compression tools", result
["compression_tools"]),
626 ("enabled user shutdown", result
["enabled_user_shutdown"]),
627 ("modify ssh setup", result
["modify_ssh_setup"]),
628 ("ssh_key_type", result
["ssh_key_type"]),
629 ("ssh_key_bits", result
["ssh_key_bits"]),
632 ("Default node parameters",
633 _FormatGroupedParams(result
["ndparams"], roman
=opts
.roman_integers
)),
635 ("Default instance parameters",
636 _FormatGroupedParams(result
["beparams"], roman
=opts
.roman_integers
)),
638 ("Default nic parameters",
639 _FormatGroupedParams(result
["nicparams"], roman
=opts
.roman_integers
)),
641 ("Default disk parameters",
642 _FormatGroupedParams(result
["diskparams"], roman
=opts
.roman_integers
)),
644 ("Instance policy - limits for instances",
645 FormatPolicyInfo(result
["ipolicy"], None, True, opts
.roman_integers
)),
646 ("Data collectors", _FormatDataCollectors(result
)),
649 PrintGenericInfo(info
)
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  filename = os.path.abspath(filename)

  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  qcl = GetClient()
  try:
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                             secondary_ips=opts.use_replication_network,
                             nodegroup=opts.nodegroup)
    ports = GetNodesSshPorts(opts.nodes, qcl)
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name)
  for (node, port) in zip(results, ports):
    if not srun.CopyFileToNode(node, port, filename):
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)

  return 0
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  qcl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)
  ports = GetNodesSshPorts(nodes, qcl)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  srun = ssh.SshRunner(cluster_name=cluster_name)

  # Make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for (name, port) in zip(nodes, ports):
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup,
                               verify_clutter=opts.verify_clutter)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  # Count failed jobs and failed per-job results separately.
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks(group_name=opts.nodegroup)

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        # Instances with missing volumes are handled below.
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError as err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.no_voting:
    # Verify that a majority of nodes is still healthy
    if not bootstrap.MajorityHealthy():
      ToStderr("Master-failover with voting is only possible if the majority"
               " of nodes is still healthy; use the --no-voting option after"
               " ensuring by other means that you won't end up in a dual-master"
               " scenario.")
      return 1
  if opts.no_voting and not opts.yes_do_it:
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  rcode, msgs = bootstrap.MasterFailover(no_voting=opts.no_voting)
  for msg in msgs:
    ToStderr(msg)
  return rcode
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    cl = GetClient()
    cl.QueryClusterInfo()
    return 0
  except Exception:  # pylint: disable=W0703
    # Any failure to reach the master counts as "not alive".
    return 1
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)

  if not result:
    return 1

  result = list(result)
  result.sort()

  for path, tag in result:
    ToStdout("%s %s", path, tag)
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError as err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception as err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception as err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem
990 # pylint: disable=R0913
991 def _RenewCrypto(new_cluster_cert
, new_rapi_cert
, # pylint: disable=R0911
992 rapi_cert_filename
, new_spice_cert
, spice_cert_filename
,
993 spice_cacert_filename
, new_confd_hmac_key
, new_cds
,
994 cds_filename
, force
, new_node_cert
, new_ssh_keys
,
995 ssh_key_type
, ssh_key_bits
, verbose
, debug
):
996 """Renews cluster certificates, keys and secrets.
998 @type new_cluster_cert: bool
999 @param new_cluster_cert: Whether to generate a new cluster certificate
1000 @type new_rapi_cert: bool
1001 @param new_rapi_cert: Whether to generate a new RAPI certificate
1002 @type rapi_cert_filename: string
1003 @param rapi_cert_filename: Path to file containing new RAPI certificate
1004 @type new_spice_cert: bool
1005 @param new_spice_cert: Whether to generate a new SPICE certificate
1006 @type spice_cert_filename: string
1007 @param spice_cert_filename: Path to file containing new SPICE certificate
1008 @type spice_cacert_filename: string
1009 @param spice_cacert_filename: Path to file containing the certificate of the
1010 CA that signed the SPICE certificate
1011 @type new_confd_hmac_key: bool
1012 @param new_confd_hmac_key: Whether to generate a new HMAC key
1014 @param new_cds: Whether to generate a new cluster domain secret
1015 @type cds_filename: string
1016 @param cds_filename: Path to file containing new cluster domain secret
1018 @param force: Whether to ask user for confirmation
1019 @type new_node_cert: bool
1020 @param new_node_cert: Whether to generate new node certificates
1021 @type new_ssh_keys: bool
1022 @param new_ssh_keys: Whether to generate new node SSH keys
1023 @type ssh_key_type: One of L{constants.SSHK_ALL}
1024 @param ssh_key_type: The type of SSH key to be generated
1025 @type ssh_key_bits: int
1026 @param ssh_key_bits: The length of the key to be generated
1027 @type verbose: boolean
1028 @param verbose: Show verbose output
1029 @type debug: boolean
1030 @param debug: Show debug output
1033 ToStdout("Updating certificates now. Running \"gnt-cluster verify\" "
1034 " is recommended after this operation.")
1036 if new_rapi_cert
and rapi_cert_filename
:
1037 ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
1038 " options can be specified at the same time.")
1041 if new_cds
and cds_filename
:
1042 ToStderr("Only one of the --new-cluster-domain-secret and"
1043 " --cluster-domain-secret options can be specified at"
1047 if new_spice_cert
and (spice_cert_filename
or spice_cacert_filename
):
1048 ToStderr("When using --new-spice-certificate, the --spice-certificate"
1049 " and --spice-ca-certificate must not be used.")
1052 if bool(spice_cacert_filename
) ^
bool(spice_cert_filename
):
1053 ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
1057 rapi_cert_pem
, spice_cert_pem
, spice_cacert_pem
= (None, None, None)
1059 if rapi_cert_filename
:
1060 rapi_cert_pem
= _ReadAndVerifyCert(rapi_cert_filename
, True)
1061 if spice_cert_filename
:
1062 spice_cert_pem
= _ReadAndVerifyCert(spice_cert_filename
, True)
1063 spice_cacert_pem
= _ReadAndVerifyCert(spice_cacert_filename
)
1064 except errors
.X509CertError
, err
:
1065 ToStderr("Unable to load X509 certificate from %s: %s", err
[0], err
[1])
1070 cds
= utils
.ReadFile(cds_filename
)
1071 except Exception, err
: # pylint: disable=W0703
1072 ToStderr("Can't load new cluster domain secret from %s: %s" %
1073 (cds_filename
, str(err
)))
1079 usertext
= ("This requires all daemons on all nodes to be restarted and"
1080 " may take some time. Continue?")
1081 if not AskUser(usertext
):
1084 def _RenewCryptoInner(ctx
):
1085 ctx
.feedback_fn("Updating certificates and keys")
1087 bootstrap
.GenerateClusterCrypto(False,
1094 rapi_cert_pem
=rapi_cert_pem
,
1095 spice_cert_pem
=spice_cert_pem
,
1096 spice_cacert_pem
=spice_cacert_pem
,
1101 if new_rapi_cert
or rapi_cert_pem
:
1102 files_to_copy
.append(pathutils
.RAPI_CERT_FILE
)
1104 if new_spice_cert
or spice_cert_pem
:
1105 files_to_copy
.append(pathutils
.SPICE_CERT_FILE
)
1106 files_to_copy
.append(pathutils
.SPICE_CACERT_FILE
)
1108 if new_confd_hmac_key
:
1109 files_to_copy
.append(pathutils
.CONFD_HMAC_KEY
)
1112 files_to_copy
.append(pathutils
.CLUSTER_DOMAIN_SECRET_FILE
)
1115 for node_name
in ctx
.nonmaster_nodes
:
1116 port
= ctx
.ssh_ports
[node_name
]
1117 ctx
.feedback_fn("Copying %s to %s:%d" %
1118 (", ".join(files_to_copy
), node_name
, port
))
1119 for file_name
in files_to_copy
:
1120 ctx
.ssh
.CopyFileToNode(node_name
, port
, file_name
)
1122 def _RenewClientCerts(ctx
):
1123 ctx
.feedback_fn("Updating client SSL certificates.")
1125 cluster_name
= ssconf
.SimpleStore().GetClusterName()
1127 for node_name
in ctx
.nonmaster_nodes
+ [ctx
.master_node
]:
1128 ssh_port
= ctx
.ssh_ports
[node_name
]
1130 constants
.NDS_CLUSTER_NAME
: cluster_name
,
1131 constants
.NDS_NODE_DAEMON_CERTIFICATE
:
1132 utils
.ReadFile(pathutils
.NODED_CERT_FILE
),
1133 constants
.NDS_NODE_NAME
: node_name
,
1134 constants
.NDS_ACTION
: constants
.CRYPTO_ACTION_CREATE
,
1137 ssh
.RunSshCmdWithStdin(
1140 pathutils
.SSL_UPDATE
,
1144 verbose
=ctx
.verbose
,
1145 use_cluster_key
=True,
1147 strict_host_check
=True)
1149 # Create a temporary ssconf file using the master's client cert digest
1150 # and the 'bootstrap' keyword to enable distribution of all nodes' digests.
1151 master_digest
= utils
.GetCertificateDigest()
1152 ssconf_master_candidate_certs_filename
= os
.path
.join(
1153 pathutils
.DATA_DIR
, "%s%s" %
1154 (constants
.SSCONF_FILEPREFIX
, constants
.SS_MASTER_CANDIDATES_CERTS
))
1156 ssconf_master_candidate_certs_filename
,
1157 data
="%s=%s" % (constants
.CRYPTO_BOOTSTRAP
, master_digest
))
1158 for node_name
in ctx
.nonmaster_nodes
:
1159 port
= ctx
.ssh_ports
[node_name
]
1160 ctx
.feedback_fn("Copying %s to %s:%d" %
1161 (ssconf_master_candidate_certs_filename
, node_name
, port
))
1162 ctx
.ssh
.CopyFileToNode(node_name
, port
,
1163 ssconf_master_candidate_certs_filename
)
1165 # Write the boostrap entry to the config using wconfd.
1166 config_live_lock
= utils
.livelock
.LiveLock("renew_crypto")
1167 cfg
= config
.GetConfig(None, config_live_lock
)
1168 cfg
.AddNodeToCandidateCerts(constants
.CRYPTO_BOOTSTRAP
, master_digest
)
1169 cfg
.Update(cfg
.GetClusterInfo(), ctx
.feedback_fn
)
1171 def _RenewServerAndClientCerts(ctx
):
1172 ctx
.feedback_fn("Updating the cluster SSL certificate.")
1174 master_name
= ssconf
.SimpleStore().GetMasterNode()
1175 bootstrap
.GenerateClusterCrypto(True, # cluster cert
1178 False, # confd hmac key
1183 for node_name
in ctx
.nonmaster_nodes
:
1184 port
= ctx
.ssh_ports
[node_name
]
1185 server_cert
= pathutils
.NODED_CERT_FILE
1186 ctx
.feedback_fn("Copying %s to %s:%d" %
1187 (server_cert
, node_name
, port
))
1188 ctx
.ssh
.CopyFileToNode(node_name
, port
, server_cert
)
1190 _RenewClientCerts(ctx
)
1192 if new_rapi_cert
or new_spice_cert
or new_confd_hmac_key
or new_cds
:
1193 RunWhileClusterStopped(ToStdout
, _RenewCryptoInner
)
1195 # If only node certficates are recreated, call _RenewClientCerts only.
1196 if new_node_cert
and not new_cluster_cert
:
1197 RunWhileDaemonsStopped(ToStdout
, [constants
.NODED
, constants
.WCONFD
],
1198 _RenewClientCerts
, verbose
=verbose
, debug
=debug
)
1200 # If the cluster certificate are renewed, the client certificates need
1201 # to be renewed too.
1202 if new_cluster_cert
:
1203 RunWhileDaemonsStopped(ToStdout
, [constants
.NODED
, constants
.WCONFD
],
1204 _RenewServerAndClientCerts
, verbose
=verbose
,
1207 if new_node_cert
or new_cluster_cert
or new_ssh_keys
:
1209 renew_op
= opcodes
.OpClusterRenewCrypto(
1210 node_certificates
=new_node_cert
or new_cluster_cert
,
1211 renew_ssh_keys
=new_ssh_keys
,
1212 ssh_key_type
=ssh_key_type
,
1213 ssh_key_bits
=ssh_key_bits
,
1216 SubmitOpCode(renew_op
, cl
=cl
)
1218 ToStdout("All requested certificates and keys have been replaced."
1219 " Running \"gnt-cluster verify\" now is recommended.")
1224 def _BuildGanetiPubKeys(options
, pub_key_file
=pathutils
.SSH_PUB_KEYS
, cl
=None,
1225 get_online_nodes_fn
=GetOnlineNodes
,
1226 get_nodes_ssh_ports_fn
=GetNodesSshPorts
,
1227 get_node_uuids_fn
=GetNodeUUIDs
,
1229 """Recreates the 'ganeti_pub_key' file by polling all nodes.
1236 (cluster_name
, master_node
, modify_ssh_setup
, ssh_key_type
) = \
1237 cl
.QueryConfigValues(["cluster_name", "master_node", "modify_ssh_setup",
1240 # In case Ganeti is not supposed to modify the SSH setup, simply exit and do
1241 # not update this file.
1242 if not modify_ssh_setup
:
1245 if os
.path
.exists(pub_key_file
):
1246 utils
.CreateBackup(pub_key_file
)
1247 utils
.RemoveFile(pub_key_file
)
1249 ssh
.ClearPubKeyFile(pub_key_file
)
1251 online_nodes
= get_online_nodes_fn([], cl
=cl
)
1252 ssh_ports
= get_nodes_ssh_ports_fn(online_nodes
+ [master_node
], cl
)
1253 ssh_port_map
= dict(zip(online_nodes
+ [master_node
], ssh_ports
))
1255 node_uuids
= get_node_uuids_fn(online_nodes
+ [master_node
], cl
)
1256 node_uuid_map
= dict(zip(online_nodes
+ [master_node
], node_uuids
))
1258 nonmaster_nodes
= [name
for name
in online_nodes
1259 if name
!= master_node
]
1261 _
, pub_key_filename
, _
= \
1262 ssh
.GetUserFiles(constants
.SSH_LOGIN_USER
, mkdir
=False, dircheck
=False,
1263 kind
=ssh_key_type
, _homedir_fn
=homedir_fn
)
1265 # get the key file of the master node
1266 pub_key
= utils
.ReadFile(pub_key_filename
)
1267 ssh
.AddPublicKey(node_uuid_map
[master_node
], pub_key
,
1268 key_file
=pub_key_file
)
1270 # get the key files of all non-master nodes
1271 for node
in nonmaster_nodes
:
1272 pub_key
= ssh
.ReadRemoteSshPubKey(pub_key_filename
, node
, cluster_name
,
1274 options
.ssh_key_check
,
1275 options
.ssh_key_check
)
1276 ssh
.AddPublicKey(node_uuid_map
[node
], pub_key
, key_file
=pub_key_file
)
1279 def RenewCrypto(opts
, args
):
1280 """Renews cluster certificates, keys and secrets.
1283 if opts
.new_ssh_keys
:
1284 _BuildGanetiPubKeys(opts
)
1285 return _RenewCrypto(opts
.new_cluster_cert
,
1288 opts
.new_spice_cert
,
1291 opts
.new_confd_hmac_key
,
1292 opts
.new_cluster_domain_secret
,
1293 opts
.cluster_domain_secret
,
1303 def _GetEnabledDiskTemplates(opts
):
1304 """Determine the list of enabled disk templates.
1307 if opts
.enabled_disk_templates
:
1308 return opts
.enabled_disk_templates
.split(",")
def _GetVgName(opts, enabled_disk_templates):
  """Determine the volume group name.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk-templates

  """
  # consistency between vg name and enabled disk templates
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
  # Warn (but keep the name) when a VG is given although no lvm-based
  # template is enabled; the user may enable lvm later.
  if enabled_disk_templates and vg_name and \
      not utils.IsLvmEnabled(enabled_disk_templates):
    ToStdout("You specified a volume group with --vg-name, but you did not"
             " enable any of the following lvm-based disk templates: %s" %
             utils.CommaJoin(constants.DTS_LVM))
  return vg_name
def _GetDrbdHelper(opts, enabled_disk_templates):
  """Determine the DRBD usermode helper.

  @param opts: the command line options selected by the user
  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk-templates

  """
  drbd_helper = opts.drbd_helper
  if enabled_disk_templates:
    # A helper only makes sense when the DRBD template is enabled;
    # warn the user about the inconsistency.
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    if opts.drbd_helper and not drbd_enabled:
      ToStdout("You specified a DRBD usermode helper with "
               " --drbd-usermode-helper while DRBD is not enabled.")
  return drbd_helper
1345 def _GetCompressionTools(opts
):
1346 """Determine the list of custom compression tools.
1349 if opts
.compression_tools
:
1350 return opts
.compression_tools
.split(",")
1351 elif opts
.compression_tools
is None:
1352 return None # To note the parameter was not provided
1354 return constants
.IEC_DEFAULT_TOOLS
# Resetting to default
1357 def SetClusterParams(opts
, args
):
1358 """Modify the cluster.
1360 @param opts: the command line options selected by the user
1362 @param args: should be an empty list
1364 @return: the desired exit code
1367 if not (opts
.vg_name
is not None or
1368 opts
.drbd_helper
is not None or
1369 opts
.enabled_hypervisors
or opts
.hvparams
or
1370 opts
.beparams
or opts
.nicparams
or
1371 opts
.ndparams
or opts
.diskparams
or
1372 opts
.candidate_pool_size
is not None or
1373 opts
.max_running_jobs
is not None or
1374 opts
.max_tracked_jobs
is not None or
1375 opts
.uid_pool
is not None or
1376 opts
.maintain_node_health
is not None or
1377 opts
.add_uids
is not None or
1378 opts
.remove_uids
is not None or
1379 opts
.default_iallocator
is not None or
1380 opts
.default_iallocator_params
is not None or
1381 opts
.reserved_lvs
is not None or
1382 opts
.mac_prefix
is not None or
1383 opts
.master_netdev
is not None or
1384 opts
.master_netmask
is not None or
1385 opts
.use_external_mip_script
is not None or
1386 opts
.prealloc_wipe_disks
is not None or
1388 opts
.enabled_disk_templates
or
1390 opts
.ipolicy_bounds_specs
is not None or
1391 opts
.ipolicy_std_specs
is not None or
1392 opts
.ipolicy_disk_templates
is not None or
1393 opts
.ipolicy_vcpu_ratio
is not None or
1394 opts
.ipolicy_spindle_ratio
is not None or
1395 opts
.ipolicy_memory_ratio
is not None or
1396 opts
.modify_etc_hosts
is not None or
1397 opts
.file_storage_dir
is not None or
1398 opts
.install_image
is not None or
1399 opts
.instance_communication_network
is not None or
1400 opts
.zeroing_image
is not None or
1401 opts
.shared_file_storage_dir
is not None or
1402 opts
.compression_tools
is not None or
1403 opts
.shared_file_storage_dir
is not None or
1404 opts
.enabled_user_shutdown
is not None or
1405 opts
.maint_round_delay
is not None or
1406 opts
.maint_balance
is not None or
1407 opts
.maint_balance_threshold
is not None or
1408 opts
.data_collector_interval
or
1409 opts
.diagnose_data_collector_filename
is not None or
1410 opts
.enabled_data_collectors
):
1411 ToStderr("Please give at least one of the parameters.")
1414 enabled_disk_templates
= _GetEnabledDiskTemplates(opts
)
1415 vg_name
= _GetVgName(opts
, enabled_disk_templates
)
1418 drbd_helper
= _GetDrbdHelper(opts
, enabled_disk_templates
)
1419 except errors
.OpPrereqError
, e
:
1423 hvlist
= opts
.enabled_hypervisors
1424 if hvlist
is not None:
1425 hvlist
= hvlist
.split(",")
1427 # a list of (name, dict) we can pass directly to dict() (or [])
1428 hvparams
= dict(opts
.hvparams
)
1429 for hv_params
in hvparams
.values():
1430 utils
.ForceDictType(hv_params
, constants
.HVS_PARAMETER_TYPES
)
1432 diskparams
= dict(opts
.diskparams
)
1434 for dt_params
in diskparams
.values():
1435 utils
.ForceDictType(dt_params
, constants
.DISK_DT_TYPES
)
1437 beparams
= opts
.beparams
1438 utils
.ForceDictType(beparams
, constants
.BES_PARAMETER_COMPAT
)
1440 nicparams
= opts
.nicparams
1441 utils
.ForceDictType(nicparams
, constants
.NICS_PARAMETER_TYPES
)
1443 ndparams
= opts
.ndparams
1444 if ndparams
is not None:
1445 utils
.ForceDictType(ndparams
, constants
.NDS_PARAMETER_TYPES
)
1447 ipolicy
= CreateIPolicyFromOpts(
1448 minmax_ispecs
=opts
.ipolicy_bounds_specs
,
1449 std_ispecs
=opts
.ipolicy_std_specs
,
1450 ipolicy_disk_templates
=opts
.ipolicy_disk_templates
,
1451 ipolicy_vcpu_ratio
=opts
.ipolicy_vcpu_ratio
,
1452 ipolicy_spindle_ratio
=opts
.ipolicy_spindle_ratio
,
1453 ipolicy_memory_ratio
=opts
.ipolicy_memory_ratio
,
1456 mnh
= opts
.maintain_node_health
1458 uid_pool
= opts
.uid_pool
1459 if uid_pool
is not None:
1460 uid_pool
= uidpool
.ParseUidPool(uid_pool
)
1462 add_uids
= opts
.add_uids
1463 if add_uids
is not None:
1464 add_uids
= uidpool
.ParseUidPool(add_uids
)
1466 remove_uids
= opts
.remove_uids
1467 if remove_uids
is not None:
1468 remove_uids
= uidpool
.ParseUidPool(remove_uids
)
1470 if opts
.reserved_lvs
is not None:
1471 if opts
.reserved_lvs
== "":
1472 opts
.reserved_lvs
= []
1474 opts
.reserved_lvs
= utils
.UnescapeAndSplit(opts
.reserved_lvs
, sep
=",")
1476 if opts
.master_netmask
is not None:
1478 opts
.master_netmask
= int(opts
.master_netmask
)
1480 ToStderr("The --master-netmask option expects an int parameter.")
1483 ext_ip_script
= opts
.use_external_mip_script
1486 disk_state
= utils
.FlatToDict(opts
.disk_state
)
1490 hv_state
= dict(opts
.hv_state
)
1492 compression_tools
= _GetCompressionTools(opts
)
1494 enabled_data_collectors
= dict(
1495 (k
, v
.lower().startswith("t"))
1496 for k
, v
in opts
.enabled_data_collectors
.items())
1498 unrecognized_data_collectors
= [
1499 k
for k
in enabled_data_collectors
.keys()
1500 if k
not in constants
.DATA_COLLECTOR_NAMES
]
1501 if unrecognized_data_collectors
:
1502 ToStderr("Data collector names not recognized: %s" %
1503 ", ".join(unrecognized_data_collectors
))
1506 data_collector_interval
= dict(
1507 (k
, long(1e6
* float(v
)))
1508 for (k
, v
) in opts
.data_collector_interval
.items())
1510 ToStderr("Can't transform all values to integers: {}".format(
1511 opts
.data_collector_interval
))
1513 if any(v
<= 0 for v
in data_collector_interval
):
1514 ToStderr("Some interval times where not above zero.")
1517 op
= opcodes
.OpClusterSetParams(
1519 drbd_helper
=drbd_helper
,
1520 enabled_hypervisors
=hvlist
,
1524 nicparams
=nicparams
,
1526 diskparams
=diskparams
,
1528 candidate_pool_size
=opts
.candidate_pool_size
,
1529 max_running_jobs
=opts
.max_running_jobs
,
1530 max_tracked_jobs
=opts
.max_tracked_jobs
,
1531 maintain_node_health
=mnh
,
1532 modify_etc_hosts
=opts
.modify_etc_hosts
,
1535 remove_uids
=remove_uids
,
1536 default_iallocator
=opts
.default_iallocator
,
1537 default_iallocator_params
=opts
.default_iallocator_params
,
1538 prealloc_wipe_disks
=opts
.prealloc_wipe_disks
,
1539 mac_prefix
=opts
.mac_prefix
,
1540 master_netdev
=opts
.master_netdev
,
1541 master_netmask
=opts
.master_netmask
,
1542 reserved_lvs
=opts
.reserved_lvs
,
1543 use_external_mip_script
=ext_ip_script
,
1545 disk_state
=disk_state
,
1546 enabled_disk_templates
=enabled_disk_templates
,
1548 file_storage_dir
=opts
.file_storage_dir
,
1549 install_image
=opts
.install_image
,
1550 instance_communication_network
=opts
.instance_communication_network
,
1551 zeroing_image
=opts
.zeroing_image
,
1552 shared_file_storage_dir
=opts
.shared_file_storage_dir
,
1553 compression_tools
=compression_tools
,
1554 enabled_user_shutdown
=opts
.enabled_user_shutdown
,
1555 maint_round_delay
=opts
.maint_round_delay
,
1556 maint_balance
=opts
.maint_balance
,
1557 maint_balance_threshold
=opts
.maint_balance_threshold
,
1558 enabled_data_collectors
=enabled_data_collectors
,
1559 data_collector_interval
=data_collector_interval
,
1560 diagnose_data_collector_filename
=opts
.diagnose_data_collector_filename
1562 return base
.GetResult(None, opts
, SubmitOrSend(op
, opts
))
1565 def QueueOps(opts
, args
):
1566 """Queue operations.
1568 @param opts: the command line options selected by the user
1570 @param args: should contain only one element, the subcommand
1572 @return: the desired exit code
1576 client
= GetClient()
1577 if command
in ("drain", "undrain"):
1578 drain_flag
= command
== "drain"
1579 client
.SetQueueDrainFlag(drain_flag
)
1580 elif command
== "info":
1581 result
= client
.QueryConfigValues(["drain_flag"])
1586 ToStdout("The drain flag is %s" % val
)
1588 raise errors
.OpPrereqError("Command '%s' is not valid." % command
,
def _ShowWatcherPause(until):
  """Print the current watcher pause state.

  @param until: Unix timestamp until which the watcher is paused,
      or None when it is not paused

  """
  # A pause time in the past counts as "not paused"
  paused = until is not None and until >= time.time()
  if paused:
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1601 def WatcherOps(opts
, args
):
1602 """Watcher operations.
1604 @param opts: the command line options selected by the user
1606 @param args: should contain only one element, the subcommand
1608 @return: the desired exit code
1612 client
= GetClient()
1614 if command
== "continue":
1615 client
.SetWatcherPause(None)
1616 ToStdout("The watcher is no longer paused.")
1618 elif command
== "pause":
1620 raise errors
.OpPrereqError("Missing pause duration", errors
.ECODE_INVAL
)
1622 result
= client
.SetWatcherPause(time
.time() + ParseTimespec(args
[1]))
1623 _ShowWatcherPause(result
)
1625 elif command
== "info":
1626 result
= client
.QueryConfigValues(["watcher_pause"])
1627 _ShowWatcherPause(result
[0])
1630 raise errors
.OpPrereqError("Command '%s' is not valid." % command
,
1636 def _OobPower(opts
, node_list
, power
):
1637 """Puts the node in the list to desired power state.
1639 @param opts: The command line options selected by the user
1640 @param node_list: The list of nodes to operate on
1641 @param power: True if they should be powered on, False otherwise
1642 @return: The success of the operation (none failed)
1646 command
= constants
.OOB_POWER_ON
1648 command
= constants
.OOB_POWER_OFF
1650 op
= opcodes
.OpOobCommand(node_names
=node_list
,
1653 timeout
=opts
.oob_timeout
,
1654 power_delay
=opts
.power_delay
)
1655 result
= SubmitOpCode(op
, opts
=opts
)
1657 for node_result
in result
:
1658 (node_tuple
, data_tuple
) = node_result
1659 (_
, node_name
) = node_tuple
1660 (data_status
, _
) = data_tuple
1661 if data_status
!= constants
.RS_NORMAL
:
1662 assert data_status
!= constants
.RS_UNAVAIL
1664 ToStderr("There was a problem changing power for %s, please investigate",
1673 def _InstanceStart(opts
, inst_list
, start
, no_remember
=False):
1674 """Puts the instances in the list to desired state.
1676 @param opts: The command line options selected by the user
1677 @param inst_list: The list of instances to operate on
1678 @param start: True if they should be started, False for shutdown
1679 @param no_remember: If the instance state should be remembered
1680 @return: The success of the operation (none failed)
1684 opcls
= opcodes
.OpInstanceStartup
1685 text_submit
, text_success
, text_failed
= ("startup", "started", "starting")
1687 opcls
= compat
.partial(opcodes
.OpInstanceShutdown
,
1688 timeout
=opts
.shutdown_timeout
,
1689 no_remember
=no_remember
)
1690 text_submit
, text_success
, text_failed
= ("shutdown", "stopped", "stopping")
1692 jex
= JobExecutor(opts
=opts
)
1694 for inst
in inst_list
:
1695 ToStdout("Submit %s of instance %s", text_submit
, inst
)
1696 op
= opcls(instance_name
=inst
)
1697 jex
.QueueJob(inst
, op
)
1699 results
= jex
.GetResults()
1700 bad_cnt
= len([1 for (success
, _
) in results
if not success
])
1703 ToStdout("All instances have been %s successfully", text_success
)
1705 ToStderr("There were errors while %s instances:\n"
1706 "%d error(s) out of %d instance(s)", text_failed
, bad_cnt
,
1713 class _RunWhenNodesReachableHelper(object):
1714 """Helper class to make shared internal state sharing easier.
1716 @ivar success: Indicates if all action_cb calls were successful
1719 def __init__(self
, node_list
, action_cb
, node2ip
, port
, feedback_fn
,
1720 _ping_fn
=netutils
.TcpPing
, _sleep_fn
=time
.sleep
):
1723 @param node_list: The list of nodes to be reachable
1724 @param action_cb: Callback called when a new host is reachable
1726 @param node2ip: Node to ip mapping
1727 @param port: The port to use for the TCP ping
1728 @param feedback_fn: The function used for feedback
1729 @param _ping_fn: Function to check reachabilty (for unittest use only)
1730 @param _sleep_fn: Function to sleep (for unittest use only)
1733 self
.down
= set(node_list
)
1735 self
.node2ip
= node2ip
1737 self
.action_cb
= action_cb
1739 self
.feedback_fn
= feedback_fn
1740 self
._ping_fn
= _ping_fn
1741 self
._sleep_fn
= _sleep_fn
1744 """When called we run action_cb.
1746 @raises utils.RetryAgain: When there are still down nodes
1749 if not self
.action_cb(self
.up
):
1750 self
.success
= False
1753 raise utils
.RetryAgain()
1757 def Wait(self
, secs
):
1758 """Checks if a host is up or waits remaining seconds.
1760 @param secs: The secs remaining
1764 for node
in self
.down
:
1765 if self
._ping_fn(self
.node2ip
[node
], self
.port
, timeout
=_EPO_PING_TIMEOUT
,
1766 live_port_needed
=True):
1767 self
.feedback_fn("Node %s became available" % node
)
1769 self
.down
-= self
.up
1770 # If we have a node available there is the possibility to run the
1771 # action callback successfully, therefore we don't wait and return
1774 self
._sleep_fn(max(0.0, start
+ secs
- time
.time()))
1777 def _RunWhenNodesReachable(node_list
, action_cb
, interval
):
1778 """Run action_cb when nodes become reachable.
1780 @param node_list: The list of nodes to be reachable
1781 @param action_cb: Callback called when a new host is reachable
1782 @param interval: The earliest time to retry
1785 client
= GetClient()
1786 cluster_info
= client
.QueryClusterInfo()
1787 if cluster_info
["primary_ip_version"] == constants
.IP4_VERSION
:
1788 family
= netutils
.IPAddress
.family
1790 family
= netutils
.IP6Address
.family
1792 node2ip
= dict((node
, netutils
.GetHostname(node
, family
=family
).ip
)
1793 for node
in node_list
)
1795 port
= netutils
.GetDaemonPort(constants
.NODED
)
1796 helper
= _RunWhenNodesReachableHelper(node_list
, action_cb
, node2ip
, port
,
1800 return utils
.Retry(helper
, interval
, _EPO_REACHABLE_TIMEOUT
,
1801 wait_fn
=helper
.Wait
)
1802 except utils
.RetryTimeout
:
1803 ToStderr("Time exceeded while waiting for nodes to become reachable"
1804 " again:\n - %s", " - ".join(helper
.down
))
1808 def _MaybeInstanceStartup(opts
, inst_map
, nodes_online
,
1809 _instance_start_fn
=_InstanceStart
):
1810 """Start the instances conditional based on node_states.
1812 @param opts: The command line options selected by the user
1813 @param inst_map: A dict of inst -> nodes mapping
1814 @param nodes_online: A list of nodes online
1815 @param _instance_start_fn: Callback to start instances (unittest use only)
1816 @return: Success of the operation on all instances
1819 start_inst_list
= []
1820 for (inst
, nodes
) in inst_map
.items():
1821 if not (nodes
- nodes_online
):
1822 # All nodes the instance lives on are back online
1823 start_inst_list
.append(inst
)
1825 for inst
in start_inst_list
:
1829 return _instance_start_fn(opts
, start_inst_list
, True)
1834 def _EpoOn(opts
, full_node_list
, node_list
, inst_map
):
1835 """Does the actual power on.
1837 @param opts: The command line options selected by the user
1838 @param full_node_list: All nodes to operate on (includes nodes not supporting
1840 @param node_list: The list of nodes to operate on (all need to support OOB)
1841 @param inst_map: A dict of inst -> nodes mapping
1842 @return: The desired exit status
1845 if node_list
and not _OobPower(opts
, node_list
, False):
1846 ToStderr("Not all nodes seem to get back up, investigate and start"
1847 " manually if needed")
1849 # Wait for the nodes to be back up
1850 action_cb
= compat
.partial(_MaybeInstanceStartup
, opts
, dict(inst_map
))
1852 ToStdout("Waiting until all nodes are available again")
1853 if not _RunWhenNodesReachable(full_node_list
, action_cb
, _EPO_PING_INTERVAL
):
1854 ToStderr("Please investigate and start stopped instances manually")
1855 return constants
.EXIT_FAILURE
1857 return constants
.EXIT_SUCCESS
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # First stop all instances; refuse to power off nodes while any
  # instance could not be shut down.
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  # Nothing to power off out-of-band
  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  return constants.EXIT_FAILURE
1882 def Epo(opts
, args
, qcl
=None, _on_fn
=_EpoOn
, _off_fn
=_EpoOff
,
1883 _confirm_fn
=ConfirmOperation
,
1884 _stdout_fn
=ToStdout
, _stderr_fn
=ToStderr
):
1887 @param opts: the command line options selected by the user
1889 @param args: should contain only one element, the subcommand
1891 @return: the desired exit code
1894 if opts
.groups
and opts
.show_all
:
1895 _stderr_fn("Only one of --groups or --all are allowed")
1896 return constants
.EXIT_FAILURE
1897 elif args
and opts
.show_all
:
1898 _stderr_fn("Arguments in combination with --all are not allowed")
1899 return constants
.EXIT_FAILURE
1907 itertools
.chain(*qcl
.QueryGroups(args
, ["node_list"], False))
1909 node_query_list
= args
1911 result
= qcl
.QueryNodes(node_query_list
, ["name", "master", "pinst_list",
1912 "sinst_list", "powered", "offline"],
1915 all_nodes
= map(compat
.fst
, result
)
1918 for (node
, master
, pinsts
, sinsts
, powered
, offline
) in result
:
1920 for inst
in (pinsts
+ sinsts
):
1921 if inst
in inst_map
:
1923 inst_map
[inst
].add(node
)
1925 inst_map
[inst
] = set()
1927 inst_map
[inst
] = set([node
])
1929 if master
and opts
.on
:
1930 # We ignore the master for turning on the machines, in fact we are
1931 # already operating on the master at this point :)
1933 elif master
and not opts
.show_all
:
1934 _stderr_fn("%s is the master node, please do a master-failover to another"
1935 " node not affected by the EPO or use --all if you intend to"
1936 " shutdown the whole cluster", node
)
1937 return constants
.EXIT_FAILURE
1938 elif powered
is None:
1939 _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1940 " handled in a fully automated manner", node
)
1941 elif powered
== opts
.on
:
1942 _stdout_fn("Node %s is already in desired power state, skipping", node
)
1943 elif not offline
or (offline
and powered
):
1944 node_list
.append(node
)
1946 if not (opts
.force
or _confirm_fn(all_nodes
, "nodes", "epo")):
1947 return constants
.EXIT_FAILURE
1950 return _on_fn(opts
, all_nodes
, node_list
, inst_map
)
1952 return _off_fn(opts
, node_list
, inst_map
)
def RemoveRepair(opts, args):
  """Unconditionally remove a repair event.

  @param opts: the command line options selected by the user (ignored)
  @type args: list
  @param args: one element, the uuid of the event to remove
  @rtype: int
  @return: the desired exit code

  """
  uuid = args[0]
  # No confirmation or validation: the removal is unconditional by design
  wconfd.Client().RmMaintdIncident(uuid)
  return 0
def _GetCreateCommand(info):
  """Build a "gnt-cluster init" command line re-creating the cluster.

  @param info: the result of a cluster info query
  @rtype: string
  @return: the command that would re-create the cluster (currently
      only the ipolicy options are reproduced)

  """
  buf = StringIO()
  buf.write("gnt-cluster init")
  PrintIPolicyCommand(buf, info["ipolicy"], False)
  buf.write(" ")
  buf.write(info["name"])
  return buf.getvalue()
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cluster_info = GetClient().QueryClusterInfo()
  ToStdout(_GetCreateCommand(cluster_info))
def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: boolean
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  # RunCmd results are truthy on failure; report and bail out then
  if result:
    ToStderr("Command %s failed: %s; Output %s" %
             (cmd, result.fail_reason, result.output))
    return False
  return True
def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down.  # NOTE(review): tail of this sentence was not
  # visible in the original; confirm wording against upstream.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  store = ssconf.SimpleStore()
  nodes = store.GetOnlineNodeList()
  master_node = store.GetMasterNode()
  cluster_name = store.GetClusterName()

  # If master node is in 'nodes', make sure master node is at list end,
  # so the master is touched last
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  failed = []
  srun = ssh.SshRunner(cluster_name=cluster_name)
  for name in nodes:
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
    if result.exit_code != 0:
      failed.append(name)

  return failed
):
2044 """Verify that the given version of ganeti is installed on all online nodes.
2046 Do nothing, if this is the case, otherwise print an appropriate
2049 @param versionstring: the version to check for
2050 @type versionstring: string
2052 @return: True, if the version is installed on all online nodes
2055 badnodes
= _VerifyCommand(["test", "-d",
2056 os
.path
.join(pathutils
.PKGLIBDIR
, versionstring
)])
2058 ToStderr("Ganeti version %s not installed on nodes %s"
2059 % (versionstring
, ", ".join(badnodes
)))
2066 """Determine the list of running jobs.
2069 @return: the number of jobs still running
2073 qfilter
= qlang
.MakeSimpleFilter("status",
2074 frozenset([constants
.JOB_STATUS_RUNNING
]))
2075 return len(cl
.Query(constants
.QR_JOB
, [], qfilter
).data
)
def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  failed = []
  # Symlinks to flip and their per-version targets
  lib_link = os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")
  share_link = os.path.join(pathutils.SYSCONFDIR, "ganeti/share")
  lib_target = os.path.join(pathutils.PKGLIBDIR, versionstring)
  share_target = os.path.join(pathutils.SHAREDIR, versionstring)
  if constants.HAS_GNU_LN:
    # GNU ln supports -T, replacing the link in one step
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T", lib_target, lib_link]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T", share_target, share_link]))
  else:
    # Fall back to remove-then-link (not atomic)
    failed.extend(_VerifyCommand(["rm", "-f", lib_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", lib_target, lib_link]))
    failed.extend(_VerifyCommand(["rm", "-f", share_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", share_target, share_link]))
  # De-duplicate node names collected over several commands
  return list(set(failed))
2110 def _ExecuteCommands(fns
):
2111 """Execute a list of functions, in reverse order.
2113 @type fns: list of functions.
2114 @param fns: the functions to be executed.
2117 for fn
in reversed(fns
):
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  try:
    config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  except IOError:
    # NOTE(review): exact exception type was not visible in the
    # original; confirm against upstream before relying on it
    return None
  try:
    config_version = config_data["version"]
  except KeyError:
    return None
  return utils.SplitVersion(config_version)
def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return (None, None)

  raw = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
  parts = utils.UnescapeAndSplit(raw)
  if len(parts) != 3:
    # file syntactically mal-formed
    return (None, None)
  # parts[2] is the writer's pid; callers only need the versions
  return (parts[0], parts[1])
def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intend to upgrade to

  """
  # Record current version, target version and our pid
  payload = utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
                                 "%d" % os.getpid()])
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE, data=payload)
def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  Each successfully completed step appends an undo action to the rollback
  list, so that on failure the caller can restore the previous state by
  executing the accumulated rollback tasks.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdoutAndLoginfo("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  # NOTE(review): presumably polls _GetRunning until it reaches 0 within the
  # drain timeout; a truthy (non-zero) final value means jobs are still
  # running -- confirm against utils.SimpleRetry semantics.
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdoutAndLoginfo("Pausing the watcher for one hour.")
  rollback.append(lambda: GetClient().SetWatcherPause(None))
  GetClient().SetWatcherPause(time.time() + 60 * 60)

  ToStdoutAndLoginfo("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  # Re-check the installed version with the daemons down; if it changed,
  # restart them and abort.
  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdoutAndLoginfo("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.BACKUP_DIR, "ganeti%d.tar" % time.time())
  ToStdoutAndLoginfo("Backing up configuration as %s", backuptar)
  if not _RunCommandAndReport(["mkdir", "-p", pathutils.BACKUP_DIR]):
    return (False, rollback)

  # Create the archive in a safe manner, as it contains sensitive
  # information: write to a mkstemp-created file first, then rename.
  (_, tmp_name) = tempfile.mkstemp(prefix=backuptar, dir=pathutils.BACKUP_DIR)
  if not _RunCommandAndReport(["tar", "-cf", tmp_name,
                               "--exclude=queue/archive",
                               pathutils.DATA_DIR]):
    return (False, rollback)

  os.rename(tmp_name, backuptar)
  return (True, rollback)
def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @return: True upon success
  """
  ToStdoutAndLoginfo("Performing version-specific downgrade tasks.")
  # Nothing version-specific to do for this release.
  return True
def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdoutAndLoginfo("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the
    # downgrade process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdoutAndLoginfo("Switching to version %s on all nodes", versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    # when downgrading, push through anyway (see note above)
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdoutAndLoginfo("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)
def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail; failures only
  degrade the final exit code.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  returnvalue = 0

  ToStdoutAndLoginfo("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdoutAndLoginfo("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdoutAndLoginfo("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  ToStdoutAndLoginfo("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    # deduplicate: a node may have failed both the stop and the start
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdoutAndLoginfo("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  # best-effort removal of the intent-to-upgrade marker
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdoutAndLoginfo("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  ToStdoutAndLoginfo("Unpausing the watcher.")
  if not _RunCommandAndReport(["gnt-cluster", "watcher", "continue"]):
    returnvalue = 1

  ToStdoutAndLoginfo("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue
def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Exactly one of --to and --resume must be given.
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  # If we're not told to resume, verify there is no upgrade
  # in progress.
  if not opts.resume:
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is not None:
      # An upgrade is going on; verify whether the target matches
      if versionstring == opts.to:
        ToStderr("An upgrade is already in progress. Target version matches,"
                 " resuming.")
        opts.resume = True
        opts.to = None
      else:
        ToStderr("An upgrade from %s to %s is in progress; use --resume to"
                 " finish it first" % (oldversion, versionstring))
        return 1

  utils.SetupLogging(pathutils.LOG_COMMANDS, 'gnt-cluster upgrade', debug=1)

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      # no upgrade pending; nothing to resume
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback = \
    _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)
2465 InitCluster
, [ArgHost(min=1, max=1)],
2466 [BACKEND_OPT
, CP_SIZE_OPT
, ENABLED_HV_OPT
, GLOBAL_FILEDIR_OPT
,
2467 HVLIST_OPT
, MAC_PREFIX_OPT
, MASTER_NETDEV_OPT
, MASTER_NETMASK_OPT
,
2468 NIC_PARAMS_OPT
, NOMODIFY_ETCHOSTS_OPT
, NOMODIFY_SSH_SETUP_OPT
,
2469 SECONDARY_IP_OPT
, VG_NAME_OPT
, MAINTAIN_NODE_HEALTH_OPT
, UIDPOOL_OPT
,
2470 DRBD_HELPER_OPT
, DEFAULT_IALLOCATOR_OPT
, DEFAULT_IALLOCATOR_PARAMS_OPT
,
2471 PRIMARY_IP_VERSION_OPT
, PREALLOC_WIPE_DISKS_OPT
, NODE_PARAMS_OPT
,
2472 GLOBAL_SHARED_FILEDIR_OPT
, USE_EXTERNAL_MIP_SCRIPT
, DISK_PARAMS_OPT
,
2473 HV_STATE_OPT
, DISK_STATE_OPT
, ENABLED_DISK_TEMPLATES_OPT
,
2474 IPOLICY_STD_SPECS_OPT
, GLOBAL_GLUSTER_FILEDIR_OPT
, INSTALL_IMAGE_OPT
,
2475 ZEROING_IMAGE_OPT
, COMPRESSION_TOOLS_OPT
,
2476 ENABLED_USER_SHUTDOWN_OPT
, SSH_KEY_BITS_OPT
, SSH_KEY_TYPE_OPT
,
2478 + INSTANCE_POLICY_OPTS
+ SPLIT_ISPECS_OPTS
,
2479 "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
2481 DestroyCluster
, ARGS_NONE
, [YES_DOIT_OPT
],
2482 "", "Destroy cluster"),
2484 RenameCluster
, [ArgHost(min=1, max=1)],
2485 [FORCE_OPT
, DRY_RUN_OPT
],
2487 "Renames the cluster"),
2489 RedistributeConfig
, ARGS_NONE
, SUBMIT_OPTS
+
2490 [DRY_RUN_OPT
, PRIORITY_OPT
, FORCE_DISTRIBUTION
],
2491 "", "Forces a push of the configuration file and ssconf files"
2492 " to the nodes in the cluster"),
2494 VerifyCluster
, ARGS_NONE
,
2495 [VERBOSE_OPT
, DEBUG_SIMERR_OPT
, ERROR_CODES_OPT
, NONPLUS1_OPT
,
2496 DRY_RUN_OPT
, PRIORITY_OPT
, NODEGROUP_OPT
, IGNORE_ERRORS_OPT
,
2497 VERIFY_CLUTTER_OPT
],
2498 "", "Does a check on the cluster configuration"),
2500 VerifyDisks
, ARGS_NONE
, [PRIORITY_OPT
, NODEGROUP_OPT
],
2501 "", "Does a check on the cluster disk status"),
2502 "repair-disk-sizes": (
2503 RepairDiskSizes
, ARGS_MANY_INSTANCES
, [DRY_RUN_OPT
, PRIORITY_OPT
],
2504 "[instance...]", "Updates mismatches in recorded disk sizes"),
2505 "master-failover": (
2506 MasterFailover
, ARGS_NONE
, [NOVOTING_OPT
, FORCE_FAILOVER
],
2507 "", "Makes the current node the master"),
2509 MasterPing
, ARGS_NONE
, [],
2510 "", "Checks if the master is alive"),
2512 ShowClusterVersion
, ARGS_NONE
, [],
2513 "", "Shows the cluster version"),
2515 ShowClusterMaster
, ARGS_NONE
, [],
2516 "", "Shows the cluster master"),
2518 ClusterCopyFile
, [ArgFile(min=1, max=1)],
2519 [NODE_LIST_OPT
, USE_REPL_NET_OPT
, NODEGROUP_OPT
],
2520 "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
2522 RunClusterCommand
, [ArgCommand(min=1)],
2523 [NODE_LIST_OPT
, NODEGROUP_OPT
, SHOW_MACHINE_OPT
, FAILURE_ONLY_OPT
],
2524 "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
2526 ShowClusterConfig
, ARGS_NONE
, [ROMAN_OPT
],
2527 "[--roman]", "Show cluster configuration"),
2529 ListTags
, ARGS_NONE
, [], "", "List the tags of the cluster"),
2531 AddTags
, [ArgUnknown()], [TAG_SRC_OPT
, PRIORITY_OPT
] + SUBMIT_OPTS
,
2532 "tag...", "Add tags to the cluster"),
2534 RemoveTags
, [ArgUnknown()], [TAG_SRC_OPT
, PRIORITY_OPT
] + SUBMIT_OPTS
,
2535 "tag...", "Remove tags from the cluster"),
2537 SearchTags
, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT
], "",
2538 "Searches the tags on all objects on"
2539 " the cluster for a given pattern (regex)"),
2542 [ArgChoice(min=1, max=1, choices
=["drain", "undrain", "info"])],
2543 [], "drain|undrain|info", "Change queue properties"),
2546 [ArgChoice(min=1, max=1, choices
=["pause", "continue", "info"]),
2547 ArgSuggest(min=0, max=1, choices
=["30m", "1h", "4h"])],
2549 "{pause <timespec>|continue|info}", "Change watcher properties"),
2551 SetClusterParams
, ARGS_NONE
,
2553 BACKEND_OPT
, CP_SIZE_OPT
, RQL_OPT
, MAX_TRACK_OPT
, INSTALL_IMAGE_OPT
,
2554 INSTANCE_COMMUNICATION_NETWORK_OPT
, ENABLED_HV_OPT
, HVLIST_OPT
,
2555 MAC_PREFIX_OPT
, MASTER_NETDEV_OPT
, MASTER_NETMASK_OPT
, NIC_PARAMS_OPT
,
2556 VG_NAME_OPT
, MAINTAIN_NODE_HEALTH_OPT
, UIDPOOL_OPT
, ADD_UIDS_OPT
,
2557 REMOVE_UIDS_OPT
, DRBD_HELPER_OPT
, DEFAULT_IALLOCATOR_OPT
,
2558 DEFAULT_IALLOCATOR_PARAMS_OPT
, RESERVED_LVS_OPT
, DRY_RUN_OPT
, PRIORITY_OPT
,
2559 PREALLOC_WIPE_DISKS_OPT
, NODE_PARAMS_OPT
, USE_EXTERNAL_MIP_SCRIPT
,
2560 DISK_PARAMS_OPT
, HV_STATE_OPT
, DISK_STATE_OPT
] + SUBMIT_OPTS
+
2561 [ENABLED_DISK_TEMPLATES_OPT
, IPOLICY_STD_SPECS_OPT
, MODIFY_ETCHOSTS_OPT
,
2562 ENABLED_USER_SHUTDOWN_OPT
] +
2563 INSTANCE_POLICY_OPTS
+
2564 [GLOBAL_FILEDIR_OPT
, GLOBAL_SHARED_FILEDIR_OPT
, ZEROING_IMAGE_OPT
,
2565 COMPRESSION_TOOLS_OPT
] +
2566 [ENABLED_DATA_COLLECTORS_OPT
, DATA_COLLECTOR_INTERVAL_OPT
,
2567 DIAGNOSE_DATA_COLLECTOR_FILENAME_OPT
,
2568 MAINT_INTERVAL_OPT
, MAINT_BALANCE_OPT
, MAINT_BALANCE_THRESHOLD_OPT
],
2570 "Alters the parameters of the cluster"),
2572 RenewCrypto
, ARGS_NONE
,
2573 [NEW_CLUSTER_CERT_OPT
, NEW_RAPI_CERT_OPT
, RAPI_CERT_OPT
,
2574 NEW_CONFD_HMAC_KEY_OPT
, FORCE_OPT
,
2575 NEW_CLUSTER_DOMAIN_SECRET_OPT
, CLUSTER_DOMAIN_SECRET_OPT
,
2576 NEW_SPICE_CERT_OPT
, SPICE_CERT_OPT
, SPICE_CACERT_OPT
,
2577 NEW_NODE_CERT_OPT
, NEW_SSH_KEY_OPT
, NOSSH_KEYCHECK_OPT
,
2578 VERBOSE_OPT
, SSH_KEY_BITS_OPT
, SSH_KEY_TYPE_OPT
],
2580 "Renews cluster certificates, keys and secrets"),
2582 Epo
, [ArgUnknown()],
2583 [FORCE_OPT
, ON_OPT
, GROUPS_OPT
, ALL_OPT
, OOB_TIMEOUT_OPT
,
2584 SHUTDOWN_TIMEOUT_OPT
, POWER_DELAY_OPT
],
2586 "Performs an emergency power-off on given args"),
2587 "activate-master-ip": (
2588 ActivateMasterIp
, ARGS_NONE
, [], "", "Activates the master IP"),
2589 "deactivate-master-ip": (
2590 DeactivateMasterIp
, ARGS_NONE
, [CONFIRM_OPT
], "",
2591 "Deactivates the master IP"),
2592 "show-ispecs-cmd": (
2593 ShowCreateCommand
, ARGS_NONE
, [], "",
2594 "Show the command line to re-create the cluster"),
2596 UpgradeGanetiCommand
, ARGS_NONE
, [TO_OPT
, RESUME_OPT
], "",
2597 "Upgrade (or downgrade) to a new Ganeti version"),
2599 RemoveRepair
, [ArgUnknown()], [], "<uuid>",
2600 "Remove a repair event from the list of pending events"),
2604 #: dictionary with aliases for commands
2606 "masterfailover": "master-failover",
2612 return GenericMain(commands
, override
={"tag_type": constants
.TAG_CLUSTER
},