Renew SSH keys and upgrade
[ganeti-github.git] / lib / client / gnt_cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Cluster related commands"""
31
32 # pylint: disable=W0401,W0613,W0614,C0103
33 # W0401: Wildcard import ganeti.cli
34 # W0613: Unused argument, since all functions follow the same API
35 # W0614: Unused import %s from wildcard import (since we need cli)
36 # C0103: Invalid name gnt-cluster
37
38 from cStringIO import StringIO
39 import os
40 import time
41 import OpenSSL
42 import tempfile
43 import itertools
44
45 from ganeti.cli import *
46 from ganeti import bootstrap
47 from ganeti import compat
48 from ganeti import constants
49 from ganeti import errors
50 from ganeti import netutils
51 from ganeti import objects
52 from ganeti import opcodes
53 from ganeti import pathutils
54 from ganeti import qlang
55 from ganeti import serializer
56 from ganeti import ssconf
57 from ganeti import ssh
58 from ganeti import uidpool
59 from ganeti import utils
60 from ganeti.client import base
61
62
# Option to recover from an emergency power-off (EPO)
ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

# Interpret positional arguments as node group names instead of node names
GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

# NOTE: FORCE_FAILOVER and FORCE_DISTRIBUTION both expose "--yes-do-it"
# (dest "yes_do_it") but are attached to different commands with different
# help texts
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                help="Unconditionally distribute the"
                                " configuration, even if the queue"
                                " is drained",
                                default=False, action="store_true")

# Target version for "gnt-cluster upgrade"
TO_OPT = cli_option("--to", default=None, type="string",
                    help="The Ganeti version to upgrade to")

RESUME_OPT = cli_option("--resume", default=False, action="store_true",
                        help="Resume any pending Ganeti upgrades")

DATA_COLLECTOR_INTERVAL_OPT = cli_option(
    "--data-collector-interval", default={}, type="keyval",
    help="Set collection intervals in seconds of data collectors.")


# Timing constants used by the EPO (emergency power-off) machinery
_EPO_PING_INTERVAL = 30  # 30 seconds between pings
_EPO_PING_TIMEOUT = 1  # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60  # 15 minutes
95
96
97 def _InitEnabledDiskTemplates(opts):
98 """Initialize the list of enabled disk templates.
99
100 """
101 if opts.enabled_disk_templates:
102 return opts.enabled_disk_templates.split(",")
103 else:
104 return constants.DEFAULT_ENABLED_DISK_TEMPLATES
105
106
def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  lvm_needed = utils.IsLvmEnabled(enabled_disk_templates)

  if opts.vg_name is None:
    # No --vg-name given at all: use the default vg only if LVM is in use
    return constants.DEFAULT_VG if lvm_needed else None

  vg_name = opts.vg_name
  if not vg_name:
    # Explicitly unset ("--vg-name="): forbidden while LVM templates are on
    if lvm_needed:
      raise errors.OpPrereqError(
          "LVM disk templates are enabled, but vg name not set.")
  elif not lvm_needed:
    # A vg was named although nothing will use it; warn but accept it
    ToStdout("You specified a volume group with --vg-name, but you did not"
             " enable any disk template that uses lvm.")
  return vg_name
127
128
def _InitDrbdHelper(opts, enabled_disk_templates):
  """Initialize the DRBD usermode helper.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates

  """
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates

  if not drbd_enabled:
    # Helper is pointless without DRBD; warn if one was given anyway
    if opts.drbd_helper is not None:
      ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
               " is not enabled.")
    return opts.drbd_helper

  # From here on, DRBD is enabled
  if opts.drbd_helper is None:
    return constants.DEFAULT_DRBD_HELPER
  if opts.drbd_helper == '':
    # The empty string means "unset the helper", incompatible with DRBD
    raise errors.OpPrereqError(
        "Unsetting the drbd usermode helper while enabling DRBD is not"
        " allowed.")
  return opts.drbd_helper
148
149
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError, e:
    ToStderr(str(e))
    return 1

  # Pick a default master network device when none was given; the default
  # depends on the NIC mode (bridged vs. OpenvSwitch)
  master_netdev = opts.master_netdev
  if master_netdev is None:
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict: fill in defaults, then enforce value types
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict: every known hypervisor gets a (possibly empty)
  # parameter dict filled with its defaults
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict, analogous to hvparams above
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
      ispecs_mem_size=opts.ispecs_mem_size,
      ispecs_cpu_count=opts.ispecs_cpu_count,
      ispecs_disk_count=opts.ispecs_disk_count,
      ispecs_disk_size=opts.ispecs_disk_size,
      ispecs_nic_count=opts.ispecs_nic_count,
      minmax_ispecs=opts.ipolicy_bounds_specs,
      std_ispecs=opts.ipolicy_std_specs,
      ipolicy_disk_templates=opts.ipolicy_disk_templates,
      ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
      ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
      fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  if opts.install_image:
    install_image = opts.install_image
  else:
    install_image = ""

  if opts.zeroing_image:
    zeroing_image = opts.zeroing_image
  else:
    zeroing_image = ""

  compression_tools = _GetCompressionTools(opts)

  default_ialloc_params = opts.default_iallocator_params

  if opts.enabled_user_shutdown:
    enabled_user_shutdown = True
  else:
    enabled_user_shutdown = False

  # All inputs normalized; do the actual cluster initialization via RPC
  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        gluster_storage_dir=opts.gluster_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        default_iallocator_params=default_ialloc_params,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        install_image=install_image,
                        zeroing_image=zeroing_image,
                        compression_tools=compression_tools,
                        enabled_user_shutdown=enabled_user_shutdown,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
340
341
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    # Destroying is irreversible; require the explicit safety flag.
    # (Message fixed: "want destroy" -> "want to destroy".)
    ToStderr("Destroying a cluster is irreversible. If you really want"
             " to destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0
364
365
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  new_name = args[0]

  cl = GetClient()
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  # Renaming affects the cluster IP; confirm unless forced
  if not opts.force:
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  result = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                        opts=opts, cl=cl)
  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0
397
398
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
406
407
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  if not opts.confirm:
    # Taking the master IP down kills existing client connections; confirm
    prompt = ("This will disable the master IP. All the open connections to"
              " the master IP will be closed. To reach the master you will"
              " need to use its node IP."
              " Continue?")
    if not AskUser(prompt):
      return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
423
424
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  redist_op = opcodes.OpClusterRedistConf()
  if opts.yes_do_it:
    # --yes-do-it bypasses a drained job queue
    submit = SubmitOpCodeToDrainedQueue
  else:
    submit = lambda op: SubmitOrSend(op, opts)
  submit(redist_op)
  return 0
441
442
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  info = GetClient().QueryClusterInfo()
  # (label, result key) pairs, printed in this exact order
  for label, key in [("Software version", "software_version"),
                     ("Internode protocol", "protocol_version"),
                     ("Configuration format", "config_version"),
                     ("OS api version", "os_api_version"),
                     ("Export interface", "export_version"),
                     ("VCS version", "vcs_version")]:
    ToStdout("%s: %s", label, info[key])
  return 0
462
463
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  ToStdout(bootstrap.GetMaster())
  return 0
477
478
479 def _FormatGroupedParams(paramsdict, roman=False):
480 """Format Grouped parameters (be, nic, disk) by group.
481
482 @type paramsdict: dict of dicts
483 @param paramsdict: {group: {param: value, ...}, ...}
484 @rtype: dict of dicts
485 @return: copy of the input dictionaries with strings as values
486
487 """
488 ret = {}
489 for (item, val) in paramsdict.items():
490 if isinstance(val, dict):
491 ret[item] = _FormatGroupedParams(val, roman=roman)
492 elif roman and isinstance(val, int):
493 ret[item] = compat.TryToRoman(val)
494 else:
495 ret[item] = str(val)
496 return ret
497
498
def _FormatDataCollectors(paramsdict):
  """Format Grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: response of QueryClusterInfo
  @rtype: dict of dicts
  @return: parameter grouped by data collector

  """
  enabled = paramsdict[constants.DATA_COLLECTORS_ENABLED_NAME]
  interval = paramsdict[constants.DATA_COLLECTORS_INTERVAL_NAME]

  # intervals are stored in microseconds; display as seconds
  return dict((name, dict(active=enabled[name],
                          interval="%.3fs" % (interval[name] / 1e6)))
              for name in enabled)
517
518
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  # Only display parameters of hypervisors which are actually enabled
  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  # Nested (label, value) structure rendered by PrintGenericInfo below
  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams,
                                                   opts.roman_integers)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"], opts.roman_integers)),

    ("OS parameters", _FormatGroupedParams(result["osparams"],
                                           opts.roman_integers)),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs running simultaneously",
       compat.TryToRoman(result["max_running_jobs"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs simultaneously tracked by the scheduler",
       compat.TryToRoman(result["max_tracked_jobs"],
                         convert=opts.roman_integers)),
      ("mac prefix", result["mac_prefix"]),
      ("master netdev", result["master_netdev"]),
      ("master netmask", compat.TryToRoman(result["master_netmask"],
                                           opts.roman_integers)),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("gluster storage path", result["gluster_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("default instance allocator parameters",
       result["default_iallocator_params"]),
      ("primary ip version", compat.TryToRoman(result["primary_ip_version"],
                                               opts.roman_integers)),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ("install image", result["install_image"]),
      ("instance communication network",
       result["instance_communication_network"]),
      ("zeroing image", result["zeroing_image"]),
      ("compression tools", result["compression_tools"]),
      ("enabled user shutdown", result["enabled_user_shutdown"]),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True, opts.roman_integers)),
    ("Data collectors", _FormatDataCollectors(result)),
    ]

  PrintGenericInfo(info)
  return 0
636
637
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  filename = os.path.abspath(filename)

  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  qcl = GetClient()
  try:
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                             secondary_ips=opts.use_replication_network,
                             nodegroup=opts.nodegroup)
    # Look up the SSH ports of the nodes actually selected above; using
    # opts.nodes here would produce an empty or mismatched port list
    # whenever the node list was expanded (no nodes given) or filtered
    # (master removed), breaking the zip pairing below
    ports = GetNodesSshPorts(results, qcl)
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name)
  for (node, port) in zip(results, ports):
    if not srun.CopyFileToNode(node, port, filename):
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)

  return 0
675
676
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  qcl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  # Make sure master node is at list end. This must happen *before* the
  # SSH ports are resolved: the previous code looked up the ports first
  # and reordered the node list afterwards, so the zip() below paired
  # nodes with the wrong ports whenever the master was not already last.
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  ports = GetNodesSshPorts(nodes, qcl)

  srun = ssh.SshRunner(cluster_name=cluster_name)

  for (name, port) in zip(nodes, ports):
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
722
723
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  # Count failed jobs (job_success False) and failed verification results
  # (single falsy op result) in one pass; each (job_success, op_results)
  # pair is turned into a (bool, bool) tuple, the tuples are transposed by
  # zip(*...), falsy entries are kept by ifilterfalse, and len() counts them
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
775
776
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    # Each job result is a single (bad_nodes, instances, missing) triple
    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    # Re-activate disks for instances that are not missing volumes
    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      # Report instances with missing logical volumes; ival is a list of
      # (node, volume) pairs for each affected instance
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
848
849
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # Return an explicit exit code like the sibling commands do
  # (previously this implicitly returned None)
  return 0
862
863
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --no-voting is dangerous; without --yes-do-it, ask interactively
  if opts.no_voting and not opts.yes_do_it:
    if not AskUser("This will perform the failover even if most other nodes"
                   " are down, or if this node is outdated. This is dangerous"
                   " as it can lead to a non-consistent cluster. Check the"
                   " gnt-cluster(8) man page before proceeding. Continue?"):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)
888
889
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Any failure whatsoever (connection refused, timeout, ...) means
  # the master is not reachable, hence the broad except
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    return 1
  return 0
906
907
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  matches = SubmitOpCode(opcodes.OpTagsSearch(pattern=args[0]), opts=opts)
  if not matches:
    # No match at all is reported as a failure
    return 1
  for path, tag in sorted(matches):
    ToStdout("%s %s", path, tag)
926
927
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.
  @raise errors.X509CertError: if the file cannot be read or its contents
      cannot be parsed as a certificate (or private key, if requested)

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  try:
    # Parse only to validate; the return value is intentionally discarded
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  if verify_private_key:
    try:
      # The same PEM blob is expected to also contain the private key
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception, err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem
961
962
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force, new_node_cert, new_ssh_keys):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to ask user for confirmation
  @type new_node_cert: bool
  @param new_node_cert: Whether to generate new node certificates
  @type new_ssh_keys: bool
  @param new_ssh_keys: Whether to generate new node SSH keys
  @rtype: int
  @return: 0 on success, 1 on any validation or load error

  """
  # Sanity checks: generating a new item and providing one via file are
  # mutually exclusive options
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  # A user-supplied SPICE certificate requires the matching CA certificate
  # and vice versa (xor catches "exactly one given")
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  # Load and verify any user-supplied certificates before touching the
  # cluster, so invalid input aborts early
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    # Runs with the cluster stopped (see RunWhileClusterStopped below):
    # regenerate the requested secrets on the master, then push the
    # resulting files to all non-master nodes
    ctx.feedback_fn("Updating certificates and keys")
    # Note: the node certificate will be generated in the LU
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    files_to_copy = []

    if new_cluster_cert:
      files_to_copy.append(pathutils.NODED_CERT_FILE)

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        port = ctx.ssh_ports[node_name]
        ctx.feedback_fn("Copying %s to %s:%d" %
                        (", ".join(files_to_copy), node_name, port))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, port, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  # Node certificates and SSH keys are renewed through an opcode once the
  # cluster is running again, not via the stopped-cluster inner function
  if new_node_cert or new_ssh_keys:
    cl = GetClient()
    renew_op = opcodes.OpClusterRenewCrypto(node_certificates=new_node_cert,
                                            ssh_keys=new_ssh_keys)
    SubmitOpCode(renew_op, cl=cl)

  return 0
