Renew-crypto: stop daemons on master node first
[ganeti-github.git] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module dealing with command line parsing"""
32
33
34 import sys
35 import textwrap
36 import os.path
37 import time
38 import logging
39 import errno
40 import itertools
41 import shlex
42 from cStringIO import StringIO
43
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti import constants
47 from ganeti import opcodes
48 import ganeti.rpc.errors as rpcerr
49 import ganeti.rpc.node as rpc
50 from ganeti import ssh
51 from ganeti import compat
52 from ganeti import netutils
53 from ganeti import qlang
54 from ganeti import objects
55 from ganeti import pathutils
56 from ganeti import serializer
57 import ganeti.cli_opts
58 # Import constants
59 from ganeti.cli_opts import * # pylint: disable=W0401
60
61 from ganeti.runtime import (GetClient)
62
63 from optparse import (OptionParser, TitledHelpFormatter)
64
65
# Public names exported when doing "from ganeti.cli import *"; extended
# at the end with all command line option names from ganeti.cli_opts
__all__ = [
  # Generic functions for CLI programs
  "ConfirmOperation",
  "CreateIPolicyFromOpts",
  "GenericMain",
  "GenericInstanceCreate",
  "GenericList",
  "GenericListFields",
  "GetClient",
  "GetOnlineNodes",
  "GetNodesSshPorts",
  "GetNodeUUIDs",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "RunWhileDaemonsStopped",
  "SubmitOpCode",
  "SubmitOpCodeToDrainedQueue",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "FormatQueryResult",
  "FormatParamsDictInfo",
  "FormatPolicyInfo",
  "PrintIPolicyCommand",
  "PrintGenericInfo",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  "FormatLogMessage",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_MANY_GROUPS",
  "ARGS_MANY_NETWORKS",
  "ARGS_MANY_FILTERS",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_GROUP",
  "ARGS_ONE_OS",
  "ARGS_ONE_NETWORK",
  "ARGS_ONE_FILTER",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgGroup",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNetwork",
  "ArgNode",
  "ArgOs",
  "ArgExtStorage",
  "ArgFilter",
  "ArgSuggest",
  "ArgUnknown",
  "FixHvParams",
  "SplitNodeOption",
  "CalculateOSNames",
  "ParseFields",
  ] + ganeti.cli_opts.__all__ # Command line options
135
# Query result status codes for clients (normal result, unknown fields
# requested, incomplete/partial result)
(QR_NORMAL,
 QR_UNKNOWN,
 QR_INCOMPLETE) = range(3)

#: Maximum batch size for ChooseJob
_CHOOSE_BATCH = 25
143
144
# constants used to create InstancePolicy dictionary
TISPECS_GROUP_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
}

# Same as TISPECS_GROUP_TYPES, plus the additional ISPECS_STD entry that
# only exists at cluster level
TISPECS_CLUSTER_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  constants.ISPECS_STD: constants.VTYPE_INT,
}

#: User-friendly names for query2 field types
_QFT_NAMES = {
  constants.QFT_UNKNOWN: "Unknown",
  constants.QFT_TEXT: "Text",
  constants.QFT_BOOL: "Boolean",
  constants.QFT_NUMBER: "Number",
  constants.QFT_NUMBER_FLOAT: "Floating-point number",
  constants.QFT_UNIT: "Storage size",
  constants.QFT_TIMESTAMP: "Timestamp",
  constants.QFT_OTHER: "Custom",
}
168
169
170 class _Argument(object):
171 def __init__(self, min=0, max=None): # pylint: disable=W0622
172 self.min = min
173 self.max = max
174
175 def __repr__(self):
176 return ("<%s min=%s max=%s>" %
177 (self.__class__.__name__, self.min, self.max))
178
179
class ArgSuggest(_Argument):
  """Argument carrying a list of suggested values.

  Any value is accepted; the choices passed to the constructor are
  suggestions only.

  """
  # pylint: disable=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    fields = (self.__class__.__name__, self.min, self.max, self.choices)
    return "<%s min=%s max=%s choices=%r>" % fields
194
195
class ArgChoice(ArgSuggest):
  """Choice argument.

  Like L{ArgSuggest}, but the value must be one of the choices passed to
  the constructor.

  """
203
204
class ArgUnknown(_Argument):
  """Argument of a kind not known in advance (e.g. determined at runtime).

  """
209
210
class ArgInstance(_Argument):
  """Argument denoting one or more instances.

  """
215
216
class ArgNode(_Argument):
  """Argument denoting one or more nodes.

  """
221
222
class ArgNetwork(_Argument):
  """Argument denoting a network.

  """
227
228
class ArgGroup(_Argument):
  """Argument denoting a node group.

  """
233
234
class ArgJobId(_Argument):
  """Argument denoting a job ID.

  """
239
240
class ArgFile(_Argument):
  """Argument denoting a file path.

  """
245
246
class ArgCommand(_Argument):
  """Argument denoting a command.

  """
251
252
class ArgHost(_Argument):
  """Argument denoting a host.

  """
257
258
class ArgOs(_Argument):
  """Argument denoting an OS.

  """
263
264
class ArgExtStorage(_Argument):
  """Argument denoting an ExtStorage.

  """
269
270
class ArgFilter(_Argument):
  """Argument denoting a filter UUID.

  """
275
276
# Shorthand argument definitions used by command tables: the "MANY"
# variants accept any number of values (min=0, max=None), while the
# "ONE" variants require exactly one value (min=1, max=1)
ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NETWORKS = [ArgNetwork()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_MANY_GROUPS = [ArgGroup()]
ARGS_MANY_FILTERS = [ArgFilter()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NETWORK = [ArgNetwork(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_GROUP = [ArgGroup(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
ARGS_ONE_FILTER = [ArgFilter(min=1, max=1)]
289
290
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  @return: tuple of (tag kind, object name)

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")

  kind = opts.tag_type

  # cluster tags need no object name
  if kind == constants.TAG_CLUSTER:
    return kind, ""

  if kind in (constants.TAG_NODEGROUP,
              constants.TAG_NODE,
              constants.TAG_NETWORK,
              constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command",
                                 errors.ECODE_INVAL)
    # consume the object name from the argument list
    return kind, args.pop(0)

  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
314
315
316 def _ExtendTags(opts, args):
317 """Extend the args if a source file has been given.
318
319 This function will extend the tags with the contents of the file
320 passed in the 'tags_source' attribute of the opts parameter. A file
321 named '-' will be replaced by stdin.
322
323 """
324 fname = opts.tags_source
325 if fname is None:
326 return
327 if fname == "-":
328 new_fh = sys.stdin
329 else:
330 new_fh = open(fname, "r")
331 new_data = []
332 try:
333 # we don't use the nice 'new_data = [line.strip() for line in fh]'
334 # because of python bug 1633941
335 while True:
336 line = new_fh.readline()
337 if not line:
338 break
339 new_data.append(line.strip())
340 finally:
341 new_fh.close()
342 args.extend(new_data)
343
344
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  for tag in sorted(cl.QueryTags(kind, name)):
    ToStdout(tag)
361
362
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added", errors.ECODE_INVAL)
  SubmitOrSend(opcodes.OpTagsSet(kind=kind, name=name, tags=args), opts)
378
379
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed", errors.ECODE_INVAL)
  SubmitOrSend(opcodes.OpTagsDel(kind=kind, name=name, tags=args), opts)
395
396
397 class _ShowUsage(Exception):
398 """Exception class for L{_ParseArgs}.
399
400 """
401 def __init__(self, exit_error):
402 """Initializes instances of this class.
403
404 @type exit_error: bool
405 @param exit_error: Whether to report failure on exit
406
407 """
408 Exception.__init__(self)
409 self.exit_error = exit_error
410
411
412 class _ShowVersion(Exception):
413 """Exception class for L{_ParseArgs}.
414
415 """
416
417
def _ParseArgs(binary, argv, commands, aliases, env_override):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param binary: Script name
  @param argv: Command line arguments
  @param commands: Dictionary containing command definitions
  @param aliases: dictionary with command aliases {"alias": "target", ...}
  @param env_override: list of env variables allowed for default args
  @return: tuple of (function, options, arguments); all three are None
      if the argument check failed
  @raise _ShowUsage: If usage description should be shown
  @raise _ShowVersion: If version should be shown

  """
  # every env-overridable name must be a real command, and aliases may
  # not shadow command names
  assert not (env_override - set(commands))
  assert not (set(aliases.keys()) & set(commands.keys()))

  if len(argv) > 1:
    cmd = argv[1]
  else:
    # No option or command given
    raise _ShowUsage(exit_error=True)

  if cmd == "--version":
    raise _ShowVersion()
  elif cmd == "--help":
    raise _ShowUsage(exit_error=False)
  elif not (cmd in commands or cmd in aliases):
    raise _ShowUsage(exit_error=True)

  # get command, unalias it, and look it up in commands
  if cmd in aliases:
    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  if cmd in env_override:
    # default arguments may come from an environment variable named
    # after the binary and command (e.g. "gnt-cluster verify" reads
    # $GNT_CLUSTER_VERIFY); they are inserted right after the command
    args_env_name = ("%s_%s" % (binary.replace("-", "_"), cmd)).upper()
    env_args = os.environ.get(args_env_name)
    if env_args:
      argv = utils.InsertAtPos(argv, 2, shlex.split(env_args))

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  # everything after the first positional argument is passed through
  # verbatim (needed e.g. for commands wrapping other commands)
  parser.disable_interspersed_args()
  options, args = parser.parse_args(args=argv[2:])

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
475
476
477 def _FormatUsage(binary, commands):
478 """Generates a nice description of all commands.
479
480 @param binary: Script name
481 @param commands: Dictionary containing command definitions
482
483 """
484 # compute the max line length for cmd + usage
485 mlen = min(60, max(map(len, commands)))
486
487 yield "Usage: %s {command} [options...] [argument...]" % binary
488 yield "%s <command> --help to see details, or man %s" % (binary, binary)
489 yield ""
490 yield "Commands:"
491
492 # and format a nice command list
493 for (cmd, (_, _, _, _, help_text)) in sorted(commands.items()):
494 help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
495 yield " %-*s - %s" % (mlen, cmd, help_lines.pop(0))
496 for line in help_lines:
497 yield " %-*s %s" % (mlen, "", line)
498
499 yield ""
500
501
def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

  1. Abort with error if values specified by user but none expected.

  1. For each argument in definition

    1. Keep running count of minimum number of values (min_count)
    1. Keep running count of maximum number of values (max_count)
    1. If it has an unlimited number of values

      1. Abort with error if it's not the last argument in the definition

  1. If last argument has limited number of values

    1. Abort with error if number of values doesn't match or is too large

  1. Abort with error if user didn't pass enough values (min_count)

  @param cmd: command name, used only in error messages
  @param args_def: list of L{_Argument}-like definitions
  @param args: actual values passed by the user
  @rtype: bool
  @return: True if the arguments are valid, False otherwise (an error
      message has then been printed to stderr)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  # running totals over all definitions; None means "not initialized
  # yet" (and, for check_max, "no upper bound to verify")
  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      # only the last definition decides whether an upper bound exists
      check_max = (arg.max is not None)

    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True
