e46c136b96f9be340fb3f368c9230b84819a44aa
[ganeti-github.git] / lib / client / gnt_cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Cluster related commands"""
31
32 # pylint: disable=W0401,W0613,W0614,C0103
33 # W0401: Wildcard import ganeti.cli
34 # W0613: Unused argument, since all functions follow the same API
35 # W0614: Unused import %s from wildcard import (since we need cli)
36 # C0103: Invalid name gnt-cluster
37
38 from cStringIO import StringIO
39 import os
40 import time
41 import OpenSSL
42 import tempfile
43 import itertools
44
45 from ganeti.cli import *
46 from ganeti import bootstrap
47 from ganeti import compat
48 from ganeti import constants
49 from ganeti import config
50 from ganeti import errors
51 from ganeti import netutils
52 from ganeti import objects
53 from ganeti import opcodes
54 from ganeti import pathutils
55 from ganeti import qlang
56 from ganeti import serializer
57 from ganeti import ssconf
58 from ganeti import ssh
59 from ganeti import uidpool
60 from ganeti import utils
61 from ganeti.client import base
62
63
# Flag for "gnt-cluster epo": power back on (recover) instead of powering off.
ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

# Flag for "gnt-cluster epo": treat positional arguments as node group names
# rather than node names.
GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

# Used by master-failover to skip the interactive confirmation that normally
# guards --no-voting.
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

# Used by redist-conf to force distribution even when the job queue is
# drained; note it shares the "--yes-do-it"/"yes_do_it" flag name with
# FORCE_FAILOVER (the two options are used by different commands).
FORCE_DISTRIBUTION = cli_option("--yes-do-it", dest="yes_do_it",
                                help="Unconditionally distribute the"
                                " configuration, even if the queue"
                                " is drained",
                                default=False, action="store_true")

# Target version for "gnt-cluster upgrade".
TO_OPT = cli_option("--to", default=None, type="string",
                    help="The Ganeti version to upgrade to")

# Resume a previously interrupted upgrade instead of starting a new one.
RESUME_OPT = cli_option("--resume", default=False, action="store_true",
                        help="Resume any pending Ganeti upgrades")

_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
91
92
93 def _InitEnabledDiskTemplates(opts):
94 """Initialize the list of enabled disk templates.
95
96 """
97 if opts.enabled_disk_templates:
98 return opts.enabled_disk_templates.split(",")
99 else:
100 return constants.DEFAULT_ENABLED_DISK_TEMPLATES
101
102
def _InitVgName(opts, enabled_disk_templates):
  """Initialize the volume group name.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @rtype: string or None
  @return: the volume group name to use, or None when no lvm-based
    template is enabled and no name was given

  """
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
    if vg_name:
      if not utils.IsLvmEnabled(enabled_disk_templates):
        # A volume group without any lvm-based disk template is pointless,
        # but not fatal: warn and keep the user-provided name.
        ToStdout("You specified a volume group with --vg-name, but you did not"
                 " enable any disk template that uses lvm.")
    elif utils.IsLvmEnabled(enabled_disk_templates):
      # --vg-name="" (explicit unset) conflicts with enabled lvm templates
      raise errors.OpPrereqError(
          "LVM disk templates are enabled, but vg name not set.")
  elif utils.IsLvmEnabled(enabled_disk_templates):
    # No --vg-name given at all: fall back to the default volume group
    vg_name = constants.DEFAULT_VG
  return vg_name
123
124
def _InitDrbdHelper(opts, enabled_disk_templates):
  """Determine the DRBD usermode helper for cluster initialization.

  @type enabled_disk_templates: list of strings
  @param enabled_disk_templates: cluster-wide enabled disk templates
  @rtype: string or None
  @return: the helper to use (default helper if DRBD is enabled and none
    was given), or the user-provided value otherwise

  """
  helper = opts.drbd_helper
  drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates

  if not drbd_enabled:
    if helper is not None:
      # Harmless, but probably not what the user intended
      ToStdout("Note: You specified a DRBD usermode helper, while DRBD storage"
               " is not enabled.")
    return helper

  # DRBD is enabled from here on
  if helper is None:
    return constants.DEFAULT_DRBD_HELPER
  if helper == '':
    raise errors.OpPrereqError(
        "Unsetting the drbd usermode helper while enabling DRBD is not"
        " allowed.")
  return helper
144
145
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  Gathers and normalizes all command line options into the parameter
  dictionaries expected by L{bootstrap.InitCluster}, performs the
  bootstrap, and finally submits an OpClusterPostInit opcode.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  enabled_disk_templates = _InitEnabledDiskTemplates(opts)

  try:
    vg_name = _InitVgName(opts, enabled_disk_templates)
    drbd_helper = _InitDrbdHelper(opts, enabled_disk_templates)
  except errors.OpPrereqError, e:
    # Option combination errors are reported to the user, not raised
    ToStderr(str(e))
    return 1

  # Pick a master network device matching the NIC mode if none was given
  master_netdev = opts.master_netdev
  if master_netdev is None:
    nic_mode = opts.nicparams.get(constants.NIC_MODE, None)
    if not nic_mode:
      # default case, use bridging
      master_netdev = constants.DEFAULT_BRIDGE
    elif nic_mode == constants.NIC_MODE_OVS:
      # default ovs is different from default bridge
      master_netdev = constants.DEFAULT_OVS
      opts.nicparams[constants.NIC_LINK] = constants.DEFAULT_OVS

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict: fill defaults for every known hypervisor type
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict: fill defaults for every known template
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    # Parse the textual range spec into the internal representation
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  if opts.install_image:
    install_image = opts.install_image
  else:
    install_image = ""

  if opts.zeroing_image:
    zeroing_image = opts.zeroing_image
  else:
    zeroing_image = ""

  compression_tools = _GetCompressionTools(opts)

  default_ialloc_params = opts.default_iallocator_params

  if opts.enabled_user_shutdown:
    enabled_user_shutdown = True
  else:
    enabled_user_shutdown = False

  # All parameters are normalized; perform the actual cluster bootstrap
  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        gluster_storage_dir=opts.gluster_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        default_iallocator_params=default_ialloc_params,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        install_image=install_image,
                        zeroing_image=zeroing_image,
                        compression_tools=compression_tools,
                        enabled_user_shutdown=enabled_user_shutdown,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
336
337
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Refuse to do anything without the explicit safety flag
  if not opts.yes_do_it:
    ToStderr("Destroying a cluster is irreversible. If you really want"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  master_uuid = SubmitOpCode(opcodes.OpClusterDestroy(), opts=opts)
  # The opcode succeeded, so it is now safe to shut down all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0
360
361
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  new_name = args[0]

  cl = GetClient()
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  if not opts.force:
    # Renaming can cut off remote access; ask for confirmation first
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  result = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                        opts=opts, cl=cl)
  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0
393
394
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  @rtype: int
  @return: the desired exit code

  """
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
402
403
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  @rtype: int
  @return: the desired exit code

  """
  if not opts.confirm:
    # Disabling the master IP drops existing connections; confirm first
    prompt = ("This will disable the master IP. All the open connections to"
              " the master IP will be closed. To reach the master you will"
              " need to use its node IP."
              " Continue?")
    if not AskUser(prompt):
      return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
419
420
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  redist_op = opcodes.OpClusterRedistConf()
  if opts.yes_do_it:
    # --yes-do-it given: distribute even while the job queue is drained
    SubmitOpCodeToDrainedQueue(redist_op)
  else:
    SubmitOrSend(redist_op, opts)
  return 0
437
438
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  info = GetClient().QueryClusterInfo()
  # (label, result key) pairs, printed in this order
  for label, key in [("Software version", "software_version"),
                     ("Internode protocol", "protocol_version"),
                     ("Configuration format", "config_version"),
                     ("OS api version", "os_api_version"),
                     ("Export interface", "export_version"),
                     ("VCS version", "vcs_version")]:
    ToStdout("%s: %s", label, info[key])
  return 0
458
459
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  ToStdout(bootstrap.GetMaster())
  return 0
473
474
475 def _FormatGroupedParams(paramsdict, roman=False):
476 """Format Grouped parameters (be, nic, disk) by group.
477
478 @type paramsdict: dict of dicts
479 @param paramsdict: {group: {param: value, ...}, ...}
480 @rtype: dict of dicts
481 @return: copy of the input dictionaries with strings as values
482
483 """
484 ret = {}
485 for (item, val) in paramsdict.items():
486 if isinstance(val, dict):
487 ret[item] = _FormatGroupedParams(val, roman=roman)
488 elif roman and isinstance(val, int):
489 ret[item] = compat.TryToRoman(val)
490 else:
491 ret[item] = str(val)
492 return ret
493
494
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  # Only show hypervisor parameters for hypervisors that are enabled
  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  # Nested (label, value) structure consumed by PrintGenericInfo below
  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams,
                                                   opts.roman_integers)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"], opts.roman_integers)),

    ("OS parameters", _FormatGroupedParams(result["osparams"],
                                           opts.roman_integers)),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs running simultaneously",
       compat.TryToRoman(result["max_running_jobs"],
                         convert=opts.roman_integers)),
      ("maximal number of jobs simultaneously tracked by the scheduler",
       compat.TryToRoman(result["max_tracked_jobs"],
                         convert=opts.roman_integers)),
      ("mac prefix", result["mac_prefix"]),
      ("master netdev", result["master_netdev"]),
      ("master netmask", compat.TryToRoman(result["master_netmask"],
                                           opts.roman_integers)),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("gluster storage path", result["gluster_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("default instance allocator parameters",
       result["default_iallocator_params"]),
      ("primary ip version", compat.TryToRoman(result["primary_ip_version"],
                                               opts.roman_integers)),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ("install image", result["install_image"]),
      ("instance communication network",
       result["instance_communication_network"]),
      ("zeroing image", result["zeroing_image"]),
      ("compression tools", result["compression_tools"]),
      ("enabled user shutdown", result["enabled_user_shutdown"]),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True, opts.roman_integers)),
    ]

  PrintGenericInfo(info)
  return 0
611
612
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  filename = os.path.abspath(filename)

  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  qcl = GetClient()
  try:
    cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

    results = GetOnlineNodes(nodes=opts.nodes, cl=qcl, filter_master=True,
                             secondary_ips=opts.use_replication_network,
                             nodegroup=opts.nodegroup)
    # Look up the SSH ports for exactly the nodes we will copy to.  Using
    # opts.nodes here would return ports for a different node list (an empty
    # opts.nodes means "all nodes", while 'results' excludes the master and
    # offline nodes), misaligning the (node, port) pairs in the zip below.
    ports = GetNodesSshPorts(results, qcl)
  finally:
    cl.Close()
    qcl.Close()

  srun = ssh.SshRunner(cluster_name)
  for (node, port) in zip(results, ports):
    if not srun.CopyFileToNode(node, port, filename):
      ToStderr("Copy of file %s to node %s:%d failed", filename, node, port)

  return 0
650
651
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  qcl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=qcl, nodegroup=opts.nodegroup)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  # Make sure master node is at list end.  This must happen *before* the SSH
  # ports are looked up: the ports are returned in the order of the node list
  # and zipped with it below, so reordering only the node list afterwards
  # would pair nodes with the wrong ports.
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  ports = GetNodesSshPorts(nodes, qcl)

  srun = ssh.SshRunner(cluster_name=cluster_name)

  for (name, port) in zip(nodes, ports):
    result = srun.Run(name, constants.SSH_LOGIN_USER, command, port=port)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
697
698
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  # OpClusterVerify returns one child job per node group; wait for them all
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  # Each entry of 'results' is (job_success, op_results); the pipeline below
  # transposes these into two sequences, filters out the truthy ("good")
  # entries of each, and counts what is left: failed jobs and failed
  # verification results respectively.
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
750
751
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  # NOTE: 'result' is rebound here to each per-job result tuple
  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    # Each successful job carries exactly one (bad_nodes, instances,
    # missing) triple
    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      # Only the tail of the error text is shown, to keep output readable
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    # Try to (re-)activate disks for instances that are not missing volumes
    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        # Volumes that only "miss" on broken nodes cannot be judged
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
823
824
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # Return an explicit exit code, as documented and as the other cluster
  # commands do (previously the function implicitly returned None)
  return 0
837
838
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.no_voting and not opts.yes_do_it:
    # --no-voting is dangerous; require interactive confirmation unless
    # --yes-do-it was also given
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  # Fix misspelled local name ("rvlaue"); behavior is unchanged
  rcode, msgs = bootstrap.MasterFailover(no_voting=opts.no_voting)
  for msg in msgs:
    ToStderr(msg)
  return rcode
866
867
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    # Any failure at all (connection refused, RPC error, ...) means the
    # master is not reachable
    return 1
  return 0
884
885
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    # No matches found: report failure, as the documented int return implies
    return 1
  for path, tag in sorted(result):
    ToStdout("%s %s", path, tag)
  # Explicit success exit code (previously the function returned None here)
  return 0
904
905
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.
  @raise errors.X509CertError: if the file cannot be read, or if the
      certificate (or, when requested, the private key) cannot be loaded

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  # Parsing the PEM data is itself the verification step: OpenSSL raises if
  # the data is not a well-formed certificate
  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  if verify_private_key:
    # The private key is expected to be in the same PEM blob
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception, err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem
939
940
941 def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
942 rapi_cert_filename, new_spice_cert, spice_cert_filename,
943 spice_cacert_filename, new_confd_hmac_key, new_cds,
944 cds_filename, force, new_node_cert):
945 """Renews cluster certificates, keys and secrets.
946
947 @type new_cluster_cert: bool
948 @param new_cluster_cert: Whether to generate a new cluster certificate
949 @type new_rapi_cert: bool
950 @param new_rapi_cert: Whether to generate a new RAPI certificate
951 @type rapi_cert_filename: string
952 @param rapi_cert_filename: Path to file containing new RAPI certificate
953 @type new_spice_cert: bool
954 @param new_spice_cert: Whether to generate a new SPICE certificate
955 @type spice_cert_filename: string
956 @param spice_cert_filename: Path to file containing new SPICE certificate
957 @type spice_cacert_filename: string
958 @param spice_cacert_filename: Path to file containing the certificate of the
959 CA that signed the SPICE certificate
960 @type new_confd_hmac_key: bool
961 @param new_confd_hmac_key: Whether to generate a new HMAC key
962 @type new_cds: bool
963 @param new_cds: Whether to generate a new cluster domain secret
964 @type cds_filename: string
965 @param cds_filename: Path to file containing new cluster domain secret
966 @type force: bool
967 @param force: Whether to ask user for confirmation
968 @type new_node_cert: string
969 @param new_node_cert: Whether to generate new node certificates
970
971 """
972 if new_rapi_cert and rapi_cert_filename:
973 ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
974 " options can be specified at the same time.")
975 return 1
976
977 if new_cds and cds_filename:
978 ToStderr("Only one of the --new-cluster-domain-secret and"
979 " --cluster-domain-secret options can be specified at"
980 " the same time.")
981 return 1
982
983 if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
984 ToStderr("When using --new-spice-certificate, the --spice-certificate"
985 " and --spice-ca-certificate must not be used.")
986 return 1
987
988 if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
989 ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
990 " specified.")
991 return 1
992
993 rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
994 try:
995 if rapi_cert_filename:
996 rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
997 if spice_cert_filename:
998 spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
999 spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
1000 except errors.X509CertError, err:
1001 ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
1002 return 1
1003
1004 if cds_filename:
1005 try:
1006 cds = utils.ReadFile(cds_filename)
1007 except Exception, err: # pylint: disable=W0703
1008 ToStderr("Can't load new cluster domain secret from %s: %s" %
1009 (cds_filename, str(err)))
1010 return 1
1011 else:
1012 cds = None
1013
1014 if not force:
1015 usertext = ("This requires all daemons on all nodes to be restarted and"
1016 " may take some time. Continue?")
1017 if not AskUser(usertext):
1018 return 1
1019
1020 def _RenewCryptoInner(ctx):
1021 ctx.feedback_fn("Updating certificates and keys")
1022
1023 bootstrap.GenerateClusterCrypto(False,
1024 new_rapi_cert,
1025 new_spice_cert,
1026 new_confd_hmac_key,
1027 new_cds,
1028 False,
1029 None,
1030 rapi_cert_pem=rapi_cert_pem,
1031 spice_cert_pem=spice_cert_pem,
1032 spice_cacert_pem=spice_cacert_pem,
1033 cds=cds)
1034
1035 files_to_copy = []
1036
1037 if new_cluster_cert:
1038 files_to_copy.append(pathutils.NODED_CERT_FILE)
1039
1040 if new_rapi_cert or rapi_cert_pem:
1041 files_to_copy.append(pathutils.RAPI_CERT_FILE)
1042
1043 if new_spice_cert or spice_cert_pem:
1044 files_to_copy.append(pathutils.SPICE_CERT_FILE)
1045 files_to_copy.append(pathutils.SPICE_CACERT_FILE)
1046
1047 if new_confd_hmac_key:
1048 files_to_copy.append(pathutils.CONFD_HMAC_KEY)
1049
1050 if new_cds or cds:
1051 files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
1052
1053 if files_to_copy:
1054 for node_name in ctx.nonmaster_nodes:
1055 port = ctx.ssh_ports[node_name]
1056 ctx.feedback_fn("Copying %s to %s:%d" %
1057 (", ".join(files_to_copy), node_name, port))
1058 for file_name in files_to_copy:
1059 ctx.ssh.CopyFileToNode(node_name, port, file_name)
1060
1061 def _RenewClientCerts(ctx):
1062 ctx.feedback_fn("Updating client SSL certificates.")
1063
1064 # TODO: transport those options outside.
1065 debug = True
1066 verbose = True
1067
1068 cluster_name = ssconf.SimpleStore().GetClusterName()
1069
1070 for node_name in ctx.nonmaster_nodes + [ctx.master_node]:
1071 ssh_port = ctx.ssh_ports[node_name]
1072 data = {
1073 constants.NDS_CLUSTER_NAME: cluster_name,
1074 constants.NDS_NODE_DAEMON_CERTIFICATE:
1075 utils.ReadFile(pathutils.NODED_CERT_FILE),
1076 constants.NDS_NODE_NAME: node_name,
1077 }
1078
1079 bootstrap.RunNodeSetupCmd(
1080 cluster_name,
1081 node_name,
1082 pathutils.SSL_UPDATE,
1083 debug,
1084 verbose,
1085 True, # use cluster key
1086 False, # ask key
1087 True, # strict host check
1088 ssh_port,
1089 data)
1090
1091 # Create a temporary ssconf file using the master's client cert digest
1092 # and the 'bootstrap' keyword to enable distribution of all nodes' digests.
1093 master_digest = utils.GetCertificateDigest()
1094 ssconf_master_candidate_certs_filename = os.path.join(
1095 pathutils.DATA_DIR, "%s%s" %
1096 (constants.SSCONF_FILEPREFIX, constants.SS_MASTER_CANDIDATES_CERTS))
1097 utils.WriteFile(
1098 ssconf_master_candidate_certs_filename,
1099 data="%s=%s" % (constants.CRYPTO_BOOTSTRAP, master_digest))
1100 for node_name in ctx.nonmaster_nodes:
1101 port = ctx.ssh_ports[node_name]
1102 ctx.feedback_fn("Copying %s to %s:%d" %
1103 (ssconf_master_candidate_certs_filename, node_name, port))
1104 ctx.ssh.CopyFileToNode(node_name, port,
1105 ssconf_master_candidate_certs_filename)
1106
1107 # Write the boostrap entry to the config using wconfd.
1108 config_live_lock = utils.livelock.LiveLock("renew_crypto")
1109 cfg = config.GetConfig(None, config_live_lock)
1110 cfg.AddNodeToCandidateCerts(constants.CRYPTO_BOOTSTRAP, master_digest)
1111 cfg.Update(cfg.GetClusterInfo(), ctx.feedback_fn)
1112
1113 def _RenewServerAndClientCerts(ctx):
1114 ctx.feedback_fn("Updating the cluster SSL certificate.")
1115
1116 master_name = ssconf.SimpleStore().GetMasterNode()
1117 bootstrap.GenerateClusterCrypto(True, # cluster cert
1118 False, # rapi cert
1119 False, # spice cert
1120 False, # confd hmac key
1121 False, # cds
1122 True, # client cert
1123 master_name)
1124
1125 for node_name in ctx.nonmaster_nodes:
1126 port = ctx.ssh_ports[node_name]
1127 server_cert = pathutils.NODED_CERT_FILE
1128 ctx.feedback_fn("Copying %s to %s:%d" %
1129 (server_cert, node_name, port))
1130 ctx.ssh.CopyFileToNode(node_name, port, server_cert)
1131
1132 _RenewClientCerts(ctx)
1133
1134 if new_cluster_cert or new_rapi_cert or new_spice_cert \
1135 or new_confd_hmac_key or new_cds:
1136 RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
1137
1138 # If only node certficates are recreated, call _RenewClientCerts only.
1139 if new_node_cert and not new_cluster_cert:
1140 RunWhileDaemonsStopped(ToStdout, [constants.NODED, constants.WCONFD],
1141 _RenewClientCerts)
1142
1143 # If the cluster certificate are renewed, the client certificates need
1144 # to be renewed too.
1145 if new_cluster_cert:
1146 RunWhileDaemonsStopped(ToStdout, [constants.NODED, constants.WCONFD],
1147 _RenewServerAndClientCerts)
1148
1149 ToStdout("All requested certificates and keys have been replaced."
1150 " Running \"gnt-cluster verify\" now is recommended.")
1151
1152 if new_node_cert or new_cluster_cert:
1153 cl = GetClient()
1154 renew_op = opcodes.OpClusterRenewCrypto()
1155 SubmitOpCode(renew_op, cl=cl)
1156
1157 return 0
1158
1159
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  Thin CLI entry point that forwards the relevant command line options
  to L{_RenewCrypto}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: unused
  @rtype: int
  @return: the desired exit code

  """
  renew_args = (opts.new_cluster_cert,
                opts.new_rapi_cert,
                opts.rapi_cert,
                opts.new_spice_cert,
                opts.spice_cert,
                opts.spice_cacert,
                opts.new_confd_hmac_key,
                opts.new_cluster_domain_secret,
                opts.cluster_domain_secret,
                opts.force,
                opts.new_node_cert)
  return _RenewCrypto(*renew_args)