1095
1096
def _BuildGanetiPubKeys(options, pub_key_file=pathutils.SSH_PUB_KEYS, cl=None,
                        get_online_nodes_fn=GetOnlineNodes,
                        get_nodes_ssh_ports_fn=GetNodesSshPorts,
                        get_node_uuids_fn=GetNodeUUIDs,
                        homedir_fn=None):
  """Recreates the 'ganeti_pub_key' file by polling all nodes.

  @param options: the parsed command line options; only C{ssh_key_check}
    is read here (passed on when fetching keys from remote nodes)
  @type pub_key_file: string
  @param pub_key_file: path of the public key file to rebuild
  @param cl: luxi client to use; a new one is created if C{None}
  @param get_online_nodes_fn: callback returning the online nodes
    (overridable for unit tests)
  @param get_nodes_ssh_ports_fn: callback returning the nodes' SSH ports
    (overridable for unit tests)
  @param get_node_uuids_fn: callback returning the nodes' UUIDs
    (overridable for unit tests)
  @param homedir_fn: function to determine the home directory, passed
    through to L{ssh.GetAllUserFiles} (for unit tests only)

  """
  # Keep a backup of the old file, then start from an empty one
  if os.path.exists(pub_key_file):
    utils.CreateBackup(pub_key_file)
    utils.RemoveFile(pub_key_file)

  ssh.ClearPubKeyFile(pub_key_file)

  if not cl:
    cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  # NOTE(review): the master node is appended to the online node list for
  # the port/uuid maps below — presumably get_online_nodes_fn([]) may not
  # include it; verify against GetOnlineNodes' contract
  online_nodes = get_online_nodes_fn([], cl=cl)
  ssh_ports = get_nodes_ssh_ports_fn(online_nodes + [master_node], cl)
  ssh_port_map = dict(zip(online_nodes + [master_node], ssh_ports))

  node_uuids = get_node_uuids_fn(online_nodes + [master_node], cl)
  node_uuid_map = dict(zip(online_nodes + [master_node], node_uuids))

  nonmaster_nodes = [name for name in online_nodes
                     if name != master_node]

  (_, root_keyfiles) = \
    ssh.GetAllUserFiles(constants.SSH_LOGIN_USER, mkdir=False, dircheck=False,
                        _homedir_fn=homedir_fn)

  # get the key file of the master node: read locally, one entry per key type
  for (_, (_, public_key_file)) in root_keyfiles.items():
    try:
      pub_key = utils.ReadFile(public_key_file)
      ssh.AddPublicKey(node_uuid_map[master_node], pub_key,
                       key_file=pub_key_file)
    except IOError:
      # Not all types of keys might be existing
      pass

  # get the key files of all non-master nodes by fetching them over SSH
  for node in nonmaster_nodes:
    fetched_keys = ssh.ReadRemoteSshPubKeys(root_keyfiles, node, cluster_name,
                                            ssh_port_map[node],
                                            options.ssh_key_check,
                                            options.ssh_key_check)
    for pub_key in fetched_keys.values():
      ssh.AddPublicKey(node_uuid_map[node], pub_key, key_file=pub_key_file)