571
572
def SplitNodeOption(value):
  """Splits the value of a --node option.

  @return: a two-element sequence; the second element is None when no
      ":" separator is present

  """
  if not (value and ":" in value):
    return (value, None)
  return value.split(":", 1)
581
582
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if not os_variants:
    return [os_name]
  return ["%s+%s" % (os_name, variant) for variant in os_variants]
598
599
def ParseFields(selected, default):
  """Parses the values of "--field"-like options.

  @type selected: string or None
  @param selected: User-selected options
  @type default: list
  @param default: Default fields

  """
  # no user selection: use the defaults unchanged
  if selected is None:
    return default

  # a leading "+" means "default fields plus the listed ones"
  if selected.startswith("+"):
    return default + selected[1:].split(",")

  return selected.split(",")
616
617
618 UsesRPC = rpc.RunWithRPC
619
620
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not perform the
      operation')]; note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  """
  if choices is None:
    choices = [("y", True, "Perform the operation"),
               ("n", False, "Do not perform the operation")]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == "?":
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # the last choice doubles as the fallback answer when no tty is there
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # use open() instead of the deprecated (and Python 3-incompatible)
    # file() builtin
    f = open("/dev/tty", "a+")
  except IOError:
    # no controlling terminal: return the default answer
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append("?")
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write("\n")
      f.write("/".join(chars))
      f.write(": ")
      # read at most the answer character plus the newline
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == "?":
        # print the per-choice descriptions as help
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
676
677
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  Carries exactly one argument: the ID of the submitted job. The
  handler is expected to print this ID.

  This is not an error, just a structured way to exit from clients.

  """
687
688
def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created
  @return: the job ID assigned by the master

  """
  if cl is None:
    cl = GetClient()

  return cl.SubmitJob(ops)
705
706
def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks
  @return: the job's opresult list if it finished successfully
  @raise errors.JobLost: if the job can no longer be found
  @raise errors.OpExecError: if the job was canceled or failed

  """
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        # remember the highest serial seen so already-reported messages
        # are not fetched again
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # NOTE: the terminal-state check is an "elif", i.e. it only runs in
    # iterations without new log messages, so all pending messages are
    # reported before the loop ends
    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  # fetch the final state to build the return value/error
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  # report the first failed opcode, noting whether earlier opcodes
  # succeeded (partial failure)
  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      errors.MaybeRaise(msg)

      if has_ok:
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)
783
784
class JobPollCbBase(object):
  """Base class for L{GenericPollJob} data callbacks.

  Subclasses must implement both methods.

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits once for a change on the given job.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()
811
812
class JobPollReportCbBase(object):
  """Base class for L{GenericPollJob} reporting callbacks.

  Subclasses must implement both methods.

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called for if a job hasn't changed in a while.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()
838
839
class _LuxiJobPollCb(JobPollCbBase):
  """Data callbacks backed by a luxi client.

  """
  def __init__(self, cl):
    """Initializes this class.

    @param cl: luxi client used for all queries

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Delegates the wait to the luxi client.

    """
    return self.cl.WaitForJobChangeOnce(job_id, fields, prev_job_info,
                                        prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Delegates the query to the luxi client.

    """
    return self.cl.QueryJobs(job_ids, fields)