1175
1176
1177 def _GetEnabledDiskTemplates(opts):
1178 """Determine the list of enabled disk templates.
1179
1180 """
1181 if opts.enabled_disk_templates:
1182 return opts.enabled_disk_templates.split(",")
1183 else:
1184 return None
1185
1186
1187 def _GetVgName(opts, enabled_disk_templates):
1188 """Determine the volume group name.
1189
1190 @type enabled_disk_templates: list of strings
1191 @param enabled_disk_templates: cluster-wide enabled disk-templates
1192
1193 """
1194 # consistency between vg name and enabled disk templates
1195 vg_name = None
1196 if opts.vg_name is not None:
1197 vg_name = opts.vg_name
1198 if enabled_disk_templates:
1199 if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
1200 ToStdout("You specified a volume group with --vg-name, but you did not"
1201 " enable any of the following lvm-based disk templates: %s" %
1202 utils.CommaJoin(constants.DTS_LVM))
1203 return vg_name
1204
1205
1206 def _GetDrbdHelper(opts, enabled_disk_templates):
1207 """Determine the DRBD usermode helper.
1208
1209 """
1210 drbd_helper = opts.drbd_helper
1211 if enabled_disk_templates:
1212 drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1213 if not drbd_enabled and opts.drbd_helper:
1214 ToStdout("You specified a DRBD usermode helper with "
1215 " --drbd-usermode-helper while DRBD is not enabled.")
1216 return drbd_helper
1217
1218
1219 def _GetCompressionTools(opts):
1220 """Determine the list of custom compression tools.
1221
1222 """
1223 if opts.compression_tools:
1224 return opts.compression_tools.split(",")
1225 elif opts.compression_tools is None:
1226 return None # To note the parameter was not provided
1227 else:
1228 return constants.IEC_DEFAULT_TOOLS # Resetting to default
1229
1230
1231 def SetClusterParams(opts, args):
1232 """Modify the cluster.
1233
1234 @param opts: the command line options selected by the user
1235 @type args: list
1236 @param args: should be an empty list
1237 @rtype: int
1238 @return: the desired exit code
1239
1240 """
1241 if not (opts.vg_name is not None or
1242 opts.drbd_helper is not None or
1243 opts.enabled_hypervisors or opts.hvparams or
1244 opts.beparams or opts.nicparams or
1245 opts.ndparams or opts.diskparams or
1246 opts.candidate_pool_size is not None or
1247 opts.max_running_jobs is not None or
1248 opts.max_tracked_jobs is not None or
1249 opts.uid_pool is not None or
1250 opts.maintain_node_health is not None or
1251 opts.add_uids is not None or
1252 opts.remove_uids is not None or
1253 opts.default_iallocator is not None or
1254 opts.default_iallocator_params or
1255 opts.reserved_lvs is not None or
1256 opts.mac_prefix is not None or
1257 opts.master_netdev is not None or
1258 opts.master_netmask is not None or
1259 opts.use_external_mip_script is not None or
1260 opts.prealloc_wipe_disks is not None or
1261 opts.hv_state or
1262 opts.enabled_disk_templates or
1263 opts.disk_state or
1264 opts.ipolicy_bounds_specs is not None or
1265 opts.ipolicy_std_specs is not None or
1266 opts.ipolicy_disk_templates is not None or
1267 opts.ipolicy_vcpu_ratio is not None or
1268 opts.ipolicy_spindle_ratio is not None or
1269 opts.modify_etc_hosts is not None or
1270 opts.file_storage_dir is not None or
1271 opts.install_image is not None or
1272 opts.instance_communication_network is not None or
1273 opts.zeroing_image is not None or
1274 opts.shared_file_storage_dir is not None or
1275 opts.compression_tools is not None or
1276 opts.shared_file_storage_dir is not None or
1277 opts.enabled_user_shutdown is not None):
1278 ToStderr("Please give at least one of the parameters.")
1279 return 1
1280
1281 enabled_disk_templates = _GetEnabledDiskTemplates(opts)
1282 vg_name = _GetVgName(opts, enabled_disk_templates)
1283
1284 try:
1285 drbd_helper = _GetDrbdHelper(opts, enabled_disk_templates)
1286 except errors.OpPrereqError, e:
1287 ToStderr(str(e))
1288 return 1
1289
1290 hvlist = opts.enabled_hypervisors
1291 if hvlist is not None:
1292 hvlist = hvlist.split(",")
1293
1294 # a list of (name, dict) we can pass directly to dict() (or [])
1295 hvparams = dict(opts.hvparams)
1296 for hv_params in hvparams.values():
1297 utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1298
1299 diskparams = dict(opts.diskparams)
1300
1301 for dt_params in diskparams.values():
1302 utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1303
1304 beparams = opts.beparams
1305 utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1306
1307 nicparams = opts.nicparams
1308 utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1309
1310 ndparams = opts.ndparams
1311 if ndparams is not None:
1312 utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1313
1314 ipolicy = CreateIPolicyFromOpts(
1315 minmax_ispecs=opts.ipolicy_bounds_specs,
1316 std_ispecs=opts.ipolicy_std_specs,
1317 ipolicy_disk_templates=opts.ipolicy_disk_templates,
1318 ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1319 ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1320 )
1321
1322 mnh = opts.maintain_node_health
1323
1324 uid_pool = opts.uid_pool
1325 if uid_pool is not None:
1326 uid_pool = uidpool.ParseUidPool(uid_pool)
1327
1328 add_uids = opts.add_uids
1329 if add_uids is not None:
1330 add_uids = uidpool.ParseUidPool(add_uids)
1331
1332 remove_uids = opts.remove_uids
1333 if remove_uids is not None:
1334 remove_uids = uidpool.ParseUidPool(remove_uids)
1335
1336 if opts.reserved_lvs is not None:
1337 if opts.reserved_lvs == "":
1338 opts.reserved_lvs = []
1339 else:
1340 opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1341
1342 if opts.master_netmask is not None:
1343 try:
1344 opts.master_netmask = int(opts.master_netmask)
1345 except ValueError:
1346 ToStderr("The --master-netmask option expects an int parameter.")
1347 return 1
1348
1349 ext_ip_script = opts.use_external_mip_script
1350
1351 if opts.disk_state:
1352 disk_state = utils.FlatToDict(opts.disk_state)
1353 else:
1354 disk_state = {}
1355
1356 hv_state = dict(opts.hv_state)
1357
1358 compression_tools = _GetCompressionTools(opts)
1359
1360 op = opcodes.OpClusterSetParams(
1361 vg_name=vg_name,
1362 drbd_helper=drbd_helper,
1363 enabled_hypervisors=hvlist,
1364 hvparams=hvparams,
1365 os_hvp=None,
1366 beparams=beparams,
1367 nicparams=nicparams,
1368 ndparams=ndparams,
1369 diskparams=diskparams,
1370 ipolicy=ipolicy,
1371 candidate_pool_size=opts.candidate_pool_size,
1372 max_running_jobs=opts.max_running_jobs,
1373 max_tracked_jobs=opts.max_tracked_jobs,
1374 maintain_node_health=mnh,
1375 modify_etc_hosts=opts.modify_etc_hosts,
1376 uid_pool=uid_pool,
1377 add_uids=add_uids,
1378 remove_uids=remove_uids,
1379 default_iallocator=opts.default_iallocator,
1380 default_iallocator_params=opts.default_iallocator_params,
1381 prealloc_wipe_disks=opts.prealloc_wipe_disks,
1382 mac_prefix=opts.mac_prefix,
1383 master_netdev=opts.master_netdev,
1384 master_netmask=opts.master_netmask,
1385 reserved_lvs=opts.reserved_lvs,
1386 use_external_mip_script=ext_ip_script,
1387 hv_state=hv_state,
1388 disk_state=disk_state,
1389 enabled_disk_templates=enabled_disk_templates,
1390 force=opts.force,
1391 file_storage_dir=opts.file_storage_dir,
1392 install_image=opts.install_image,
1393 instance_communication_network=opts.instance_communication_network,
1394 zeroing_image=opts.zeroing_image,
1395 shared_file_storage_dir=opts.shared_file_storage_dir,
1396 compression_tools=compression_tools,
1397 enabled_user_shutdown=opts.enabled_user_shutdown,
1398 )
1399 return base.GetResult(None, opts, SubmitOrSend(op, opts))
1400
1401
def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  subcommand = args[0]
  client = GetClient()
  if subcommand == "drain":
    client.SetQueueDrainFlag(True)
  elif subcommand == "undrain":
    client.SetQueueDrainFlag(False)
  elif subcommand == "info":
    (drain_flag, ) = client.QueryConfigValues(["drain_flag"])
    ToStdout("The drain flag is %s" % ("set" if drain_flag else "unset"))
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % subcommand,
                               errors.ECODE_INVAL)

  return 0