1149
1150
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # A renewed set of SSH keys requires the public key file to be rebuilt
  # from all nodes first
  if opts.new_ssh_keys:
    _BuildGanetiPubKeys(opts)
  return _RenewCrypto(
    opts.new_cluster_cert, opts.new_rapi_cert, opts.rapi_cert,
    opts.new_spice_cert, opts.spice_cert, opts.spice_cacert,
    opts.new_confd_hmac_key, opts.new_cluster_domain_secret,
    opts.cluster_domain_secret, opts.force, opts.new_node_cert,
    opts.new_ssh_keys)
1169
1170
1171 def _GetEnabledDiskTemplates(opts):
1172 """Determine the list of enabled disk templates.
1173
1174 """
1175 if opts.enabled_disk_templates:
1176 return opts.enabled_disk_templates.split(",")
1177 else:
1178 return None
1179
1180
1181 def _GetVgName(opts, enabled_disk_templates):
1182 """Determine the volume group name.
1183
1184 @type enabled_disk_templates: list of strings
1185 @param enabled_disk_templates: cluster-wide enabled disk-templates
1186
1187 """
1188 # consistency between vg name and enabled disk templates
1189 vg_name = None
1190 if opts.vg_name is not None:
1191 vg_name = opts.vg_name
1192 if enabled_disk_templates:
1193 if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1194 ToStdout("You specified a volume group with --vg-name, but you did not"
1195 " enable any of the following lvm-based disk templates: %s" %
1196 utils.CommaJoin(constants.DTS_LVM))
1197 return vg_name
1198
1199
1200 def _GetDrbdHelper(opts, enabled_disk_templates):
1201 """Determine the DRBD usermode helper.
1202
1203 """
1204 drbd_helper = opts.drbd_helper
1205 if enabled_disk_templates:
1206 drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1207 if not drbd_enabled and opts.drbd_helper:
1208 ToStdout("You specified a DRBD usermode helper with "
1209 " --drbd-usermode-helper while DRBD is not enabled.")
1210 return drbd_helper
1211
1212
1213 def _GetCompressionTools(opts):
1214 """Determine the list of custom compression tools.
1215
1216 """
1217 if opts.compression_tools:
1218 return opts.compression_tools.split(",")
1219 elif opts.compression_tools is None:
1220 return None # To note the parameter was not provided
1221 else:
1222 return constants.IEC_DEFAULT_TOOLS # Resetting to default
1223
1224
1225 def SetClusterParams(opts, args):
1226 """Modify the cluster.
1227
1228 @param opts: the command line options selected by the user
1229 @type args: list
1230 @param args: should be an empty list
1231 @rtype: int
1232 @return: the desired exit code
1233
1234 """
1235 if not (opts.vg_name is not None or
1236 opts.drbd_helper is not None or
1237 opts.enabled_hypervisors or opts.hvparams or
1238 opts.beparams or opts.nicparams or
1239 opts.ndparams or opts.diskparams or
1240 opts.candidate_pool_size is not None or
1241 opts.max_running_jobs is not None or
1242 opts.max_tracked_jobs is not None or
1243 opts.uid_pool is not None or
1244 opts.maintain_node_health is not None or
1245 opts.add_uids is not None or
1246 opts.remove_uids is not None or
1247 opts.default_iallocator is not None or
1248 opts.default_iallocator_params or
1249 opts.reserved_lvs is not None or
1250 opts.mac_prefix is not None or
1251 opts.master_netdev is not None or
1252 opts.master_netmask is not None or
1253 opts.use_external_mip_script is not None or
1254 opts.prealloc_wipe_disks is not None or
1255 opts.hv_state or
1256 opts.enabled_disk_templates or
1257 opts.disk_state or
1258 opts.ipolicy_bounds_specs is not None or
1259 opts.ipolicy_std_specs is not None or
1260 opts.ipolicy_disk_templates is not None or
1261 opts.ipolicy_vcpu_ratio is not None or
1262 opts.ipolicy_spindle_ratio is not None or
1263 opts.modify_etc_hosts is not None or
1264 opts.file_storage_dir is not None or
1265 opts.install_image is not None or
1266 opts.instance_communication_network is not None or
1267 opts.zeroing_image is not None or
1268 opts.shared_file_storage_dir is not None or
1269 opts.compression_tools is not None or
1270 opts.shared_file_storage_dir is not None or
1271 opts.enabled_user_shutdown is not None or
1272 opts.data_collector_interval or
1273 opts.enabled_data_collectors):
1274 ToStderr("Please give at least one of the parameters.")
1275 return 1
1276
1277 enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1278 vg_name = _GetVgName(opts, enabled_disk_templates)
1279
1280 try:
1281 drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1282 except errors.OpPrereqError, e:
1283 ToStderr(str(e))
1284 return 1
1285
1286 hvlist = opts.enabled_hypervisors
1287 if hvlist is not None:
1288 hvlist = hvlist.split(",")
1289
1290 # a list of (name, dict) we can pass directly to dict() (or [])
1291 hvparams = dict(opts.hvparams)
1292 for hv_params in hvparams.values():
1293 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1294
1295 diskparams = dict(opts.diskparams)
1296
1297 for dt_params in diskparams.values():
1298 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1299
1300 beparams = opts.beparams
1301 utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1302
1303 nicparams = opts.nicparams
1304 utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1305
1306 ndparams = opts.ndparams
1307 if ndparams is not None:
1308 utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1309
1310 ipolicy = CreateIPolicyFromOpts(
1311 minmax_ispecs=opts.ipolicy_bounds_specs,
1312 std_ispecs=opts.ipolicy_std_specs,
1313 ipolicy_disk_templates=opts.ipolicy_disk_templates,
1314 ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1315 ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1316 )
1317
1318 mnh = opts.maintain_node_health
1319
1320 uid_pool = opts.uid_pool
1321 if uid_pool is not None:
1322 uid_pool = uidpool.ParseUidPool(uid_pool)
1323
1324 add_uids = opts.add_uids
1325 if add_uids is not None:
1326 add_uids = uidpool.ParseUidPool(add_uids)
1327
1328 remove_uids = opts.remove_uids
1329 if remove_uids is not None:
1330 remove_uids = uidpool.ParseUidPool(remove_uids)
1331
1332 if opts.reserved_lvs is not None:
1333 if opts.reserved_lvs == "":
1334 opts.reserved_lvs = []
1335 else:
1336 opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1337
1338 if opts.master_netmask is not None:
1339 try:
1340 opts.master_netmask = int(opts.master_netmask)
1341 except ValueError:
1342 ToStderr("The --master-netmask option expects an int parameter.")
1343 return 1
1344
1345 ext_ip_script = opts.use_external_mip_script
1346
1347 if opts.disk_state:
1348 disk_state = utils.FlatToDict(opts.disk_state)
1349 else:
1350 disk_state = {}
1351
1352 hv_state = dict(opts.hv_state)
1353
1354 compression_tools = _GetCompressionTools(opts)
1355
1356 enabled_data_collectors = dict(
1357 (k, v.lower().startswith("t"))
1358 for k, v in opts.enabled_data_collectors.items())
1359
1360 unrecognized_data_collectors = [
1361 k for k in enabled_data_collectors.keys()
1362 if k not in constants.DATA_COLLECTOR_NAMES]
1363 if unrecognized_data_collectors:
1364 ToStderr("Data collector names not recognized: %s" %
1365 ", ".join(unrecognized_data_collectors))
1366
1367 try:
1368 data_collector_interval = dict(
1369 (k, long(1e6 * float(v)))
1370 for (k, v) in opts.data_collector_interval.items())
1371 except ValueError:
1372 ToStderr("Can't transform all values to integers: {}".format(
1373 opts.data_collector_interval))
1374 return 1
1375 if any(v <= 0 for v in data_collector_interval):
1376 ToStderr("Some interval times where not above zero.")
1377 return 1
1378
1379 op = opcodes.OpClusterSetParams(
1380 vg_name=vg_name,
1381 drbd_helper=drbd_helper,
1382 enabled_hypervisors=hvlist,
1383 hvparams=hvparams,
1384 os_hvp=None,
1385 beparams=beparams,
1386 nicparams=nicparams,
1387 ndparams=ndparams,
1388 diskparams=diskparams,
1389 ipolicy=ipolicy,
1390 candidate_pool_size=opts.candidate_pool_size,
1391 max_running_jobs=opts.max_running_jobs,
1392 max_tracked_jobs=opts.max_tracked_jobs,
1393 maintain_node_health=mnh,
1394 modify_etc_hosts=opts.modify_etc_hosts,
1395 uid_pool=uid_pool,
1396 add_uids=add_uids,
1397 remove_uids=remove_uids,
1398 default_iallocator=opts.default_iallocator,
1399 default_iallocator_params=opts.default_iallocator_params,
1400 prealloc_wipe_disks=opts.prealloc_wipe_disks,
1401 mac_prefix=opts.mac_prefix,
1402 master_netdev=opts.master_netdev,
1403 master_netmask=opts.master_netmask,
1404 reserved_lvs=opts.reserved_lvs,
1405 use_external_mip_script=ext_ip_script,
1406 hv_state=hv_state,
1407 disk_state=disk_state,
1408 enabled_disk_templates=enabled_disk_templates,
1409 force=opts.force,
1410 file_storage_dir=opts.file_storage_dir,
1411 install_image=opts.install_image,
1412 instance_communication_network=opts.instance_communication_network,
1413 zeroing_image=opts.zeroing_image,
1414 shared_file_storage_dir=opts.shared_file_storage_dir,
1415 compression_tools=compression_tools,
1416 enabled_user_shutdown=opts.enabled_user_shutdown,
1417 enabled_data_collectors=enabled_data_collectors,
1418 data_collector_interval=data_collector_interval,
1419 )
1420 return base.GetResult(None, opts, SubmitOrSend(op, opts))
1421
1422
def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()
  if command == "drain":
    client.SetQueueDrainFlag(True)
  elif command == "undrain":
    client.SetQueueDrainFlag(False)
  elif command == "info":
    (drained, ) = client.QueryConfigValues(["drain_flag"])
    ToStdout("The drain flag is %s" % ("set" if drained else "unset"))
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
1450
1451
def _ShowWatcherPause(until):
  """Print the current watcher pause state.

  @param until: UNIX timestamp until which the watcher is paused,
    or C{None} when it is not paused

  """
  # A pause timestamp in the past counts as "not paused"
  if until is not None and until >= time.time():
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1457
1458
def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")

  elif command == "pause":
    # The pause duration is a mandatory second argument
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    pause_until = time.time() + ParseTimespec(args[1])
    _ShowWatcherPause(client.SetWatcherPause(pause_until))

  elif command == "info":
    (pause_end, ) = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(pause_end)

  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