861
862
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  """Reporting callback forwarding log messages to a feedback function.

  """
  def __init__(self, feedback_fn):
    """Initializes this class.

    @param feedback_fn: callable invoked with (timestamp, log_type,
        log_msg) tuples

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Forwards a log message to the feedback function.

    """
    self.feedback_fn((timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # deliberately ignored
885
886
class StdioJobPollReportCb(JobPollReportCbBase):
  """Reporting callback printing job progress to stdout/stderr.

  """
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    # flags to make sure each one-time notification is printed only once
    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Prints a log message to standard output.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             FormatLogMessage(log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Prints a notice when the job has not changed for a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True
    elif status == constants.JOB_STATUS_WAITING and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True
918
919
def FormatLogMessage(log_type, log_msg):
  """Formats a job message according to its type.

  """
  # non-message entries can be arbitrary objects; stringify them first
  if log_type == constants.ELOG_MESSAGE:
    return utils.SafeEncode(log_msg)
  return utils.SafeEncode(str(log_msg))
928
929
def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  if reporter is None:
    if feedback_fn:
      reporter = FeedbackFnJobPollReportCb(feedback_fn)
    else:
      reporter = StdioJobPollReportCb()
  elif feedback_fn:
    # the two parameters are mutually exclusive
    raise errors.ProgrammerError("Can't specify reporter and feedback function")

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
952
953
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl=cl)
  if getattr(opts, "print_jobid", False):
    ToStdout("%d" % job_id)

  results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
                    reporter=reporter)
  return results[0]
975
976
def SubmitOpCodeToDrainedQueue(op):
  """Forcefully insert a job in the queue, even if it is drained.

  @return: the result of the submitted opcode

  """
  cl = GetClient()
  job_id = cl.SubmitJobToDrainedQueue([op])
  return PollJob(job_id, cl=cl)[0]
985
986
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending the via SendJob
  (otherwise SubmitOpCode does it).

  """
  if not (opts and opts.submit_only):
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  job = [op]
  SetGenericOpcodeOpts(job, opts)
  job_id = SendJob(job, cl=cl)
  if opts.print_jobid:
    ToStdout("%d" % job_id)
  raise JobSubmittedException(job_id)
1008
1009
def _InitReasonTrail(op, opts):
  """Builds the first part of the reason trail

  Builds the initial part of the reason trail, adding the user provided reason
  (if it exists) and the name of the command starting the operation.

  @param op: the opcode the reason trail will be added to
  @param opts: the command line options selected by the user

  """
  assert len(sys.argv) >= 2

  trail = []
  if opts.reason:
    trail.append((constants.OPCODE_REASON_SRC_USER,
                  opts.reason,
                  utils.EpochNano()))

  executable = os.path.basename(sys.argv[0])
  source = "%s:%s" % (constants.OPCODE_REASON_SRC_CLIENT, executable)
  trail.append((source, sys.argv[1], utils.EpochNano()))
  op.reason = trail
1033
1034
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return

  for op in opcode_list:
    op.debug_level = options.debug
    if hasattr(options, "dry_run"):
      op.dry_run = options.dry_run
    priority = getattr(options, "priority", None)
    if priority is not None:
      op.priority = priority
    _InitReasonTrail(op, options)
1055
1056
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  # dispatch on the exception type; ordering matters since more specific
  # types are tested before the catch-all GenericError branch
  # (presumably the specific errors subclass GenericError — verify in
  # errors.py)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    # configuration corruption uses a distinct exit code
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write(" node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write(" node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    # special-case resolution failures of the local host
    this_host = netutils.Hostname.GetSysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, rpcerr.NoMasterError):
    # translate the socket path into a human-readable daemon name
    if err.args[0] == pathutils.MASTER_SOCKET:
      daemon = "the master daemon"
    elif err.args[0] == pathutils.QUERY_SOCKET:
      daemon = "the config daemon"
    else:
      daemon = "socket '%s'" % str(err.args[0])
    obuf.write("Cannot communicate with %s.\nIs the process running"
               " and listening for connections?" % daemon)
  elif isinstance(err, rpcerr.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Jobs might have"
               " been submitted and will continue to run even if the call"
               " timed out. Useful commands in this situation are \"gnt-job"
               " list\", \"gnt-job cancel\" and \"gnt-job watch\". Error:\n")
    obuf.write(msg)
  elif isinstance(err, rpcerr.PermissionError):
    obuf.write("It seems you don't have permissions to connect to the"
               " master daemon.\nPlease retry as a different user.")
  elif isinstance(err, rpcerr.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.JobLost):
    obuf.write("Error checking job status: %s" % msg)
  elif isinstance(err, errors.QueryFilterParseError):
    obuf.write("Error while parsing query filter: %s\n" % err.args[0])
    obuf.write("\n".join(err.GetDetails()))
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    # not an error: report the job ID and a success exit code
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip("\n")
1149
1150
def GenericMain(commands, override=None, aliases=None,
                env_override=frozenset()):
  """Generic main function for all the gnt-* commands.

  @param commands: a dictionary with a special structure, see the design doc
                   for command line handling.
  @param override: if not None, we expect a dictionary with keys that will
                   override command line options; this can be used to pass
                   options from the scripts to generic functions
  @param aliases: dictionary with command aliases {'alias': 'target', ...}
  @param env_override: list of environment names which are allowed to submit
                       default args for commands
  @rtype: int
  @return: the exit code of the executed command

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0])
    if not binary:
      binary = sys.argv[0]

    if len(sys.argv) >= 2:
      # include the sub-command in the log name, e.g. "gnt-node list"
      logname = utils.ShellQuoteArgs([binary, sys.argv[1]])
    else:
      logname = binary

    cmdline = utils.ShellQuoteArgs([binary] + sys.argv[1:])
  else:
    binary = "<unknown program>"
    # Previously 'logname' was left unset on this branch, leading to a
    # NameError at the SetupLogging call below if sys.argv was empty
    logname = binary
    cmdline = "<unknown>"

  if aliases is None:
    aliases = {}

  try:
    (func, options, args) = _ParseArgs(binary, sys.argv, commands, aliases,
                                       env_override)
  except _ShowVersion:
    ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
             constants.RELEASE_VERSION)
    return constants.EXIT_SUCCESS
  except _ShowUsage as err:
    for line in _FormatUsage(binary, commands):
      ToStdout(line)

    if err.exit_error:
      return constants.EXIT_FAILURE
    else:
      return constants.EXIT_SUCCESS
  except errors.ParameterError as err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    # let the calling script force specific option values
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(pathutils.LOG_COMMANDS, logname, debug=options.debug,
                     stderr_logging=True)

  logging.debug("Command line: %s", cmdline)

  try:
    result = func(options, args)
  except (errors.GenericError, rpcerr.ProtocolError,
          JobSubmittedException) as err:
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)
  except KeyboardInterrupt:
    result = constants.EXIT_FAILURE
    ToStderr("Aborted. Note that if the operation created any jobs, they"
             " might have been submitted and"
             " will continue to run in the background.")
  except IOError as err:
    if err.errno == errno.EPIPE:
      # our terminal went away, we'll exit
      sys.exit(constants.EXIT_FAILURE)
    else:
      raise

  return result
1236
1237
def ParseNicOption(optvalue):
  """Parses the value of the --net option(s).

  @param optvalue: list of (index, settings-dict) pairs as produced by the
      option parser for C{--net}
  @rtype: list of dict
  @return: NIC parameter dictionaries indexed by NIC index; indices not
      mentioned in C{optvalue} get an empty dict
  @raise errors.OpPrereqError: if an index is not an integer or a settings
      value is not a dictionary

  """
  try:
    nic_max = max(int(nidx[0]) + 1 for nidx in optvalue)
  except (TypeError, ValueError) as err:
    raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err),
                               errors.ECODE_INVAL)

  # Create distinct dict objects; "[{}] * nic_max" would make every
  # unspecified slot alias one single shared dictionary
  nics = [{} for _ in range(nic_max)]
  for nidx, ndict in optvalue:
    nidx = int(nidx)

    if not isinstance(ndict, dict):
      raise errors.OpPrereqError("Invalid nic/%d value: expected dict,"
                                 " got %s" % (nidx, ndict), errors.ECODE_INVAL)

    utils.ForceDictType(ndict, constants.INIC_PARAMS_TYPES)

    nics[nidx] = ndict

  return nics
1261
1262
def FixHvParams(hvparams):
  """Normalizes the separator of the usb_devices hvparam in place.

  In Ganeti 2.8.4 the separator for the usb_devices hvparam was changed from
  comma to space, because commas cannot be accepted on the command line (they
  already act as the separator between different hvparams). RAPI, however,
  should still accept commas for backwards compatibility, so spaces are
  converted into commas here while the old parsing logic is kept everywhere
  else.

  """
  if constants.HV_USB_DEVICES in hvparams:
    hvparams[constants.HV_USB_DEVICES] = \
      hvparams[constants.HV_USB_DEVICES].replace(" ", ",")