1429
1430
def _ShowWatcherPause(until):
  """Print the current watcher pause state.

  @param until: timestamp until which the watcher is paused, or C{None}
    if it is not paused

  """
  if until is not None and until >= time.time():
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1436
1437
def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")

  elif command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    # Pause the watcher from now for the given timespan
    until = time.time() + ParseTimespec(args[1])
    _ShowWatcherPause(client.SetWatcherPause(until))

  elif command == "info":
    (watcher_pause, ) = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(watcher_pause)

  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
1471
1472
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  command = constants.OOB_POWER_ON if power else constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)

  success = True
  for ((_, node_name), (data_status, _)) in SubmitOpCode(op, opts=opts):
    if data_status != constants.RS_NORMAL:
      # RS_UNAVAIL cannot occur because ignore_status is set above
      assert data_status != constants.RS_UNAVAIL
      success = False
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  return success
1508
1509
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    (text_submit, text_success, text_failed) = \
      ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    (text_submit, text_success, text_failed) = \
      ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)

  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    jex.QueueJob(inst, opcls(instance_name=inst))

  results = jex.GetResults()
  bad_cnt = sum(1 for (success, _) in results if not success)

  if bad_cnt:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  ToStdout("All instances have been %s successfully", text_success)
  return True
1548
1549
class _RunWhenNodesReachableHelper(object):
  """Helper class to make shared internal state sharing easier.

  Instances are used as the retry function for L{utils.Retry}: calling the
  instance runs C{action_cb} on the currently reachable nodes, and C{Wait}
  is used as the retry wait function to probe the still-unreachable nodes.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    # Nodes not yet reachable; shrinks as nodes come up in Wait()
    self.down = set(node_list)
    # Nodes that have become reachable so far
    self.up = set()
    self.node2ip = node2ip
    # Set to False as soon as any action_cb invocation fails
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    # Run the callback on all nodes currently up; a single failure makes
    # the overall result False, but we keep retrying until all nodes are up
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        # (returning here also avoids mutating self.down while iterating it)
        return

    # No node came up; sleep whatever remains of the requested interval
    self._sleep_fn(max(0.0, start + secs - time.time()))
1612
1613
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry
  @return: the result of the last action_cb call, or False on timeout

  """
  cluster_info = GetClient().QueryClusterInfo()
  # Resolve node names using the cluster's primary IP family
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((name, netutils.GetHostname(name, family=family).ip)
                 for name in node_list)

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n - %s", " - ".join(helper.down))
    return False