1492
1493
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  oob_command = constants.OOB_POWER_ON if power else constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=oob_command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)

  errs = 0
  # Each result entry is ((idx, node_name), (status, payload))
  for ((_, node_name), (data_status, _)) in result:
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      errs += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  return errs == 0
1529
1530
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    (text_submit, text_success, text_failed) = \
      ("startup", "started", "starting")
  else:
    # Shutdown opcodes carry the timeout and no_remember flags
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    (text_submit, text_success, text_failed) = \
      ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)
  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    jex.QueueJob(inst, opcls(instance_name=inst))

  results = jex.GetResults()
  bad_cnt = sum(1 for (success, _) in results if not success)

  if bad_cnt:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  ToStdout("All instances have been %s successfully", text_success)
  return True
1569
1570
class _RunWhenNodesReachableHelper(object):
  """Helper class to make shared internal state sharing easier.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    # Record a failed callback run, but keep retrying while nodes are down
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()

    return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    started_at = time.time()
    for candidate in self.down:
      reachable = self._ping_fn(self.node2ip[candidate], self.port,
                                timeout=_EPO_PING_TIMEOUT,
                                live_port_needed=True)
      if reachable:
        self.feedback_fn("Node %s became available" % candidate)
        self.up.add(candidate)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    self._sleep_fn(max(0.0, started_at + secs - time.time()))
1633
1634
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  client = GetClient()
  # Resolve node addresses using the cluster's primary IP family
  if client.QueryClusterInfo()["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = {}
  for node in node_list:
    node2ip[node] = netutils.GetHostname(node, family=family).ip

  helper = _RunWhenNodesReachableHelper(
    node_list, action_cb, node2ip, netutils.GetDaemonPort(constants.NODED),
    ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n - %s", " - ".join(helper.down))
    return False
1664
1665
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  # Collect the instances whose nodes are all back online
  startable = [inst for (inst, nodes) in inst_map.items()
               if not (nodes - nodes_online)]

  # Drop them from the pending map so each is only started once
  for inst in startable:
    del inst_map[inst]

  if not startable:
    return True

  return _instance_start_fn(opts, startable, True)
1690
1691
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
    OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # BUGFIX: this power-on path previously called _OobPower(..., False),
  # which requests a power *off* per _OobPower's contract; power=True is
  # what "epo on" needs
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up; pass a copy of inst_map, as
  # _MaybeInstanceStartup removes instances from the map as it starts them
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1716
1717
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Shut all instances down first, without remembering the state
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  # Without OOB-capable nodes there is nothing left to power off
  if not node_list:
    return constants.EXIT_SUCCESS

  if not _OobPower(opts, node_list, False):
    return constants.EXIT_FAILURE
  return constants.EXIT_SUCCESS
1738
1739
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @param qcl: query client to use; a new one is created if C{None}
  @param _on_fn: power-on implementation (for unittest use only)
  @param _off_fn: power-off implementation (for unittest use only)
  @param _confirm_fn: confirmation prompt (for unittest use only)
  @param _stdout_fn: stdout feedback function (for unittest use only)
  @param _stderr_fn: stderr feedback function (for unittest use only)
  @rtype: int
  @return: the desired exit code

  """
  # --groups and --all are mutually exclusive; --all also excludes
  # explicit node/group arguments
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  # With --groups, arguments name node groups; expand them to node lists
  if opts.groups:
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      # Build inst -> set(nodes) map; instances touching the master node
      # get an empty set so they are never auto-started/stopped
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      # No out-of-band support; the node can only be handled manually
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)
1811
1812
def _GetCreateCommand(info):
  """Build a 'gnt-cluster init' command line from cluster info.

  @type info: dict
  @param info: cluster information; only the "ipolicy" and "name" keys
    are used here

  @rtype: string

  """
  cmd_buf = StringIO()
  cmd_buf.write("gnt-cluster init")
  # Append the instance-policy options to the command line
  PrintIPolicyCommand(cmd_buf, info["ipolicy"], False)
  cmd_buf.write(" ")
  cmd_buf.write(info["name"])
  return cmd_buf.getvalue()