1276
1277
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: on inconsistent disk/NIC specifications
  @raise errors.ProgrammerError: if C{mode} is not one of the two valid modes

  """
  instance = args[0]

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    nics = ParseNicOption(opts.nics)
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed", errors.ECODE_INVAL)
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified",
                                 errors.ECODE_INVAL)
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option", errors.ECODE_INVAL)
    if opts.sd_size is not None:
      # translate the legacy "-s <size>" option into a single-disk spec
      opts.disks = [(0, {constants.IDISK_SIZE: opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError as err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err),
                                   errors.ECODE_INVAL)
      # Create distinct dict objects; "[{}] * disk_max" would make every
      # unspecified slot alias one single shared dictionary
      disks = [{} for _ in range(disk_max)]
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
      elif constants.IDISK_SIZE in ddict:
        if constants.IDISK_ADOPT in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx, errors.ECODE_INVAL)
        try:
          ddict[constants.IDISK_SIZE] = \
            utils.ParseUnit(ddict[constants.IDISK_SIZE])
        except ValueError as err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err), errors.ECODE_INVAL)
      elif constants.IDISK_ADOPT in ddict:
        if constants.IDISK_SPINDLES in ddict:
          raise errors.OpPrereqError("spindles is not a valid option when"
                                     " adopting a disk", errors.ECODE_INVAL)
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import", errors.ECODE_INVAL)
        ddict[constants.IDISK_SIZE] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx, errors.ECODE_INVAL)
      if constants.IDISK_SPINDLES in ddict:
        ddict[constants.IDISK_SPINDLES] = int(ddict[constants.IDISK_SPINDLES])

      disks[didx] = ddict

  if opts.tags is not None:
    tags = opts.tags.split(",")
  else:
    tags = []

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_COMPAT)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
  FixHvParams(hvparams)

  osparams_private = opts.osparams_private or serializer.PrivateDict()
  osparams_secret = opts.osparams_secret or serializer.PrivateDict()

  helper_startup_timeout = opts.helper_startup_timeout
  helper_shutdown_timeout = opts.helper_shutdown_timeout

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    force_variant = opts.force_variant
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
    compress = constants.IEC_NONE
    if opts.instance_communication is None:
      instance_communication = False
    else:
      instance_communication = opts.instance_communication
  elif mode == constants.INSTANCE_IMPORT:
    start = False
    os_type = None
    force_variant = False
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
    compress = opts.compress
    instance_communication = False
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpInstanceCreate(
    instance_name=instance,
    disks=disks,
    disk_template=opts.disk_template,
    group_name=opts.nodegroup,
    nics=nics,
    conflicts_check=opts.conflicts_check,
    pnode=pnode, snode=snode,
    ip_check=opts.ip_check,
    name_check=opts.name_check,
    wait_for_sync=opts.wait_for_sync,
    file_storage_dir=opts.file_storage_dir,
    file_driver=opts.file_driver,
    iallocator=opts.iallocator,
    hypervisor=hypervisor,
    hvparams=hvparams,
    beparams=opts.beparams,
    osparams=opts.osparams,
    osparams_private=osparams_private,
    osparams_secret=osparams_secret,
    mode=mode,
    opportunistic_locking=opts.opportunistic_locking,
    start=start,
    os_type=os_type,
    force_variant=force_variant,
    src_node=src_node,
    src_path=src_path,
    compress=compress,
    tags=tags,
    no_install=no_install,
    identify_defaults=identify_defaults,
    ignore_ipolicy=opts.ignore_ipolicy,
    instance_communication=instance_communication,
    helper_startup_timeout=helper_startup_timeout,
    helper_shutdown_timeout=helper_shutdown_timeout)

  SubmitOrSend(op, opts)
  return 0
1445
1446
class _RunWhileDaemonsStoppedHelper(object):
  """Helper class for L{RunWhileDaemonsStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node,
               online_nodes, ssh_ports, exclude_daemons, debug,
               verbose):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes
    @type ssh_ports: list
    @param ssh_ports: List of SSH ports of online nodes, in the same order
        as C{online_nodes}
    @type exclude_daemons: list of string
    @param exclude_daemons: list of daemons that will be restarted on master
                            after all others are shutdown
    @type debug: boolean
    @param debug: show debug output
    @type verbose: boolean
    @param verbose: show verbose output

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes
    # Map node name -> SSH port, used when running commands over SSH
    self.ssh_ports = dict(zip(online_nodes, ssh_ports))

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    self.exclude_daemons = exclude_daemons
    self.debug = debug
    self.verbose = verbose

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; C{None} or the master node name means
        "run locally"
    @type cmd: list
    @param cmd: Command
    @raise errors.OpExecError: if the command exits with a failure

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, constants.SSH_LOGIN_USER,
                            utils.ShellQuoteArgs(cmd),
                            port=self.ssh_ports[node_name])

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called; receives this helper instance as its
        first argument, followed by C{args}
    @return: whatever C{fn} returns

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [pathutils.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes, the master node first
        online_nodes = [self.master_node] + [n for n in self.online_nodes
                                             if n != self.master_node]
        for node_name in online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop-all"])
          # Starting any daemons listed as exception
          if node_name == self.master_node:
            for daemon in self.exclude_daemons:
              self.feedback_fn("Starting daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start", daemon])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          # Stopping any daemons listed as exception.
          # This might look unnecessary, but it makes sure that daemon-util
          # starts all daemons in the right order.
          if node_name == self.master_node:
            self.exclude_daemons.reverse()
            for daemon in self.exclude_daemons:
              self.feedback_fn("Stopping daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop", daemon])
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start-all"])

    finally:
      # Resume watcher
      watcher_block.Close()
1576
1577
def RunWhileDaemonsStopped(feedback_fn, exclude_daemons, fn, *args, **kwargs):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type exclude_daemons: list of string
  @param exclude_daemons: list of daemons that are stopped, but immediately
                          restarted on the master to be available when calling
                          'fn'. If None, all daemons will be stopped and none
                          will be started before calling 'fn'.
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # Connecting also ensures we are talking to the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)
  ssh_ports = GetNodesSshPorts(online_nodes, cl)

  # Drop the client reference; the master daemon is about to go away
  del cl

  assert master_node in online_nodes

  helper = _RunWhileDaemonsStoppedHelper(
    feedback_fn, cluster_name, master_node, online_nodes, ssh_ports,
    exclude_daemons if exclude_daemons is not None else [],
    kwargs.get("debug", False), kwargs.get("verbose", False))

  return helper.Call(fn, *args)
1616
1617
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  Convenience wrapper around L{RunWhileDaemonsStopped} that stops all daemons
  and does not restart any of them before calling C{fn}.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  RunWhileDaemonsStopped(feedback_fn, None, fn, *args)