1643
1644
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  # Instances for which every node they live on is back online
  start_inst_list = [inst for (inst, nodes) in inst_map.items()
                     if not (nodes - nodes_online)]

  # Remove them from the map so they are not started twice
  for inst in start_inst_list:
    del inst_map[inst]

  if not start_inst_list:
    return True

  return _instance_start_fn(opts, start_inst_list, True)
1669
1670
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
    OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Power the OOB-capable nodes back ON; this previously requested
  # power-off (power=False), contradicting both this function's purpose
  # and the message below.
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1695
1696
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # Stop all instances first, without remembering the state
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  # Power off the OOB-capable nodes, if any
  if node_list and not _OobPower(opts, node_list, False):
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1717
1718
def Epo(opts, args, qcl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @param qcl: query client to use; created on demand if C{None}
  @param _on_fn: power-on implementation (for unittest use only)
  @param _off_fn: power-off implementation (for unittest use only)
  @param _confirm_fn: confirmation function (for unittest use only)
  @param _stdout_fn: stdout feedback function (for unittest use only)
  @param _stderr_fn: stderr feedback function (for unittest use only)
  @rtype: int
  @return: the desired exit code

  """
  # --groups and --all are mutually exclusive, and --all takes no arguments
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if qcl is None:
    # Query client
    qcl = GetClient()

  # With --groups, the arguments are group names whose member nodes are
  # queried; otherwise the arguments are node names (empty list = all nodes)
  if opts.groups:
    node_query_list = \
      itertools.chain(*qcl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = qcl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                            "sinst_list", "powered", "offline"],
                          False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      # Build the instance -> nodes map; instances with any presence on
      # the master node get an empty node set (the master is not powered
      # off, so they can always be started)
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      # "powered" is None when the node has no OOB support
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      # Schedule the node for the OOB power operation
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)
1790
1791
def _GetCreateCommand(info):
  """Build a "gnt-cluster init" command line from cluster information.

  @type info: dict
  @param info: cluster information; must contain at least the "ipolicy"
    and "name" keys (as returned by C{QueryClusterInfo})
  @rtype: string
  @return: the command line

  """
  out = StringIO()
  out.write("gnt-cluster init")
  PrintIPolicyCommand(out, info["ipolicy"], False)
  out.write(" %s" % info["name"])
  return out.getvalue()
1799
1800
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  cluster_info = GetClient().QueryClusterInfo()
  ToStdout(_GetCreateCommand(cluster_info))
1810
1811
def _RunCommandAndReport(cmd):
  """Run a command and report its output, iff it failed.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: bool
  @return: False, if the execution failed.

  """
  result = utils.RunCmd(cmd)
  if not result.failed:
    return True

  ToStderr("Command %s failed: %s; Output %s" %
           (cmd, result.fail_reason, result.output))
  return False
1827
1828
def _VerifyCommand(cmd):
  """Verify that a given command succeeds on all online nodes.

  As this function is intended to run during upgrades, it
  is implemented in such a way that it still works, if all Ganeti
  daemons are down.

  @param cmd: the command to execute
  @type cmd: list
  @rtype: list
  @return: the list of node names that are online where
      the command failed.

  """
  command = utils.text.ShellQuoteArgs([str(val) for val in cmd])

  nodes = ssconf.SimpleStore().GetOnlineNodeList()
  master_node = ssconf.SimpleStore().GetMasterNode()
  cluster_name = ssconf.SimpleStore().GetClusterName()

  # If master node is in 'nodes', make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  srun = ssh.SshRunner(cluster_name=cluster_name)

  # Run the command via SSH on each node, collecting the failing ones
  failed = [name for name in nodes
            if srun.Run(name, constants.SSH_LOGIN_USER, command).exit_code != 0]

  return failed
1863
1864
def _VerifyVersionInstalled(versionstring):
  """Verify that the given version of ganeti is installed on all online nodes.

  Do nothing, if this is the case, otherwise print an appropriate
  message to stderr.

  @param versionstring: the version to check for
  @type versionstring: string
  @rtype: bool
  @return: True, if the version is installed on all online nodes

  """
  # The version is considered installed if its directory exists
  version_dir = os.path.join(pathutils.PKGLIBDIR, versionstring)
  badnodes = _VerifyCommand(["test", "-d", version_dir])
  if not badnodes:
    return True

  ToStderr("Ganeti version %s not installed on nodes %s"
           % (versionstring, ", ".join(badnodes)))
  return False
1885
1886
def _GetRunning():
  """Determine the number of jobs still running.

  @rtype: int
  @return: the number of jobs still running

  """
  qfilter = qlang.MakeSimpleFilter("status",
                                   frozenset([constants.JOB_STATUS_RUNNING]))
  return len(GetClient().Query(constants.QR_JOB, [], qfilter).data)
1898
1899
def _SetGanetiVersion(versionstring):
  """Set the active version of ganeti to the given versionstring

  @type versionstring: string
  @rtype: list
  @return: the list of nodes where the version change failed

  """
  lib_link = os.path.join(pathutils.SYSCONFDIR, "ganeti/lib")
  share_link = os.path.join(pathutils.SYSCONFDIR, "ganeti/share")
  lib_target = os.path.join(pathutils.PKGLIBDIR, versionstring)
  share_target = os.path.join(pathutils.SHAREDIR, versionstring)

  failed = []
  if constants.HAS_GNU_LN:
    # GNU ln's -T replaces the symlink in a single invocation
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T", lib_target, lib_link]))
    failed.extend(_VerifyCommand(
        ["ln", "-s", "-f", "-T", share_target, share_link]))
  else:
    # Without GNU ln, remove the old symlink before creating the new one
    failed.extend(_VerifyCommand(["rm", "-f", lib_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", lib_target, lib_link]))
    failed.extend(_VerifyCommand(["rm", "-f", share_link]))
    failed.extend(_VerifyCommand(["ln", "-s", "-f", share_target, share_link]))
  return list(set(failed))
1930
1931
1932 def _ExecuteCommands(fns):
1933 """Execute a list of functions, in reverse order.
1934
1935 @type fns: list of functions.
1936 @param fns: the functions to be executed.
1937
1938 """
1939 for fn in reversed(fns):
1940 fn()
1941
1942
def _GetConfigVersion():
  """Determine the version the configuration file currently has.

  @rtype: tuple or None
  @return: (major, minor, revision) if the version can be determined,
      None otherwise

  """
  config_data = serializer.LoadJson(utils.ReadFile(pathutils.CLUSTER_CONF_FILE))
  if "version" not in config_data:
    return None
  return utils.SplitVersion(config_data["version"])
1957
1958
def _ReadIntentToUpgrade():
  """Read the file documenting the intent to upgrade the cluster.

  @rtype: (string, string) or (None, None)
  @return: (old version, version to upgrade to), if the file exists,
      and (None, None) otherwise.

  """
  if not os.path.isfile(pathutils.INTENT_TO_UPGRADE):
    return (None, None)

  contentstring = utils.ReadFile(pathutils.INTENT_TO_UPGRADE)
  contents = utils.UnescapeAndSplit(contentstring)
  if len(contents) != 3:
    # file syntactically mal-formed
    return (None, None)

  # The third field (the writing process' pid) is not returned
  (old_version, new_version, _) = contents
  return (old_version, new_version)
1976
1977
def _WriteIntentToUpgrade(version):
  """Write file documenting the intent to upgrade the cluster.

  @type version: string
  @param version: the version we intent to upgrade to

  """
  # Record the current version, the target version and our pid.
  fields = [constants.RELEASE_VERSION, version, "%d" % os.getpid()]
  utils.WriteFile(pathutils.INTENT_TO_UPGRADE,
                  data=utils.EscapeAndJoin(fields))
1988
1989
def _UpgradeBeforeConfigurationChange(versionstring):
  """
  Carry out all the tasks necessary for an upgrade that happen before
  the configuration file, or Ganeti version, changes.

  Every successfully completed step registers a compensating action in
  the returned rollback list, so a failed upgrade can be undone by
  executing those actions in reverse order.

  @type versionstring: string
  @param versionstring: the version to upgrade to
  @rtype: (bool, list)
  @return: tuple of a bool indicating success and a list of rollback tasks

  """
  rollback = []

  # Abort early if the target version is not installed everywhere.
  if not _VerifyVersionInstalled(versionstring):
    return (False, rollback)

  # Persist the intent so an interrupted upgrade can later be resumed.
  _WriteIntentToUpgrade(versionstring)
  rollback.append(
      lambda: utils.RunCmd(["rm", "-f", pathutils.INTENT_TO_UPGRADE]))

  ToStdout("Draining queue")
  client = GetClient()
  client.SetQueueDrainFlag(True)

  rollback.append(lambda: GetClient().SetQueueDrainFlag(False))

  # Wait for running jobs to finish; a truthy result means jobs were
  # still running when the drain timeout expired.
  if utils.SimpleRetry(0, _GetRunning,
                       constants.UPGRADE_QUEUE_POLL_INTERVAL,
                       constants.UPGRADE_QUEUE_DRAIN_TIMEOUT):
    ToStderr("Failed to completely empty the queue.")
    return (False, rollback)

  ToStdout("Pausing the watcher for one hour.")
  rollback.append(lambda: GetClient().SetWatcherPause(None))
  GetClient().SetWatcherPause(time.time() + 60 * 60)

  ToStdout("Stopping daemons on master node.")
  if not _RunCommandAndReport([pathutils.DAEMON_UTIL, "stop-all"]):
    return (False, rollback)

  # Re-check the installed version; on failure restart the daemons we
  # just stopped on the master before giving up.
  if not _VerifyVersionInstalled(versionstring):
    utils.RunCmd([pathutils.DAEMON_UTIL, "start-all"])
    return (False, rollback)

  ToStdout("Stopping daemons everywhere.")
  rollback.append(lambda: _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  if badnodes:
    ToStderr("Failed to stop daemons on %s." % (", ".join(badnodes),))
    return (False, rollback)

  backuptar = os.path.join(pathutils.BACKUP_DIR, "ganeti%d.tar" % time.time())
  ToStdout("Backing up configuration as %s" % backuptar)
  if not _RunCommandAndReport(["mkdir", "-p", pathutils.BACKUP_DIR]):
    return (False, rollback)

  # Create the archive in a safe manner, as it contains sensitive
  # information.
  (_, tmp_name) = tempfile.mkstemp(prefix=backuptar, dir=pathutils.BACKUP_DIR)
  if not _RunCommandAndReport(["tar", "-cf", tmp_name,
                               "--exclude=queue/archive",
                               pathutils.DATA_DIR]):
    return (False, rollback)

  # Atomically move the archive into place only after it was written
  # completely.
  os.rename(tmp_name, backuptar)
  return (True, rollback)
2056
2057
def _VersionSpecificDowngrade():
  """
  Perform any additional downgrade tasks that are version specific
  and need to be done just after the configuration downgrade. This
  function needs to be idempotent, so that it can be redone if the
  downgrade procedure gets interrupted after changing the
  configuration.

  Note that this function has to be reset with every version bump.

  @rtype: bool
  @return: True upon success
  """
  # Currently no version-specific tasks are required; this is a
  # placeholder to be filled in with each version bump.
  ToStdout("Performing version-specific downgrade tasks.")
  return True
2072
2073
def _SwitchVersionAndConfig(versionstring, downgrade):
  """
  Switch to the new Ganeti version and change the configuration,
  in correct order.

  @type versionstring: string
  @param versionstring: the version to change to
  @type downgrade: bool
  @param downgrade: True, if the configuration should be downgraded
  @rtype: (bool, list)
  @return: tuple of a bool indicating success, and a list of
      additional rollback tasks

  """
  rollback = []
  if downgrade:
    ToStdout("Downgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "--downgrade", "-f"]):
      return (False, rollback)
    # Note: version specific downgrades need to be done before switching
    # binaries, so that we still have the knowledgeable binary if the downgrade
    # process gets interrupted at this point.
    if not _VersionSpecificDowngrade():
      return (False, rollback)

  # Configuration change is the point of no return. From then onwards, it is
  # safer to push through the up/downgrade than to try to roll it back.

  ToStdout("Switching to version %s on all nodes" % versionstring)
  rollback.append(lambda: _SetGanetiVersion(constants.DIR_VERSION))
  badnodes = _SetGanetiVersion(versionstring)
  if badnodes:
    ToStderr("Failed to switch to Ganeti version %s on nodes %s"
             % (versionstring, ", ".join(badnodes)))
    # When downgrading, push through anyway (see note above); only an
    # upgrade is aborted here.
    if not downgrade:
      return (False, rollback)

  # Now that we have changed to the new version of Ganeti we should
  # not communicate over luxi any more, as luxi might have changed in
  # incompatible ways. Therefore, manually call the corresponding ganeti
  # commands using their canonical (version independent) path.

  if not downgrade:
    ToStdout("Upgrading configuration")
    if not _RunCommandAndReport([pathutils.CFGUPGRADE, "-f"]):
      return (False, rollback)

  return (True, rollback)
2122
2123
def _UpgradeAfterConfigurationChange(oldversion):
  """
  Carry out the upgrade actions necessary after switching to the new
  Ganeti version and updating the configuration.

  As this part is run at a time where the new version of Ganeti is already
  running, no communication should happen via luxi, as this is not a stable
  interface. Also, as the configuration change is the point of no return,
  all actions are pushed through, even if some of them fail.

  @param oldversion: the version the upgrade started from
  @type oldversion: string
  @rtype: int
  @return: the intended return value

  """
  # Failures only degrade the return value to 1; no step aborts the rest.
  returnvalue = 0

  ToStdout("Ensuring directories everywhere.")
  badnodes = _VerifyCommand([pathutils.ENSURE_DIRS])
  if badnodes:
    ToStderr("Warning: failed to ensure directories on %s." %
             (", ".join(badnodes)))
    returnvalue = 1

  ToStdout("Starting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  if badnodes:
    ToStderr("Warning: failed to start daemons on %s." % (", ".join(badnodes),))
    returnvalue = 1

  ToStdout("Redistributing the configuration.")
  if not _RunCommandAndReport(["gnt-cluster", "redist-conf", "--yes-do-it"]):
    returnvalue = 1

  ToStdout("Restarting daemons everywhere.")
  badnodes = _VerifyCommand([pathutils.DAEMON_UTIL, "stop-all"])
  badnodes.extend(_VerifyCommand([pathutils.DAEMON_UTIL, "start-all"]))
  if badnodes:
    # De-duplicate, as a node may have failed both the stop and the start.
    ToStderr("Warning: failed to start daemons on %s." %
             (", ".join(list(set(badnodes))),))
    returnvalue = 1

  ToStdout("Undraining the queue.")
  if not _RunCommandAndReport(["gnt-cluster", "queue", "undrain"]):
    returnvalue = 1

  # Result deliberately unchecked: a failure to remove the marker file
  # does not affect the exit code.
  _RunCommandAndReport(["rm", "-f", pathutils.INTENT_TO_UPGRADE])

  ToStdout("Running post-upgrade hooks")
  if not _RunCommandAndReport([pathutils.POST_UPGRADE, oldversion]):
    returnvalue = 1

  ToStdout("Unpausing the watcher.")
  if not _RunCommandAndReport(["gnt-cluster", "watcher", "continue"]):
    returnvalue = 1

  ToStdout("Verifying cluster.")
  if not _RunCommandAndReport(["gnt-cluster", "verify"]):
    returnvalue = 1

  return returnvalue
2186
2187
def UpgradeGanetiCommand(opts, args):
  """Upgrade a cluster to a new ganeti version.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --to and --resume are mutually exclusive; exactly one must be given.
  if ((not opts.resume and opts.to is None)
      or (opts.resume and opts.to is not None)):
    ToStderr("Precisely one of the options --to and --resume"
             " has to be given")
    return 1

  # If we're not told to resume, verify there is no upgrade
  # in progress.
  if not opts.resume:
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is not None:
      # An upgrade is going on; verify whether the target matches
      if versionstring == opts.to:
        ToStderr("An upgrade is already in progress. Target version matches,"
                 " resuming.")
        opts.resume = True
        opts.to = None
      else:
        ToStderr("An upgrade from %s to %s is in progress; use --resume to"
                 " finish it first" % (oldversion, versionstring))
        return 1

  oldversion = constants.RELEASE_VERSION

  if opts.resume:
    ssconf.CheckMaster(False)
    oldversion, versionstring = _ReadIntentToUpgrade()
    if versionstring is None:
      # No upgrade in progress; nothing to resume.
      return 0
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      return 1
    configversion = _GetConfigVersion()
    if configversion is None:
      return 1
    # If the upgrade we resume was an upgrade between compatible
    # versions (like 2.10.0 to 2.10.1), the correct configversion
    # does not guarantee that the config has been updated.
    # However, in the case of a compatible update with the configuration
    # not touched, we are running a different dirversion with the same
    # config version.
    config_already_modified = \
      (utils.IsCorrectConfigVersion(version, configversion) and
       not (versionstring != constants.DIR_VERSION and
            configversion == (constants.CONFIG_MAJOR, constants.CONFIG_MINOR,
                              constants.CONFIG_REVISION)))
    if not config_already_modified:
      # We have to start from the beginning; however, some daemons might have
      # already been stopped, so the only way to get into a well-defined state
      # is by starting all daemons again.
      _VerifyCommand([pathutils.DAEMON_UTIL, "start-all"])
  else:
    versionstring = opts.to
    config_already_modified = False
    version = utils.version.ParseVersion(versionstring)
    if version is None:
      ToStderr("Could not parse version string %s" % versionstring)
      return 1

  # Refuse up/downgrades outside the supported version range.
  msg = utils.version.UpgradeRange(version)
  if msg is not None:
    ToStderr("Cannot upgrade to %s: %s" % (versionstring, msg))
    return 1

  if not config_already_modified:
    success, rollback = _UpgradeBeforeConfigurationChange(versionstring)
    if not success:
      _ExecuteCommands(rollback)
      return 1
  else:
    rollback = []

  downgrade = utils.version.ShouldCfgdowngrade(version)

  success, additionalrollback =  \
      _SwitchVersionAndConfig(versionstring, downgrade)
  if not success:
    rollback.extend(additionalrollback)
    _ExecuteCommands(rollback)
    return 1

  return _UpgradeAfterConfigurationChange(oldversion)
2280
2281
#: dictionary with the sub-commands supported by gnt-cluster; each value is
#: a tuple of (handler function, argument specification, option list,
#: usage string, description)
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOMODIFY_ETCHOSTS_OPT, NOMODIFY_SSH_SETUP_OPT,
     SECONDARY_IP_OPT, VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT,
     DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT, DEFAULT_IALLOCATOR_PARAMS_OPT,
     PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT,
     GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT,
     HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT, GLOBAL_GLUSTER_FILEDIR_OPT, INSTALL_IMAGE_OPT,
     ZEROING_IMAGE_OPT, COMPRESSION_TOOLS_OPT,
     ENABLED_USER_SHUTDOWN_OPT,
    ]
    + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS +
    [DRY_RUN_OPT, PRIORITY_OPT, FORCE_DISTRIBUTION],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, RQL_OPT, MAX_TRACK_OPT, INSTALL_IMAGE_OPT,
     INSTANCE_COMMUNICATION_NETWORK_OPT, ENABLED_HV_OPT, HVLIST_OPT,
     MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT, NIC_PARAMS_OPT,
     VG_NAME_OPT, MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT,
     REMOVE_UIDS_OPT, DRBD_HELPER_OPT, DEFAULT_IALLOCATOR_OPT,
     DEFAULT_IALLOCATOR_PARAMS_OPT, RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT,
     PREALLOC_WIPE_DISKS_OPT, NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + SUBMIT_OPTS +
    [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT,
     ENABLED_USER_SHUTDOWN_OPT] +
    INSTANCE_POLICY_OPTS +
    [GLOBAL_FILEDIR_OPT, GLOBAL_SHARED_FILEDIR_OPT, ZEROING_IMAGE_OPT,
     COMPRESSION_TOOLS_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT,
     NEW_NODE_CERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  "upgrade": (
    UpgradeGanetiCommand, ARGS_NONE, [TO_OPT, RESUME_OPT], "",
    "Upgrade (or downgrade) to a new Ganeti version"),
  }
2413
2414
#: dictionary with aliases for commands; maps each alternative (legacy)
#: command name to its canonical name in C{commands}
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}
2420
2421
def Main():
  """Entry point of the gnt-cluster tool.

  @return: the exit code produced by the generic main routine

  """
  return GenericMain(commands, aliases=aliases,
                     override={"tag_type": constants.TAG_CLUSTER})