1820
1821
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cluster_info = GetClient().QueryClusterInfo()
  ToStdout(_GetCreateCommand(cluster_info))
1831
1832
def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if not result.failed:
    return True

  ToStderr("Command %s failed: %s; Output %s" %
           (cmd, result.fail_reason, result.output))
  return False
1848
1849
def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  shell_cmd = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  # Read cluster facts directly from ssconf so this works without daemons
  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  master_node = ssconf.SimpleStore().GetMasterNode()
  cluster_name = ssconf.SimpleStore().GetClusterName()

  # If master node is in 'nodes', make sure master node is at list end,
  # so the command runs everywhere else before possibly affecting us
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  runner = ssh.SshRunner(cluster_name=cluster_name)
  failed = []
  for name in nodes:
    if runner.Run(name, constants.SSH_LOGIN_USER, shell_cmd).exit_code != 0:
      failed.append(name)

  return failed
1884
1885
def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing, if this is the case, otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  # A version is "installed" when its package directory exists on the node
  version_dir = os.path.join(pathutils.PKGLIBDIR, versionstring)
  badnodes = _VerifyCommand(["test", "-d", version_dir])
  if not badnodes:
    return True

  ToStderr("Ganeti version %s not installed on nodes %s"
           % (versionstring, ", ".join(badnodes)))
  return False