1628
1629
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output;
      note that the rows are modified in place (all values are converted
      to strings, unit fields are formatted)
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list of strings
  @return: the formatted table lines

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields) # pylint: disable=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable=W0142

  # Build one printf-style specifier per column
  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      # right-aligned; width is passed at formatting time via "*"
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    # mlens tracks the maximum width seen per column
    mlens = [0 for name in fields]
    format_str = " ".join(format_fields)
  else:
    # escape "%" in the separator, as the result is used with "%"-formatting
    format_str = separator.replace("%", "%%").join(format_fields)

  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # not numeric, leave the value unchanged
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      # NOTE: the row is mutated in place; the output loop below relies on
      # all values already being strings
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        # a header can be wider than any of the column's values
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      # no trailing whitespace after the last (left-aligned) column
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ["-" for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
1736
1737
1738 def _FormatBool(value):
1739 """Formats a boolean value as a string.
1740
1741 """
1742 if value:
1743 return "Y"
1744 return "N"
1745
1746
#: Default formatting for query results; (callback, align right)
_DEFAULT_FORMAT_QUERY = {
  constants.QFT_TEXT: (str, False),
  constants.QFT_BOOL: (_FormatBool, False),
  constants.QFT_NUMBER: (str, True),
  constants.QFT_NUMBER_FLOAT: (str, True),
  constants.QFT_TIMESTAMP: (utils.FormatTime, False),
  constants.QFT_OTHER: (str, False),
  constants.QFT_UNKNOWN: (str, False),
  # NOTE: constants.QFT_UNIT is deliberately absent; it is handled
  # dynamically by _GetColumnFormatter as it depends on the chosen unit
  }
1757
1758
def _GetColumnFormatter(fdef, override, unit):
  """Returns formatting function for a field.

  @type fdef: L{objects.QueryFieldDefinition}
  @type override: dict
  @param override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT}
  @rtype: tuple; (callable, bool)
  @return: Returns the function to format a value (takes one parameter) and a
    boolean for aligning the value on the right-hand side
  @raise NotImplementedError: if the field kind has no known formatter

  """
  custom_fmt = override.get(fdef.name, None)
  if custom_fmt is not None:
    return custom_fmt

  assert constants.QFT_UNIT not in _DEFAULT_FORMAT_QUERY

  if fdef.kind == constants.QFT_UNIT:
    # The unit is chosen at runtime, hence not in the static dictionary
    return (lambda value: utils.FormatUnit(value, unit), True)

  default_fmt = _DEFAULT_FORMAT_QUERY.get(fdef.kind, None)
  if default_fmt is not None:
    return default_fmt

  raise NotImplementedError("Can't format column type '%s'" % fdef.kind)
1788
1789
class _QueryColumnFormatter(object):
  """Callable class for formatting fields of a query.

  """
  def __init__(self, fn, status_fn, verbose):
    """Initializes this class.

    @type fn: callable
    @param fn: Formatting function
    @type status_fn: callable
    @param status_fn: Function to report fields' status
    @type verbose: boolean
    @param verbose: whether to use verbose field descriptions or not

    """
    self._format_fn = fn
    self._report_status_fn = status_fn
    self._verbose = verbose

  def __call__(self, data):
    """Returns a field's string representation.

    """
    (status, value) = data

    # Let the caller keep track of the field's status
    self._report_status_fn(status)

    if status != constants.RS_NORMAL:
      assert value is None, \
        "Found value %r for abnormal status %s" % (value, status)
      return FormatResultError(status, self._verbose)

    return self._format_fn(value)
1825
1826
def FormatResultError(status, verbose):
  """Formats result status other than L{constants.RS_NORMAL}.

  @param status: The result status
  @type verbose: boolean
  @param verbose: Whether to return the verbose text
  @return: Text of result status
  @raise NotImplementedError: if the status has no known description

  """
  assert status != constants.RS_NORMAL, \
    "FormatResultError called with status equal to constants.RS_NORMAL"
  descriptions = constants.RSS_DESCRIPTION.get(status)
  if descriptions is None:
    raise NotImplementedError("Unknown status %s" % status)
  (verbose_text, normal_text) = descriptions
  if verbose:
    return verbose_text
  return normal_text
1846
1847
def FormatQueryResult(result, unit=None, format_override=None, separator=None,
                      header=False, verbose=False):
  """Formats data in L{objects.QueryResponse}.

  @type result: L{objects.QueryResponse}
  @param result: result of query operation
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT},
    see L{utils.text.FormatUnit}
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to output header row
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @rtype: tuple
  @return: overall status and the formatted table lines

  """
  if unit is None:
    # human-readable sizes unless the output is meant for parsing
    unit = "m" if separator else "h"

  if format_override is None:
    format_override = {}

  # Per-status counters, updated by the column formatters as a side effect
  stats = dict.fromkeys(constants.RS_ALL, 0)

  def _RecordStatus(status):
    if status in stats:
      stats[status] += 1

  columns = []
  for fdef in result.fields:
    assert fdef.title and fdef.name
    (fn, align_right) = _GetColumnFormatter(fdef, format_override, unit)
    formatter = _QueryColumnFormatter(fn, _RecordStatus, verbose)
    columns.append(TableColumn(fdef.title, formatter, align_right))

  table = FormatTable(result.data, columns, header, separator)

  # Collect statistics
  assert len(stats) == len(constants.RS_ALL)
  assert compat.all(count >= 0 for count in stats.values())

  # Determine overall status. If there was no data, unknown fields must be
  # detected via the field definitions.
  any_abnormal = compat.any(count > 0 for key, count in stats.items()
                            if key != constants.RS_NORMAL)
  if (stats[constants.RS_UNKNOWN] or
      (not result.data and _GetUnknownFields(result.fields))):
    status = QR_UNKNOWN
  elif any_abnormal:
    status = QR_INCOMPLETE
  else:
    status = QR_NORMAL

  return (status, table)
1910
1911
def _GetUnknownFields(fdefs):
  """Returns list of unknown fields included in C{fdefs}.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: list of L{objects.QueryFieldDefinition}

  """
  return list(filter(lambda fdef: fdef.kind == constants.QFT_UNKNOWN, fdefs))
1920
1921
def _WarnUnknownFields(fdefs):
  """Prints a warning to stderr if a query included unknown fields.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: bool
  @return: whether any unknown fields were present

  """
  unknown = _GetUnknownFields(fdefs)
  if not unknown:
    return False

  ToStderr("Warning: Queried for unknown fields %s",
           utils.CommaJoin(fdef.name for fdef in unknown))
  return True
1935
1936
def GenericList(resource, fields, names, unit, separator, header, cl=None,
                format_override=None, verbose=False, force_filter=False,
                namefield=None, qfilter=None, isnumeric=False):
  """Generic implementation for listing all items of a resource.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type names: list of strings
  @param names: Names of items to query for
  @type unit: string or None
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT} or
    None for automatic choice (human-readable for non-separator usage,
    otherwise megabytes); this is a one-letter string
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @type force_filter: bool
  @param force_filter: Whether to always treat names as filter
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @type namefield: string
  @param namefield: Name of field to use for simple filters (see
    L{qlang.MakeFilter} for details)
  @type qfilter: list or None
  @param qfilter: Query filter (in addition to names)
  @type isnumeric: bool
  @param isnumeric: Whether the namefield's type is numeric, and therefore
    any simple filters built by namefield should use integer values to
    reflect that
  @rtype: int
  @return: an exit code suitable for sys.exit

  """
  # An empty name list means no name-based filtering at all
  name_qfilter = qlang.MakeFilter(names or None, force_filter,
                                  namefield=namefield, isnumeric=isnumeric)

  if qfilter is None:
    qfilter = name_qfilter
  elif name_qfilter is not None:
    qfilter = [qlang.OP_AND, name_qfilter, qfilter]

  if cl is None:
    cl = GetClient()

  response = cl.Query(resource, fields, qfilter)

  found_unknown = _WarnUnknownFields(response.fields)

  (status, data) = FormatQueryResult(response, unit=unit, separator=separator,
                                     header=header,
                                     format_override=format_override,
                                     verbose=verbose)

  for line in data:
    ToStdout(line)

  # Unknown fields must have been reported if, and only if, the overall
  # status says so
  assert found_unknown == (status == QR_UNKNOWN)

  if status == QR_UNKNOWN:
    return constants.EXIT_UNKNOWN_FIELD

  # TODO: Should the list command fail if not all data could be collected?
  return constants.EXIT_SUCCESS
2007
2008
def _FieldDescValues(fdef):
  """Helper function for L{GenericListFields} to get query field description.

  @type fdef: L{objects.QueryFieldDefinition}
  @rtype: list
  @return: name, human-readable kind, title and description of the field

  """
  # Fall back to the raw kind if there is no human-readable name for it
  kind_name = _QFT_NAMES.get(fdef.kind, fdef.kind)
  return [fdef.name, kind_name, fdef.title, fdef.doc]
2022
2023
def GenericListFields(resource, fields, separator, header, cl=None):
  """Generic implementation for listing fields for a resource.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @rtype: int
  @return: L{constants.EXIT_UNKNOWN_FIELD} if unknown fields were queried
    for, L{constants.EXIT_SUCCESS} otherwise

  """
  if cl is None:
    cl = GetClient()

  # An empty field list means "all fields"
  response = cl.QueryFields(resource, fields or None)

  found_unknown = _WarnUnknownFields(response.fields)

  columns = [
    TableColumn("Name", str, False),
    TableColumn("Type", str, False),
    TableColumn("Title", str, False),
    TableColumn("Description", str, False),
    ]

  rows = [_FieldDescValues(fdef) for fdef in response.fields]

  for line in FormatTable(rows, columns, header, separator):
    ToStdout(line)

  if found_unknown:
    return constants.EXIT_UNKNOWN_FIELD

  return constants.EXIT_SUCCESS
2062
2063
class TableColumn(object):
  """Describes a column for L{FormatTable}.

  """
  def __init__(self, title, fn, align_right):
    """Initializes this class.

    @type title: string
    @param title: Column title
    @type fn: callable
    @param fn: Formatting function
    @type align_right: bool
    @param align_right: Whether to align values on the right-hand side

    """
    # Column title, used for the header row
    self.title = title
    # Formatting callback applied to every value in the column
    self.format = fn
    # Whether values should be right-aligned
    self.align_right = align_right
2082
2083
2084 def _GetColFormatString(width, align_right):
2085 """Returns the format string for a field.
2086
2087 """
2088 if align_right:
2089 sign = ""
2090 else:
2091 sign = "-"
2092
2093 return "%%%s%ss" % (sign, width)
2094
2095
def FormatTable(rows, columns, header, separator):
  """Formats data as a table.

  @type rows: list of lists
  @param rows: Row data, one list per row
  @type columns: list of L{TableColumn}
  @param columns: Column descriptions
  @type header: bool
  @param header: Whether to show header row
  @type separator: string or None
  @param separator: String used to separate columns
  @rtype: list of strings
  @return: the formatted table lines

  """
  if header:
    data = [[col.title for col in columns]]
    colwidth = [len(col.title) for col in columns]
  else:
    data = []
    colwidth = [0] * len(columns)

  # Format row data
  for row in rows:
    assert len(row) == len(columns)

    formatted = [col.format(value) for value, col in zip(row, columns)]
    data.append(formatted)

    if separator is None:
      # Track the widest value seen per column
      colwidth = [max(oldwidth, len(value))
                  for oldwidth, value in zip(colwidth, formatted)]

  if separator is not None:
    # With an explicit separator no padding is needed
    return [separator.join(row) for row in data]

  if columns and not columns[-1].align_right:
    # Avoid unnecessary spaces at end of line
    colwidth[-1] = 0

  # Build format string
  fmt = " ".join(_GetColFormatString(width, col.align_right)
                 for col, width in zip(columns, colwidth))

  return [fmt % tuple(row) for row in data]
2143
2144
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp, or "?" if the input is
    not a two-element sequence

  """
  # Anything other than a (seconds, microseconds) pair is unparseable
  if isinstance(ts, (tuple, list)) and len(ts) == 2:
    (sec, usecs) = ts
    return utils.FormatTime(sec, usecs=usecs)

  return "?"
2160
2161
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: day
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @rtype: int
  @return: the number of seconds represented by the specification
  @raise errors.OpPrereqError: if the specification is empty or not a
    valid number (with or without suffix)

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed",
                               errors.ECODE_INVAL)
  suffix_map = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    }
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % value,
                                 errors.ECODE_INVAL)
  else:
    multiplier = suffix_map[value[-1]]
    # Keep the original text so error messages show exactly what the user
    # passed, not the suffix-stripped remainder
    orig_value = value
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)", errors.ECODE_INVAL)
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" %
                                 orig_value, errors.ECODE_INVAL)
  return value
2205
2206
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False, nodegroup=None):
  """Returns the names of online nodes.

  This function will also print a note on stderr listing the names of
  the offline nodes that were skipped (unless C{nowarn} is set).

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)
  @type nodegroup: string
  @param nodegroup: If set, only return nodes in this node group

  """
  if cl is None:
    cl = GetClient()

  # Build up the query filter from the requested restrictions
  qfilter = []

  if nodes:
    qfilter.append(qlang.MakeSimpleFilter("name", nodes))

  if nodegroup is not None:
    # Accept either the group name or its UUID
    qfilter.append([qlang.OP_OR, [qlang.OP_EQUAL, "group", nodegroup],
                    [qlang.OP_EQUAL, "group.uuid", nodegroup]])

  if filter_master:
    qfilter.append([qlang.OP_NOT, [qlang.OP_TRUE, "master"]])

  if qfilter:
    if len(qfilter) > 1:
      final_filter = [qlang.OP_AND] + qfilter
    else:
      assert len(qfilter) == 1
      final_filter = qfilter[0]
  else:
    final_filter = None

  result = cl.Query(constants.QR_NODE, ["name", "offline", "sip"], final_filter)

  # Each result row is [(status, name), (status, offline), (status, sip)]
  def _IsOffline(row):
    (_, (_, offline), _) = row
    return offline

  def _GetName(row):
    ((_, name), _, _) = row
    return name

  def _GetSip(row):
    (_, _, (_, sip)) = row
    return sip

  (offline, online) = compat.partition(result.data, _IsOffline)

  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" %
             utils.CommaJoin(map(_GetName, offline)))

  if secondary_ips:
    fn = _GetSip
  else:
    fn = _GetName

  return map(fn, online)
2283
2284
def GetNodesSshPorts(nodes, cl):
  """Retrieves SSH ports of given nodes.

  @param nodes: the names of nodes
  @type nodes: a list of strings
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the list of SSH ports corresponding to the nodes
  @rtype: a list of integers

  """
  # QueryNodes returns one single-field row per node; unwrap each row
  return [t[0] for t in cl.QueryNodes(names=nodes,
                                      fields=["ndp/ssh_port"],
                                      use_locking=False)]
2300
2301
def GetNodeUUIDs(nodes, cl):
  """Retrieves the UUIDs of given nodes.

  @param nodes: the names of nodes
  @type nodes: a list of string
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the list of UUIDs corresponding to the nodes
  @rtype: a list of strings

  """
  # QueryNodes returns one single-field row per node; unwrap each row
  return [t[0] for t in cl.QueryNodes(names=nodes,
                                      fields=["uuid"],
                                      use_locking=False)]
2317
2318
2319 def _ToStream(stream, txt, *args):
2320 """Write a message to a stream, bypassing the logging system
2321
2322 @type stream: file object
2323 @param stream: the file to which we should write
2324 @type txt: str
2325 @param txt: the message
2326
2327 """
2328 try:
2329 if args:
2330 args = tuple(args)
2331 stream.write(txt % args)
2332 else:
2333 stream.write(txt)
2334 stream.write("\n")
2335 stream.flush()
2336 except IOError, err:
2337 if err.errno == errno.EPIPE:
2338 # our terminal went away, we'll exit
2339 sys.exit(constants.EXIT_FAILURE)
2340 else:
2341 raise
2342
2343
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is a thin convenience wrapper around L{_ToStream}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)
2354
2355
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is a thin convenience wrapper around L{_ToStream}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)
2366
2367
class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    # Jobs queued via QueueJob() but not yet submitted; (index, name, ops)
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    # Submitted jobs as (index, submit status, job id or error, name)
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn
    # Monotonically increasing index, used by GetResults() to restore
    # submission order after jobs complete out of order
    self._counter = itertools.count()

  @staticmethod
  def _IfName(name, fmt):
    """Helper function for formatting name.

    Returns C{fmt % name} when a name is set, an empty string otherwise.

    """
    if name:
      return fmt % name

    return ""

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet

    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((self._counter.next(), name, ops))

  def AddJobId(self, name, status, job_id):
    """Adds a job ID to the internal queue.

    Used for jobs that were submitted outside this executor.

    """
    self.jobs.append((self._counter.next(), status, job_id, name))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    @type each: bool
    @param each: if True, jobs are submitted one at a time via SubmitJob;
      otherwise a single SubmitManyJobs call is used

    """
    if each:
      results = []
      for (_, _, ops) in self.queue:
        # SubmitJob will remove the success status, but raise an exception if
        # the submission fails, so we'll notice that anyway.
        results.append([True, self.cl.SubmitJob(ops)[0]])
    else:
      results = self.cl.SubmitManyJobs([ops for (_, _, ops) in self.queue])
    for ((status, data), (idx, name, _)) in zip(results, self.queue):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    # Only query a bounded batch of jobs to keep the request size in check
    result = self.cl.QueryJobs([i[2] for i in self.jobs[:_CHOOSE_BATCH]],
                               ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if (isinstance(status, list) and status and
          status[0] in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING)):
        # job is still present and waiting
        continue
      # good candidate found (either running job or lost job)
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job%s: %s", self._IfName(name, " for %s"), jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s%s ...", jid, self._IfName(name, " for %s"))
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except errors.JobLost, err:
        # The job was archived before we could read its result
        _, job_result = FormatError(err)
        ToStderr("Job %s%s has been archived, cannot check its result",
                 jid, self._IfName(name, " for %s"))
        success = False
      except (errors.GenericError, rpcerr.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s%s has failed: %s",
                 jid, self._IfName(name, " for %s"), job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)
      # Return (status, job id/error) pairs for all submitted jobs
      return [row[1:3] for row in self.jobs]
2518
2519
def FormatParamsDictInfo(param_dict, actual, roman=False):
  """Formats a parameter dictionary.

  @type param_dict: dict
  @param param_dict: the own parameters
  @type actual: dict
  @param actual: the current parameter set (including defaults)
  @type roman: bool
  @param roman: whether to convert values to roman numerals
  @rtype: dict
  @return: dictionary where the value of each parameter is either a fully
      formatted string or a dictionary containing formatted strings

  """
  def _FormatOne(key, data):
    # Non-empty sub-dictionaries are formatted recursively
    if isinstance(data, dict) and data:
      return FormatParamsDictInfo(param_dict.get(key, {}), data, roman)
    fallback = "default (%s)" % compat.TryToRoman(data, roman)
    return str(compat.TryToRoman(param_dict.get(key, fallback), roman))

  return dict((key, _FormatOne(key, data))
              for (key, data) in actual.items())
2540
2541
def _FormatListInfoDefault(data, def_data):
  """Joins a list of values, marking the default list when data is unset.

  """
  if data is None:
    return "default (%s)" % utils.CommaJoin(def_data)
  return utils.CommaJoin(data)
2548
2549
def FormatPolicyInfo(custom_ipolicy, eff_ipolicy, iscluster, roman=False):
  """Formats an instance policy.

  @type custom_ipolicy: dict
  @param custom_ipolicy: own policy
  @type eff_ipolicy: dict
  @param eff_ipolicy: effective policy (including defaults); ignored for
      cluster
  @type iscluster: bool
  @param iscluster: the policy is at cluster level
  @type roman: bool
  @param roman: whether to print the values in roman numerals
  @rtype: list of pairs
  @return: formatted data, suitable for L{PrintGenericInfo}

  """
  if iscluster:
    # At cluster level there are no inherited defaults: the custom policy
    # is also the effective one
    eff_ipolicy = custom_ipolicy

  # Format the min/max bounds; use the custom ones if set, otherwise show
  # the effective (inherited) bounds with an empty custom part
  minmax_out = []
  custom_minmax = custom_ipolicy.get(constants.ISPECS_MINMAX)
  if custom_minmax:
    for (k, minmax) in enumerate(custom_minmax):
      minmax_out.append([
        ("%s/%s" % (key, k),
         FormatParamsDictInfo(minmax[key], minmax[key], roman))
        for key in constants.ISPECS_MINMAX_KEYS
        ])
  else:
    for (k, minmax) in enumerate(eff_ipolicy[constants.ISPECS_MINMAX]):
      minmax_out.append([
        ("%s/%s" % (key, k),
         FormatParamsDictInfo({}, minmax[key], roman))
        for key in constants.ISPECS_MINMAX_KEYS
        ])
  ret = [("bounds specs", minmax_out)]

  if iscluster:
    # "std" specs only exist at cluster level
    stdspecs = custom_ipolicy[constants.ISPECS_STD]
    ret.append(
      (constants.ISPECS_STD,
       FormatParamsDictInfo(stdspecs, stdspecs, roman))
      )

  ret.append(
    ("allowed disk templates",
     _FormatListInfoDefault(custom_ipolicy.get(constants.IPOLICY_DTS),
                            eff_ipolicy[constants.IPOLICY_DTS]))
    )
  # Scalar policy parameters (vcpu-ratio etc.), falling back to a
  # "default (...)" text showing the effective value
  to_roman = compat.TryToRoman
  ret.extend([
    (key, str(to_roman(custom_ipolicy.get(key,
                                          "default (%s)" % eff_ipolicy[key]),
                       roman)))
    for key in constants.IPOLICY_PARAMETERS
    ])
  return ret