1906
1907
def _GetRunning():
  """Determine the number of running jobs.

  @rtype: int
  @return: the number of jobs still running

  """
  qfilter = qlang.MakeSimpleFilter(
    "status", frozenset([constants.JOB_STATUS_RUNNING]))
  return len(GetClient().Query(constants.QR_JOB, [], qfilter).data)
1919
1920
def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  lib_link = os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")
  share_link = os.path.join(pathutils.SYSCONFDIR, "ganeti/share")
  lib_target = os.path.join(pathutils.PKGLIBDIR, versionstring)
  share_target = os.path.join(pathutils.SHAREDIR, versionstring)

  failed = []
  if constants.HAS_GNU_LN:
    # GNU ln's -T flag lets us replace the symlink atomically in one step
    failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", lib_target,
                                  lib_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", "-T", share_target,
                                  share_link]))
  else:
    # Without GNU ln the old links must be removed before re-creating them
    failed.extend(_VerifyCommand(["rm", "-f", lib_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", lib_target, lib_link]))
    failed.extend(_VerifyCommand(["rm", "-f", share_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", share_target,
                                  share_link]))
  return list(set(failed))
1951
1952
1953 def _ExecuteCommands(fns):
1954 """Execute a list of functions, in reverse order.
1955
1956 @type fns: list of functions.
1957 @param fns: the functions to be executed.
1958
1959 """
1960 for fn in reversed(fns):
1961 fn()
1962
1963
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  if "version" not in config_data:
    return None
  return utils.SplitVersion(config_data["version"])
1978
1979
def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  intent_file = pathutils.INTENT_TO_UPGRADE
  if not os.path.isfile(intent_file):
    return (None, None)

  parts = utils.UnescapeAndSplit(utils.ReadFile(intent_file))
  if len(parts) != 3:
    # file syntactically mal-formed
    return (None, None)
  # third field is the writer's PID; not needed by callers
  (old_version, target_version, _) = parts
  return (old_version, target_version)
1997
1998
def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intent to upgrade to

  """
  # Record the current version, the target version and our PID, escaped
  # and joined so they can be split again by _ReadIntentToUpgrade.
  content = utils.EscapeAndJoin([constants.RELEASE_VERSION, version,
                                 "%d" % os.getpid()])
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE, data=content)
2009
2010
def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  Each step that needs undoing on failure appends a rollback callable;
  the caller runs them (in reverse order) if the upgrade is aborted.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  # Record the intent first, so an interrupted upgrade can be resumed later
  _WriteIntentToUpgrade(versionstring)
  rollback.append(
    lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  # Poll _GetRunning until no jobs are running any more (expected value 0);
  # a non-zero final result means the queue never emptied within the timeout
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdout("Pausing the watcher for one hour.")
  rollback.append(lambda: GetClient().SetWatcherPause(None))
  GetClient().SetWatcherPause(time.time() + 60 * 60)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  # Re-check the installation after stopping the daemons; on failure,
  # restart them best-effort before bailing out
  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.BACKUP_DIR, "ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["mkdir", "-p", pathutils.BACKUP_DIR]):
    return (False, rollback)

  # Create the archive in a safe manner, as it contains sensitive
  # information.
  (_, tmp_name) = tempfile.mkstemp(prefix=backuptar, dir=pathutils.BACKUP_DIR)
  if not _RunCommandAndReport(["tar", "-cf", tmp_name,
                               "--exclude=queue/archive",
                               pathutils.DATA_DIR]):
    return (False, rollback)

  # Atomically move the finished archive into place
  os.rename(tmp_name, backuptar)
  return (True, rollback)
2077
2078
def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @return: True upon success
  """
  # no version-specific tasks needed for the current version
  ToStdout("Performing version-specific downgrade tasks.")
  return True
2093
2094
def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  For a downgrade, the configuration is converted before the binaries
  are switched; for an upgrade it is the other way around.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the downgrade
    # process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)
2143
2144
def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  returnvalue = 0

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  # Best effort: the intent file may already be gone; don't fail on it.
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  # FIX: the message used to read "Unpasuing the watcher."
  ToStdout("Unpausing the watcher.")
  if not _RunCommandAndReport(["gnt-cluster", "watcher", "continue"]):
    returnvalue = 1

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue
2207
2208
def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --to and --resume are mutually exclusive; exactly one must be given
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  # If we're not told to resume, verify there is no upgrade
  # in progress.
  if not opts.resume:
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is not None:
      # An upgrade is going on; verify whether the target matches
      if versionstring == opts.to:
        ToStderr("An upgrade is already in progress. Target version matches,"
                 " resuming.")
        opts.resume = True
        opts.to = None
      else:
        ToStderr("An upgrade from %s to %s is in progress; use --resume to"
                 " finish it first" % (oldversion, versionstring))
        return 1

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    # Resuming is only meaningful on the master node
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      # no intent file left: nothing to resume
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  # Refuse upgrades outside the supported range
  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback =  \
    _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)
2301
2302
#: dictionary mapping each gnt-cluster sub-command name to a tuple of
#: (handler, argument spec, option list, usage string, short description),
#: consumed by L{GenericMain}
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT, INSTALL_IMAGE_OPT,
     ZEROING_IMAGE_OPT, COMPRESSION_TOOLS_OPT,
     ENABLED_USER_SHUTDOWN_OPT,
    ]
    + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, MAX_TRACK_OPT, INSTALL_IMAGE_OPT,
     INSTANCE_COMMUNICATION_NETWORK_OPT, ENABLED_HV_OPT, HVLIST_OPT,
     MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT, NIC_PARAMS_OPT,
     VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT,
     REMOVE_UIDS_OPT, DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT,
     DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT,
     PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + SUBMIT_OPTS +
    [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT,
     ENABLED_USER_SHUTDOWN_OPT] +
    INSTANCE_POLICY_OPTS +
    [GLOBAL_FILEDIR_OPT, GLOBAL_SHARED_FILEDIR_OPT, ZEROING_IMAGE_OPT,
     COMPRESSION_TOOLS_OPT] +
    [ENABLED_DATA_COLLECTORS_OPT, DATA_COLLECTOR_INTERVAL_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT, NEW_SSH_KEY_OPT, NOSSH_KEYCHECK_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }
2435
2436
#: dictionary with aliases for commands; maps each alternate spelling
#: accepted on the command line to its canonical name in L{commands}
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}
2442
2443
def Main():
  """Entry point of the gnt-cluster tool.

  Dispatches to the handlers in L{commands} via L{GenericMain}, tagging
  operations with the cluster tag type.

  @return: the return value of L{GenericMain}

  """
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)