2607
2608
def _PrintSpecsParameters(buf, specs):
  """Writes a spec dictionary as comma-separated, sorted "key=value" pairs.

  """
  buf.write(",".join("%s=%s" % kv for kv in sorted(specs.items())))
2612
2613
def PrintIPolicyCommand(buf, ipolicy, isgroup):
  """Print the command option used to generate the given instance policy.

  Currently only the parts dealing with specs are supported.

  @type buf: StringIO
  @param buf: stream to write into
  @type ipolicy: dict
  @param ipolicy: instance policy
  @type isgroup: bool
  @param isgroup: whether the policy is at group level

  """
  if not isgroup:
    # "std" specs only exist above group level
    stdspecs = ipolicy.get("std")
    if stdspecs:
      buf.write(" %s " % IPOLICY_STD_SPECS_STR)
      _PrintSpecsParameters(buf, stdspecs)

  first = True
  for minmax in ipolicy.get("minmax", []):
    minspecs = minmax.get("min")
    maxspecs = minmax.get("max")
    if not (minspecs and maxspecs):
      continue
    if first:
      # The option name is written once, pairs are separated by "//"
      buf.write(" %s " % IPOLICY_BOUNDS_SPECS_STR)
      first = False
    else:
      buf.write("//")
    buf.write("min:")
    _PrintSpecsParameters(buf, minspecs)
    buf.write("/max:")
    _PrintSpecsParameters(buf, maxspecs)
2647
2648
def ConfirmOperation(names, list_type, text, extra=""):
  """Ask the user to confirm an operation on a list of list_type.

  This function is used to request confirmation for doing an operation
  on a given list of list_type.

  @type names: list
  @param names: the list of names that we display when
      we ask for confirmation
  @type list_type: str
  @param list_type: Human readable name for elements in the list (e.g. nodes)
  @type text: str
  @param text: the operation that the user should confirm
  @rtype: boolean
  @return: True or False depending on user's confirmation.

  """
  count = len(names)
  msg = ("The %s will operate on %d %s.\n%s"
         "Do you want to continue?" % (text, count, list_type, extra))
  affected = (("\nAffected %s:\n" % list_type) +
              "\n".join("  %s" % name for name in names))

  choices = [("y", True, "Yes, execute the %s" % text),
             ("n", False, "No, abort the %s" % text)]

  if count > 20:
    # For long lists, offer a "view" choice instead of inlining all names
    choices.insert(1, ("v", "v", "View the list of affected %s" % list_type))
    question = msg
  else:
    question = msg + affected

  choice = AskUser(question, choices)
  if choice == "v":
    # The user asked to see the list; show it and ask again without "v"
    choices.pop(1)
    choice = AskUser(msg + affected, choices)
  return choice
2686
2687
def _MaybeParseUnit(elements):
  """Parses and returns an array of potential values with units.

  Values equal to the "default" placeholder are passed through unchanged.

  """
  return dict((k, v if v == constants.VALUE_DEFAULT else utils.ParseUnit(v))
              for (k, v) in elements.items())
2699
2700
def _InitISpecsFromSplitOpts(ipolicy, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all):
  """Fills an instance policy from the old-style --specs-xxx options.

  Each C{ispecs_*} argument is a dict keyed by "min"/"max"/"std" (value
  per bound); this function transposes them into the per-bound dicts the
  ipolicy structure uses, and stores the result in C{ipolicy} in place.

  @param ipolicy: the policy dict to fill (modified in place)
  @param group_ipolicy: whether this is a group-level policy (affects
    allowed value types)
  @param fill_all: whether missing values should be filled from the
    cluster-wide defaults

  """
  try:
    # Memory and disk sizes may carry units (e.g. "4G") and must be
    # converted to integers first
    if ispecs_mem_size:
      ispecs_mem_size = _MaybeParseUnit(ispecs_mem_size)
    if ispecs_disk_size:
      ispecs_disk_size = _MaybeParseUnit(ispecs_disk_size)
  except (TypeError, ValueError, errors.UnitParseError), err:
    raise errors.OpPrereqError("Invalid disk (%s) or memory (%s) size"
                               " in policy: %s" %
                               (ispecs_disk_size, ispecs_mem_size, err),
                               errors.ECODE_INVAL)

  # prepare ipolicy dict
  ispecs_transposed = {
    constants.ISPEC_MEM_SIZE: ispecs_mem_size,
    constants.ISPEC_CPU_COUNT: ispecs_cpu_count,
    constants.ISPEC_DISK_COUNT: ispecs_disk_count,
    constants.ISPEC_DISK_SIZE: ispecs_disk_size,
    constants.ISPEC_NIC_COUNT: ispecs_nic_count,
    }

  # first, check that the values given are correct
  if group_ipolicy:
    forced_type = TISPECS_GROUP_TYPES
  else:
    forced_type = TISPECS_CLUSTER_TYPES
  for specs in ispecs_transposed.values():
    assert type(specs) is dict
    utils.ForceDictType(specs, forced_type)

  # then transpose
  # from {param: {min: x, max: y, std: z}} to {min: {param: x}, ...}
  ispecs = {
    constants.ISPECS_MIN: {},
    constants.ISPECS_MAX: {},
    constants.ISPECS_STD: {},
    }
  for (name, specs) in ispecs_transposed.iteritems():
    assert name in constants.ISPECS_PARAMETERS
    for key, val in specs.items(): # {min: .. ,max: .., std: ..}
      assert key in ispecs
      ispecs[key][name] = val
  minmax_out = {}
  for key in constants.ISPECS_MINMAX_KEYS:
    if fill_all:
      minmax_out[key] = \
        objects.FillDict(constants.ISPECS_MINMAX_DEFAULTS[key], ispecs[key])
    else:
      minmax_out[key] = ispecs[key]
  # These options can only describe a single min/max pair
  ipolicy[constants.ISPECS_MINMAX] = [minmax_out]
  if fill_all:
    ipolicy[constants.ISPECS_STD] = \
      objects.FillDict(constants.IPOLICY_DEFAULTS[constants.ISPECS_STD],
                       ispecs[constants.ISPECS_STD])
  else:
    ipolicy[constants.ISPECS_STD] = ispecs[constants.ISPECS_STD]
2758
2759
2760 def _ParseSpecUnit(spec, keyname):
2761 ret = spec.copy()
2762 for k in [constants.ISPEC_DISK_SIZE, constants.ISPEC_MEM_SIZE]:
2763 if k in ret:
2764 try:
2765 ret[k] = utils.ParseUnit(ret[k])
2766 except (TypeError, ValueError, errors.UnitParseError), err:
2767 raise errors.OpPrereqError(("Invalid parameter %s (%s) in %s instance"
2768 " specs: %s" % (k, ret[k], keyname, err)),
2769 errors.ECODE_INVAL)
2770 return ret
2771
2772
def _ParseISpec(spec, keyname, required):
  """Parses and type-checks one instance specification dictionary.

  When C{required} is set, all spec parameters must be present.

  """
  ret = _ParseSpecUnit(spec, keyname)
  utils.ForceDictType(ret, constants.ISPECS_PARAMETER_TYPES)
  missing = constants.ISPECS_PARAMETERS - frozenset(ret)
  if missing and required:
    raise errors.OpPrereqError("Missing parameters in ipolicy spec %s: %s" %
                               (keyname, utils.CommaJoin(missing)),
                               errors.ECODE_INVAL)
  return ret
2782
2783
def _GetISpecsInAllowedValues(minmax_ispecs, allowed_values):
  """Detects whether the specs consist of exactly one allowed value.

  Returns that allowed-value key (with an empty spec) or C{None}.

  """
  if not (minmax_ispecs and allowed_values):
    return None
  if len(minmax_ispecs) != 1 or len(minmax_ispecs[0]) != 1:
    return None
  # Exactly one key/spec pair is present at this point
  ((key, spec),) = minmax_ispecs[0].items()
  if key in allowed_values and not spec:
    return key
  return None
2793
2794
def _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values):
  """Fills an instance policy from the --ipolicy-xxx-specs style options.

  The policy dict C{ipolicy_out} is modified in place.

  """
  found_allowed = _GetISpecsInAllowedValues(minmax_ispecs, allowed_values)
  if found_allowed is not None:
    # A bare allowed value (e.g. "default") replaces the whole minmax list
    ipolicy_out[constants.ISPECS_MINMAX] = found_allowed
  elif minmax_ispecs is not None:
    minmax_out = []
    for mmpair in minmax_ispecs:
      mmpair_out = {}
      for (key, spec) in mmpair.items():
        if key not in constants.ISPECS_MINMAX_KEYS:
          raise errors.OpPrereqError(
            "Invalid key in bounds instance specifications: %s" % key,
            errors.ECODE_INVAL)
        mmpair_out[key] = _ParseISpec(spec, key, True)
      minmax_out.append(mmpair_out)
    ipolicy_out[constants.ISPECS_MINMAX] = minmax_out
  if std_ispecs is not None:
    assert not group_ipolicy # This is not an option for gnt-group
    ipolicy_out[constants.ISPECS_STD] = _ParseISpec(std_ispecs, "std", False)
2814
2815
def CreateIPolicyFromOpts(ispecs_mem_size=None,
                          ispecs_cpu_count=None,
                          ispecs_disk_count=None,
                          ispecs_disk_size=None,
                          ispecs_nic_count=None,
                          minmax_ispecs=None,
                          std_ispecs=None,
                          ipolicy_disk_templates=None,
                          ipolicy_vcpu_ratio=None,
                          ipolicy_spindle_ratio=None,
                          group_ipolicy=False,
                          allowed_values=None,
                          fill_all=False):
  """Creation of instance policy based on command line options.

  The C{ispecs_*} arguments come from the old-style --specs-xxx options,
  while C{minmax_ispecs}/C{std_ispecs} come from the newer
  --ipolicy-xxx-specs options; the two families are mutually exclusive.

  @param group_ipolicy: whether the policy is created at group level
  @param allowed_values: special string values accepted verbatim (e.g. to
    reset a parameter to its default)
  @param fill_all: whether for cluster policies we should ensure that
    all values are filled

  """
  assert not (fill_all and allowed_values)

  split_specs = (ispecs_mem_size or ispecs_cpu_count or ispecs_disk_count or
                 ispecs_disk_size or ispecs_nic_count)
  if (split_specs and (minmax_ispecs is not None or std_ispecs is not None)):
    raise errors.OpPrereqError("A --specs-xxx option cannot be specified"
                               " together with any --ipolicy-xxx-specs option",
                               errors.ECODE_INVAL)

  ipolicy_out = objects.MakeEmptyIPolicy()
  if split_specs:
    assert fill_all
    _InitISpecsFromSplitOpts(ipolicy_out, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all)
  elif (minmax_ispecs is not None or std_ispecs is not None):
    _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values)

  if ipolicy_disk_templates is not None:
    # Allowed values (e.g. "default") are stored as-is, real template
    # collections are normalized to a list
    if allowed_values and ipolicy_disk_templates in allowed_values:
      ipolicy_out[constants.IPOLICY_DTS] = ipolicy_disk_templates
    else:
      ipolicy_out[constants.IPOLICY_DTS] = list(ipolicy_disk_templates)
  if ipolicy_vcpu_ratio is not None:
    ipolicy_out[constants.IPOLICY_VCPU_RATIO] = ipolicy_vcpu_ratio
  if ipolicy_spindle_ratio is not None:
    ipolicy_out[constants.IPOLICY_SPINDLE_RATIO] = ipolicy_spindle_ratio

  assert not (frozenset(ipolicy_out.keys()) - constants.IPOLICY_ALL_KEYS)

  if not group_ipolicy and fill_all:
    # Cluster policies must be complete; fill the gaps from the defaults
    ipolicy_out = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_out)

  return ipolicy_out
2870
2871
def _NotAContainer(data):
  """ Checks whether the input is not a container data type.

  @rtype: bool

  """
  return not isinstance(data, (list, dict, tuple))
2879
2880
def _GetAlignmentMapping(data):
  """ Returns info about alignment if present in an encoded ordered dictionary.

  @type data: list of tuple
  @param data: The encoded ordered dictionary, as defined in
    L{_SerializeGenericInfo}.
  @rtype: dict of any to int
  @return: The dictionary mapping alignment groups to the maximum length of the
    dictionary key found in the group.

  """
  alignment_map = {}
  for entry in data:
    # Only three-element entries carry an alignment group in entry[2]
    if len(entry) <= 2:
      continue
    group_key = entry[2]
    alignment_map[group_key] = max(alignment_map.get(group_key, 0),
                                   len(entry[0]))
  return alignment_map
2903
2904
def _SerializeGenericInfo(buf, data, level, afterkey=False):
  """Formatting core of L{PrintGenericInfo}.

  Recursively serializes C{data} into YAML-like indented text. Dicts and
  lists of tuples become "key: value" lines, other lists become "- item"
  lines, simple tuples become inline "[a, b]" lists, everything else is
  written with C{str}.

  @param buf: (string) stream to accumulate the result into
  @param data: data to format
  @type level: int
  @param level: depth in the data hierarchy, used for indenting
  @type afterkey: bool
  @param afterkey: True when we are in the middle of a line after a key (used
    to properly add newlines or indentation)

  """
  baseind = "  "
  if isinstance(data, dict):
    if not data:
      buf.write("\n")
    else:
      if afterkey:
        buf.write("\n")
        doindent = True
      else:
        doindent = False
      for key in sorted(data):
        if doindent:
          buf.write(baseind * level)
        else:
          # The first entry continues the current line; subsequent ones
          # start fresh, indented lines
          doindent = True
        buf.write(key)
        buf.write(": ")
        _SerializeGenericInfo(buf, data[key], level + 1, afterkey=True)
  elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], tuple):
    # list of tuples (an ordered dictionary)
    # the tuples may have two or three members - key, value, and alignment group
    # if the alignment group is present, align all values sharing the same group
    if afterkey:
      buf.write("\n")
      doindent = True
    else:
      doindent = False

    alignment_mapping = _GetAlignmentMapping(data)
    for entry in data:
      key, val = entry[0:2]
      if doindent:
        buf.write(baseind * level)
      else:
        doindent = True
      buf.write(key)
      buf.write(": ")
      if len(entry) > 2:
        # Pad with spaces so values in the same alignment group line up
        max_key_length = alignment_mapping[entry[2]]
        buf.write(" " * (max_key_length - len(key)))
      _SerializeGenericInfo(buf, val, level + 1, afterkey=True)
  elif isinstance(data, tuple) and all(map(_NotAContainer, data)):
    # tuples with simple content are serialized as inline lists
    buf.write("[%s]\n" % utils.CommaJoin(data))
  elif isinstance(data, list) or isinstance(data, tuple):
    # lists and tuples
    if not data:
      buf.write("\n")
    else:
      if afterkey:
        buf.write("\n")
        doindent = True
      else:
        doindent = False
      for item in data:
        if doindent:
          buf.write(baseind * level)
        else:
          doindent = True
        buf.write("-")
        buf.write(baseind[1:])
        _SerializeGenericInfo(buf, item, level + 1)
  else:
    # This branch should be only taken for strings, but it's practically
    # impossible to guarantee that no other types are produced somewhere
    buf.write(str(data))
    buf.write("\n")
2984
2985
def PrintGenericInfo(data):
  """Print information formatted according to the hierarchy.

  The output is a valid YAML string.

  @param data: the data to print. It's a hierarchical structure whose elements
    can be:
      - dictionaries, where keys are strings and values are of any of the
        types listed here
      - lists of tuples (key, value) or (key, value, alignment_group), where
        key is a string, value is of any of the types listed here, and
        alignment_group can be any hashable value; it's a way to encode
        ordered dictionaries; any entries sharing the same alignment group are
        aligned by appending whitespace before the value as needed
      - lists of any of the types listed here
      - strings

  """
  buf = StringIO()
  _SerializeGenericInfo(buf, data, 0)
  # Drop trailing newlines; ToStdout appends exactly one itself
  ToStdout(buf.getvalue().rstrip("\n"))