5d2fb6c7f653e1d73d340e71f4fb55e549ceb574
[ganeti-github.git] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module dealing with command line parsing"""
32
33
34 import sys
35 import textwrap
36 import os.path
37 import time
38 import logging
39 import errno
40 import itertools
41 import shlex
42 from cStringIO import StringIO
43
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti import constants
47 from ganeti import opcodes
48 import ganeti.rpc.errors as rpcerr
49 import ganeti.rpc.node as rpc
50 from ganeti import ssh
51 from ganeti import compat
52 from ganeti import netutils
53 from ganeti import qlang
54 from ganeti import objects
55 from ganeti import pathutils
56 from ganeti import serializer
57 import ganeti.cli_opts
58 # Import constants
59 from ganeti.cli_opts import * # pylint: disable=W0401
60
61 from ganeti.runtime import (GetClient)
62
63 from optparse import (OptionParser, TitledHelpFormatter)
64
65
# Public API: names exported via "from ganeti.cli import *"
__all__ = [
  # Generic functions for CLI programs
  "ConfirmOperation",
  "CreateIPolicyFromOpts",
  "GenericMain",
  "GenericInstanceCreate",
  "GenericList",
  "GenericListFields",
  "GetClient",
  "GetOnlineNodes",
  "GetNodesSshPorts",
  "GetNodeUUIDs",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "RunWhileDaemonsStopped",
  "SubmitOpCode",
  "SubmitOpCodeToDrainedQueue",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions for output (stdout/stderr, tables, errors)
  "ToStderr", "ToStdout",
  "FormatError",
  "FormatQueryResult",
  "FormatParamsDictInfo",
  "FormatPolicyInfo",
  "PrintIPolicyCommand",
  "PrintGenericInfo",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  "FormatLogMessage",
  # Tag manipulation functions shared by the gnt-* scripts
  "ListTags",
  "AddTags",
  "RemoveTags",
  # Command line options support infrastructure (argument specifications)
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_MANY_GROUPS",
  "ARGS_MANY_NETWORKS",
  "ARGS_MANY_FILTERS",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_GROUP",
  "ARGS_ONE_OS",
  "ARGS_ONE_NETWORK",
  "ARGS_ONE_FILTER",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgGroup",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNetwork",
  "ArgNode",
  "ArgOs",
  "ArgExtStorage",
  "ArgFilter",
  "ArgSuggest",
  "ArgUnknown",
  "FixHvParams",
  "SplitNodeOption",
  "CalculateOSNames",
  "ParseFields",
  ] + ganeti.cli_opts.__all__ # Re-export all command line options
135
# Query result status for clients (normal / unknown fields / incomplete data)
(QR_NORMAL,
 QR_UNKNOWN,
 QR_INCOMPLETE) = range(3)

#: Maximum batch size for ChooseJob
_CHOOSE_BATCH = 25


# Constants used to create an InstancePolicy dictionary: map each ispec
# key to the value type it must be parsed as.
# Node-group level policies only carry min/max bounds...
TISPECS_GROUP_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  }

# ...while cluster-level policies additionally carry the "std" defaults.
TISPECS_CLUSTER_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  constants.ISPECS_STD: constants.VTYPE_INT,
  }

#: User-friendly names for query2 field types
_QFT_NAMES = {
  constants.QFT_UNKNOWN: "Unknown",
  constants.QFT_TEXT: "Text",
  constants.QFT_BOOL: "Boolean",
  constants.QFT_NUMBER: "Number",
  constants.QFT_NUMBER_FLOAT: "Floating-point number",
  constants.QFT_UNIT: "Storage size",
  constants.QFT_TIMESTAMP: "Timestamp",
  constants.QFT_OTHER: "Custom",
  }
168
169
170 class _Argument(object):
171 def __init__(self, min=0, max=None): # pylint: disable=W0622
172 self.min = min
173 self.max = max
174
175 def __repr__(self):
176 return ("<%s min=%s max=%s>" %
177 (self.__class__.__name__, self.min, self.max))
178
179
class ArgSuggest(_Argument):
  """Argument carrying a list of suggested values.

  The value itself may be anything; the choices passed to the
  constructor are only suggestions (contrast with L{ArgChoice}).

  """
  # pylint: disable=W0622
  def __init__(self, min=0, max=None, choices=None):
    super(ArgSuggest, self).__init__(min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return ("<%s min=%s max=%s choices=%r>" %
            (self.__class__.__name__, self.min, self.max, self.choices))
194
195
class ArgChoice(ArgSuggest):
  """Choice argument.

  Like L{ArgSuggest}, but the value must be exactly one of the choices
  passed to the constructor.

  """
203
204
class ArgUnknown(_Argument):
  """Argument whose kind is only determined at runtime.

  """
209
210
class ArgInstance(_Argument):
  """Argument naming one or more instances.

  """
215
216
class ArgNode(_Argument):
  """Argument naming one or more nodes.

  """
221
222
class ArgNetwork(_Argument):
  """Argument naming one or more networks.

  """
227
228
class ArgGroup(_Argument):
  """Argument naming one or more node groups.

  """
233
234
class ArgJobId(_Argument):
  """Argument holding one or more job IDs.

  """
239
240
class ArgFile(_Argument):
  """Argument holding a file path.

  """
245
246
class ArgCommand(_Argument):
  """Argument holding a command to execute.

  """
251
252
class ArgHost(_Argument):
  """Argument holding a host name.

  """
257
258
class ArgOs(_Argument):
  """Argument naming an OS.

  """
263
264
class ArgExtStorage(_Argument):
  """Argument naming an ExtStorage provider.

  """
269
270
class ArgFilter(_Argument):
  """Argument holding a filter UUID.

  """
275
276
# Ready-made argument specifications shared by the gnt-* command tables.
# "MANY" variants accept any number of values (including zero)...
ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NETWORKS = [ArgNetwork()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_MANY_GROUPS = [ArgGroup()]
ARGS_MANY_FILTERS = [ArgFilter()]
# ...while "ONE" variants require exactly one value (min=1, max=1).
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NETWORK = [ArgNetwork(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_GROUP = [ArgGroup(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
ARGS_ONE_FILTER = [ArgFilter(min=1, max=1)]
289
290
def _ExtractTagsObject(opts, args):
  """Determine the (kind, name) pair of the tag operation target.

  For per-object tag kinds the object name is consumed from C{args},
  so the caller's argument list is modified in place.

  @raise errors.ProgrammerError: if C{opts} has no tag_type attribute
    or the kind is not handled
  @raise errors.OpPrereqError: if an object name is required but missing

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")

  kind = opts.tag_type

  # the cluster is a singleton, so no name is needed
  if kind == constants.TAG_CLUSTER:
    return kind, ""

  per_object_kinds = (constants.TAG_NODEGROUP, constants.TAG_NODE,
                      constants.TAG_NETWORK, constants.TAG_INSTANCE)
  if kind in per_object_kinds:
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command",
                                 errors.ECODE_INVAL)
    return kind, args.pop(0)

  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
314
315
316 def _ExtendTags(opts, args):
317 """Extend the args if a source file has been given.
318
319 This function will extend the tags with the contents of the file
320 passed in the 'tags_source' attribute of the opts parameter. A file
321 named '-' will be replaced by stdin.
322
323 """
324 fname = opts.tags_source
325 if fname is None:
326 return
327 if fname == "-":
328 new_fh = sys.stdin
329 else:
330 new_fh = open(fname, "r")
331 new_data = []
332 try:
333 # we don't use the nice 'new_data = [line.strip() for line in fh]'
334 # because of python bug 1633941
335 while True:
336 line = new_fh.readline()
337 if not line:
338 break
339 new_data.append(line.strip())
340 finally:
341 new_fh.close()
342 args.extend(new_data)
343
344
def ListTags(opts, args):
  """List the tags on a given object.

  Generic implementation covering all tag targets (cluster, node group,
  node, network, instance); C{opts.tag_type} selects which one.

  """
  kind, name = _ExtractTagsObject(opts, args)
  cl = GetClient()
  for tag in sorted(cl.QueryTags(kind, name)):
    ToStdout(tag)
361
362
def AddTags(opts, args):
  """Add tags on a given object.

  Generic implementation covering all tag targets (cluster, node group,
  node, network, instance); C{opts.tag_type} selects which one.  Tags
  can additionally be read from a file via C{opts.tags_source}.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added", errors.ECODE_INVAL)
  SubmitOrSend(opcodes.OpTagsSet(kind=kind, name=name, tags=args), opts)
378
379
def RemoveTags(opts, args):
  """Remove tags from a given object.

  Generic implementation covering all tag targets (cluster, node group,
  node, network, instance); C{opts.tag_type} selects which one.  Tags
  can additionally be read from a file via C{opts.tags_source}.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed", errors.ECODE_INVAL)
  SubmitOrSend(opcodes.OpTagsDel(kind=kind, name=name, tags=args), opts)
395
396
397 class _ShowUsage(Exception):
398 """Exception class for L{_ParseArgs}.
399
400 """
401 def __init__(self, exit_error):
402 """Initializes instances of this class.
403
404 @type exit_error: bool
405 @param exit_error: Whether to report failure on exit
406
407 """
408 Exception.__init__(self)
409 self.exit_error = exit_error
410
411
412 class _ShowVersion(Exception):
413 """Exception class for L{_ParseArgs}.
414
415 """
416
417
def _ParseArgs(binary, argv, commands, aliases, env_override):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param binary: Script name
  @param argv: Command line arguments
  @param commands: Dictionary containing command definitions
  @param aliases: dictionary with command aliases {"alias": "target", ...}
  @param env_override: list of env variables allowed for default args
  @return: (function, options, args) tuple, or (None, None, None) if the
    argument check failed
  @raise _ShowUsage: If usage description should be shown
  @raise _ShowVersion: If version should be shown

  """
  # sanity checks: env overrides must name real commands, and aliases
  # must not shadow real command names
  assert not (env_override - set(commands))
  assert not (set(aliases.keys()) & set(commands.keys()))

  if len(argv) > 1:
    cmd = argv[1]
  else:
    # No option or command given
    raise _ShowUsage(exit_error=True)

  if cmd == "--version":
    raise _ShowVersion()
  elif cmd == "--help":
    raise _ShowUsage(exit_error=False)
  elif not (cmd in commands or cmd in aliases):
    raise _ShowUsage(exit_error=True)

  # get command, unalias it, and look it up in commands
  if cmd in aliases:
    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))

    cmd = aliases[cmd]

  # default arguments may come from e.g. the GNT_INSTANCE_LIST env var;
  # they are inserted right after the command name, before user args
  if cmd in env_override:
    args_env_name = ("%s_%s" % (binary.replace("-", "_"), cmd)).upper()
    env_args = os.environ.get(args_env_name)
    if env_args:
      argv = utils.InsertAtPos(argv, 2, shlex.split(env_args))

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  # everything after the first positional argument goes to the command
  parser.disable_interspersed_args()
  options, args = parser.parse_args(args=argv[2:])

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
475
476
477 def _FormatUsage(binary, commands):
478 """Generates a nice description of all commands.
479
480 @param binary: Script name
481 @param commands: Dictionary containing command definitions
482
483 """
484 # compute the max line length for cmd + usage
485 mlen = min(60, max(map(len, commands)))
486
487 yield "Usage: %s {command} [options...] [argument...]" % binary
488 yield "%s <command> --help to see details, or man %s" % (binary, binary)
489 yield ""
490 yield "Commands:"
491
492 # and format a nice command list
493 for (cmd, (_, _, _, _, help_text)) in sorted(commands.items()):
494 help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
495 yield " %-*s - %s" % (mlen, cmd, help_lines.pop(0))
496 for line in help_lines:
497 yield " %-*s %s" % (mlen, "", line)
498
499 yield ""
500
501
def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

  1. Abort with error if values specified by user but none expected.

  1. For each argument in definition

    1. Keep running count of minimum number of values (min_count)
    1. Keep running count of maximum number of values (max_count)
    1. If it has an unlimited number of values

      1. Abort with error if it's not the last argument in the definition

  1. If last argument has limited number of values

    1. Abort with error if number of values doesn't match or is too large

  1. Abort with error if user didn't pass enough values (min_count)

  @param cmd: command name, used only in error messages
  @param args_def: list of L{_Argument} specifications
  @param args: actual positional arguments given by the user
  @return: True if the arguments are valid, False otherwise (errors are
    reported via ToStderr)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  # accumulated min/max over all argument specs; None means "not yet set"
  min_count = None
  max_count = None
  check_max = None

  last_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    if min_count is None:
      min_count = arg.min
    elif arg.min is not None:
      min_count += arg.min

    if max_count is None:
      max_count = arg.max
    elif arg.max is not None:
      max_count += arg.max

    if idx == last_idx:
      # only enforce an upper bound if the last argument is bounded
      check_max = (arg.max is not None)

    elif arg.max is None:
      # an unlimited argument anywhere but last would make the mapping
      # of values to arguments ambiguous
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if check_max:
    # Command with exact number of arguments
    if (min_count is not None and max_count is not None and
        min_count == max_count and len(args) != min_count):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
      return False

    # Command with limited number of arguments
    if max_count is not None and len(args) > max_count:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, max_count)
      return False

  # Command with some required arguments
  if min_count is not None and len(args) < min_count:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, min_count)
    return False

  return True
571
572
def SplitNodeOption(value):
  """Splits the value of a --node option.

  A value of the form "node:tail" is split at the first colon; anything
  else (including empty/None) is returned with None as the second part.

  """
  if not value or ":" not in value:
    return (value, None)
  return value.split(":", 1)
581
582
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if not os_variants:
    return [os_name]
  return ["%s+%s" % (os_name, variant) for variant in os_variants]
598
599
def ParseFields(selected, default):
  """Parses the values of "--field"-like options.

  A leading "+" means "append to the defaults" instead of replacing
  them.

  @type selected: string or None
  @param selected: User-selected options
  @type default: list
  @param default: Default fields

  """
  if selected is None:
    return default

  if selected[:1] == "+":
    return default + selected[1:].split(",")

  return selected.split(",")
616
617
# CLI-facing alias for rpc.RunWithRPC (exported via __all__)
UsesRPC = rpc.RunWithRPC
619
620
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty), we return the last
      entry from the list

  @raise errors.ProgrammerError: if choices is malformed

  """
  if choices is None:
    choices = [("y", True, "Perform the operation"),
               ("n", False, "Do not perform the operation")]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == "?":
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # default to the last choice's value when interaction is impossible
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # open() instead of the Python-2-only deprecated file() builtin
    f = open("/dev/tty", "a+")
  except IOError:
    # no controlling terminal; fall back to the default answer
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append("?")
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write("\n")
      f.write("/".join(chars))
      f.write(": ")
      # read at most two characters: the answer character plus newline
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == "?":
        # print the help for each choice and ask again
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
676
677
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  Carries a single argument: the ID of the submitted job, which the
  handler should print.  This is not an error, just a structured way to
  exit from clients.

  """
687
688
def SendJob(ops, cl=None):
  """Submit a list of opcodes as one job, without waiting for the result.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the ID of the submitted job

  """
  if cl is None:
    cl = GetClient()
  return cl.SubmitJob(ops)
705
706
def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  Blocks until the job reaches a final state, reporting log messages and
  "not changed" events through the callbacks, then dissects the result:
  returns the opresult list on success, raises otherwise.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks
  @return: the job's opresult list on success
  @raise errors.JobLost: if the job disappears while polling
  @raise errors.OpExecError: if the job was canceled or any opcode failed

  """
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        # NOTE: relies on Python 2's None < int ordering for the first
        # message (prev_logmsg_serial starts as None)
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  # the job has finished; fetch the final per-opcode status and results
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  # find the first failed opcode and raise an informative error; note
  # whether any earlier opcode succeeded ("partial failure")
  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      # re-raise an encapsulated exception if the message carries one
      errors.MaybeRaise(msg)

      if has_ok:
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)
783
784
class JobPollCbBase(object):
  """Abstract data-callback interface used by L{GenericPollJob}.

  Subclasses must implement both methods.

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()
811
812
class JobPollReportCbBase(object):
  """Abstract reporting-callback interface used by L{GenericPollJob}.

  Subclasses must implement both methods.

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()
838
839
class _LuxiJobPollCb(JobPollCbBase):
  """L{JobPollCbBase} implementation proxying to a luxi client.

  """
  def __init__(self, cl):
    """Initializes this class.

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    return self.cl.WaitForJobChangeOnce(job_id, fields, prev_job_info,
                                        prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    """
    return self.cl.QueryJobs(job_ids, fields)
861
862
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks forwarding log messages to a feedback function.

  """
  def __init__(self, feedback_fn):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    assert callable(feedback_fn)
    self.feedback_fn = feedback_fn

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    self.feedback_fn((timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # deliberately ignored: the feedback function only gets log messages
885
886
class StdioJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks that print to standard output/error.

  """
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    # track which one-time notifications have already been printed
    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             FormatLogMessage(log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True
    elif status == constants.JOB_STATUS_WAITING and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True
918
919
def FormatLogMessage(log_type, log_msg):
  """Formats a job message according to its type.

  """
  # non-message payloads (e.g. structured data) are stringified first
  if log_type == constants.ELOG_MESSAGE:
    text = log_msg
  else:
    text = str(log_msg)
  return utils.SafeEncode(text)
928
929
def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @raise errors.ProgrammerError: if both a reporter and a feedback
    function are given

  """
  if cl is None:
    cl = GetClient()

  if reporter is not None:
    if feedback_fn:
      raise errors.ProgrammerError("Can't specify reporter and feedback"
                                   " function")
  elif feedback_fn:
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
  else:
    reporter = StdioJobPollReportCb()

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
952
953
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
  """Legacy function to submit an opcode and wait for its result.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  @return: the result of the opcode (first element of the job result)

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl=cl)
  if getattr(opts, "print_jobid", False):
    ToStdout("%d" % job_id)

  results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
                    reporter=reporter)
  return results[0]
975
976
def SubmitOpCodeToDrainedQueue(op):
  """Forcefully insert a job in the queue, even if it is drained.

  @return: the result of the opcode

  """
  cl = GetClient()
  job_id = cl.SubmitJobToDrainedQueue([op])
  return PollJob(job_id, cl=cl)[0]
985
986
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  Based on C{opts.submit_only}, either submits the opcode and waits for
  (and returns) its result, or just sends the job, prints its ID and
  raises L{JobSubmittedException}.  Used to implement the '--submit'
  option.  In the send-only case the opcodes are processed here via
  L{SetGenericOpcodeOpts} (otherwise SubmitOpCode does it).

  """
  if not (opts and opts.submit_only):
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  job = [op]
  SetGenericOpcodeOpts(job, opts)
  job_id = SendJob(job, cl=cl)
  if opts.print_jobid:
    ToStdout("%d" % job_id)
  raise JobSubmittedException(job_id)
1008
1009
def _InitReasonTrail(op, opts):
  """Builds the first part of the reason trail.

  Adds the user-provided reason (if any) and then the name of the
  command that started the operation.

  @param op: the opcode the reason trail will be added to
  @param opts: the command line options selected by the user

  """
  assert len(sys.argv) >= 2

  trail = []
  if opts.reason:
    trail.append((constants.OPCODE_REASON_SRC_USER, opts.reason,
                  utils.EpochNano()))

  binary = os.path.basename(sys.argv[0])
  source = "%s:%s" % (constants.OPCODE_REASON_SRC_CLIENT, binary)
  trail.append((source, sys.argv[1], utils.EpochNano()))
  op.reason = trail
1033
1034
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return

  # these are the same for every opcode, so compute them once
  priority = getattr(options, "priority", None)
  has_dry_run = hasattr(options, "dry_run")

  for op in opcode_list:
    op.debug_level = options.debug
    if has_dry_run:
      op.dry_run = options.dry_run
    if priority is not None:
      op.priority = priority
    _InitReasonTrail(op, options)
1055
1056
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  Note: branch order matters below — specific exception classes must be
  tested before their bases (e.g. before the errors.GenericError
  catch-all).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    # exit code 2 signals an unusable configuration
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    # err.args[0] is a list of (node, script, output) tuples
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    # this error can be raised for both our own and remote hostnames;
    # distinguish them for a clearer message
    this_host = netutils.Hostname.GetSysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, rpcerr.NoMasterError):
    # identify which daemon's socket we failed to reach
    if err.args[0] == pathutils.MASTER_SOCKET:
      daemon = "the master daemon"
    elif err.args[0] == pathutils.QUERY_SOCKET:
      daemon = "the config daemon"
    else:
      daemon = "socket '%s'" % str(err.args[0])
    obuf.write("Cannot communicate with %s.\nIs the process running"
               " and listening for connections?" % daemon)
  elif isinstance(err, rpcerr.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Jobs might have"
               " been submitted and will continue to run even if the call"
               " timed out. Useful commands in this situation are \"gnt-job"
               " list\", \"gnt-job cancel\" and \"gnt-job watch\". Error:\n")
    obuf.write(msg)
  elif isinstance(err, rpcerr.PermissionError):
    obuf.write("It seems you don't have permissions to connect to the"
               " master daemon.\nPlease retry as a different user.")
  elif isinstance(err, rpcerr.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.JobLost):
    obuf.write("Error checking job status: %s" % msg)
  elif isinstance(err, errors.QueryFilterParseError):
    obuf.write("Error while parsing query filter: %s\n" % err.args[0])
    obuf.write("\n".join(err.GetDetails()))
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    # not an error: the client just needs to print the job ID and exit
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip("\n")
1149
1150
def GenericMain(commands, override=None, aliases=None,
                env_override=frozenset()):
  """Generic main function for all the gnt-* commands.

  Parses the command line, sets up logging and dispatches to the
  selected command function, converting known exceptions into error
  messages and exit codes.

  @param commands: a dictionary with a special structure, see the design doc
                   for command line handling.
  @param override: if not None, we expect a dictionary with keys that will
                   override command line options; this can be used to pass
                   options from the scripts to generic functions
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
  @param env_override: list of environment names which are allowed to submit
                       default args for commands
  @rtype: int
  @return: the exit code of the command function, or an error exit code if
      parsing or execution failed

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0])
    if not binary:
      # basename was empty (e.g. path ending in a slash), fall back to
      # the full argv[0]
      binary = sys.argv[0]

    if len(sys.argv) >= 2:
      # include the sub-command in the log name
      logname = utils.ShellQuoteArgs([binary, sys.argv[1]])
    else:
      logname = binary

    cmdline = utils.ShellQuoteArgs([binary] + sys.argv[1:])
  else:
    binary = "<unknown program>"
    cmdline = "<unknown>"

  if aliases is None:
    aliases = {}

  try:
    (func, options, args) = _ParseArgs(binary, sys.argv, commands, aliases,
                                       env_override)
  except _ShowVersion:
    # version was requested; print it and exit successfully
    ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
             constants.RELEASE_VERSION)
    return constants.EXIT_SUCCESS
  except _ShowUsage, err:
    # usage was requested (or a usage error occurred); print the usage text
    for line in _FormatUsage(binary, commands):
      ToStdout(line)

    if err.exit_error:
      return constants.EXIT_FAILURE
    else:
      return constants.EXIT_SUCCESS
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    # forcibly override parsed options with the caller-provided values
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(pathutils.LOG_COMMANDS, logname, debug=options.debug,
                     stderr_logging=True)

  logging.debug("Command line: %s", cmdline)

  try:
    result = func(options, args)
  except (errors.GenericError, rpcerr.ProtocolError,
          JobSubmittedException), err:
    # known errors: format them nicely for the user and log the details
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)
  except KeyboardInterrupt:
    result = constants.EXIT_FAILURE
    ToStderr("Aborted. Note that if the operation created any jobs, they"
             " might have been submitted and"
             " will continue to run in the background.")
  except IOError, err:
    if err.errno == errno.EPIPE:
      # our terminal went away, we'll exit
      sys.exit(constants.EXIT_FAILURE)
    else:
      raise

  return result
1236
1237
1238 def ParseNicOption(optvalue):
1239 """Parses the value of the --net option(s).
1240
1241 """
1242 try:
1243 nic_max = max(int(nidx[0]) + 1 for nidx in optvalue)
1244 except (TypeError, ValueError), err:
1245 raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err),
1246 errors.ECODE_INVAL)
1247
1248 nics = [{}] * nic_max
1249 for nidx, ndict in optvalue:
1250 nidx = int(nidx)
1251
1252 if not isinstance(ndict, dict):
1253 raise errors.OpPrereqError("Invalid nic/%d value: expected dict,"
1254 " got %s" % (nidx, ndict), errors.ECODE_INVAL)
1255
1256 utils.ForceDictType(ndict, constants.INIC_PARAMS_TYPES)
1257
1258 nics[nidx] = ndict
1259
1260 return nics
1261
1262
def FixHvParams(hvparams):
  """Normalizes the usb_devices hvparam separator in place.

  In Ganeti 2.8.4 the separator for the usb_devices hvparam was changed
  from comma to space, because commas cannot be accepted on the command
  line (they already separate different hvparams). RAPI should still
  accept commas for backwards compatibility, so spaces are converted to
  commas here while the old parsing logic is kept everywhere else.

  @type hvparams: dict
  @param hvparams: hypervisor parameters, modified in place if the
      usb_devices key is present

  """
  if constants.HV_USB_DEVICES in hvparams:
    hvparams[constants.HV_USB_DEVICES] = \
      hvparams[constants.HV_USB_DEVICES].replace(" ", ",")
1276
1277
1278 def GenericInstanceCreate(mode, opts, args):
1279 """Add an instance to the cluster via either creation or import.
1280
1281 @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1282 @param opts: the command line options selected by the user
1283 @type args: list
1284 @param args: should contain only one element, the new instance name
1285 @rtype: int
1286 @return: the desired exit code
1287
1288 """
1289 instance = args[0]
1290
1291 (pnode, snode) = SplitNodeOption(opts.node)
1292
1293 hypervisor = None
1294 hvparams = {}
1295 if opts.hypervisor:
1296 hypervisor, hvparams = opts.hypervisor
1297
1298 if opts.nics:
1299 nics = ParseNicOption(opts.nics)
1300 elif opts.no_nics:
1301 # no nics
1302 nics = []
1303 elif mode == constants.INSTANCE_CREATE:
1304 # default of one nic, all auto
1305 nics = [{}]
1306 else:
1307 # mode == import
1308 nics = []
1309
1310 if opts.disk_template == constants.DT_DISKLESS:
1311 if opts.disks or opts.sd_size is not None:
1312 raise errors.OpPrereqError("Diskless instance but disk"
1313 " information passed", errors.ECODE_INVAL)
1314 disks = []
1315 else:
1316 if (not opts.disks and not opts.sd_size
1317 and mode == constants.INSTANCE_CREATE):
1318 raise errors.OpPrereqError("No disk information specified",
1319 errors.ECODE_INVAL)
1320 if opts.disks and opts.sd_size is not None:
1321 raise errors.OpPrereqError("Please use either the '--disk' or"
1322 " '-s' option", errors.ECODE_INVAL)
1323 if opts.sd_size is not None:
1324 opts.disks = [(0, {constants.IDISK_SIZE: opts.sd_size})]
1325
1326 if opts.disks:
1327 try:
1328 disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1329 except ValueError, err:
1330 raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err),
1331 errors.ECODE_INVAL)
1332 disks = [{}] * disk_max
1333 else:
1334 disks = []
1335 for didx, ddict in opts.disks:
1336 didx = int(didx)
1337 if not isinstance(ddict, dict):
1338 msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1339 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1340 elif constants.IDISK_SIZE in ddict:
1341 if constants.IDISK_ADOPT in ddict:
1342 raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1343 " (disk %d)" % didx, errors.ECODE_INVAL)
1344 try:
1345 ddict[constants.IDISK_SIZE] = \
1346 utils.ParseUnit(ddict[constants.IDISK_SIZE])
1347 except ValueError, err:
1348 raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1349 (didx, err), errors.ECODE_INVAL)
1350 elif constants.IDISK_ADOPT in ddict:
1351 if constants.IDISK_SPINDLES in ddict:
1352 raise errors.OpPrereqError("spindles is not a valid option when"
1353 " adopting a disk", errors.ECODE_INVAL)
1354 if mode == constants.INSTANCE_IMPORT:
1355 raise errors.OpPrereqError("Disk adoption not allowed for instance"
1356 " import", errors.ECODE_INVAL)
1357 ddict[constants.IDISK_SIZE] = 0
1358 else:
1359 raise errors.OpPrereqError("Missing size or adoption source for"
1360 " disk %d" % didx, errors.ECODE_INVAL)
1361 if constants.IDISK_SPINDLES in ddict:
1362 ddict[constants.IDISK_SPINDLES] = int(ddict[constants.IDISK_SPINDLES])
1363
1364 disks[didx] = ddict
1365
1366 if opts.tags is not None:
1367 tags = opts.tags.split(",")
1368 else:
1369 tags = []
1370
1371 utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_COMPAT)
1372 utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1373 FixHvParams(hvparams)
1374
1375 osparams_private = opts.osparams_private or serializer.PrivateDict()
1376 osparams_secret = opts.osparams_secret or serializer.PrivateDict()
1377
1378 helper_startup_timeout = opts.helper_startup_timeout
1379 helper_shutdown_timeout = opts.helper_shutdown_timeout
1380
1381 if mode == constants.INSTANCE_CREATE:
1382 start = opts.start
1383 os_type = opts.os
1384 force_variant = opts.force_variant
1385 src_node = None
1386 src_path = None
1387 no_install = opts.no_install
1388 identify_defaults = False
1389 compress = constants.IEC_NONE
1390 if opts.instance_communication is None:
1391 instance_communication = False
1392 else:
1393 instance_communication = opts.instance_communication
1394 elif mode == constants.INSTANCE_IMPORT:
1395 start = False
1396 os_type = None
1397 force_variant = False
1398 src_node = opts.src_node
1399 src_path = opts.src_dir
1400 no_install = None
1401 identify_defaults = opts.identify_defaults
1402 compress = opts.compress
1403 instance_communication = False
1404 else:
1405 raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1406
1407 op = opcodes.OpInstanceCreate(
1408 instance_name=instance,
1409 disks=disks,
1410 disk_template=opts.disk_template,
1411 group_name=opts.nodegroup,
1412 nics=nics,
1413 conflicts_check=opts.conflicts_check,
1414 pnode=pnode, snode=snode,
1415 ip_check=opts.ip_check,
1416 name_check=opts.name_check,
1417 wait_for_sync=opts.wait_for_sync,
1418 file_storage_dir=opts.file_storage_dir,
1419 file_driver=opts.file_driver,
1420 iallocator=opts.iallocator,
1421 hypervisor=hypervisor,
1422 hvparams=hvparams,
1423 beparams=opts.beparams,
1424 osparams=opts.osparams,
1425 osparams_private=osparams_private,
1426 osparams_secret=osparams_secret,
1427 mode=mode,
1428 opportunistic_locking=opts.opportunistic_locking,
1429 start=start,
1430 os_type=os_type,
1431 force_variant=force_variant,
1432 src_node=src_node,
1433 src_path=src_path,
1434 compress=compress,
1435 tags=tags,
1436 no_install=no_install,
1437 identify_defaults=identify_defaults,
1438 ignore_ipolicy=opts.ignore_ipolicy,
1439 instance_communication=instance_communication,
1440 helper_startup_timeout=helper_startup_timeout,
1441 helper_shutdown_timeout=helper_shutdown_timeout)
1442
1443 SubmitOrSend(op, opts)
1444 return 0
1445
1446
class _RunWhileDaemonsStoppedHelper(object):
  """Helper class for L{RunWhileDaemonsStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node,
               online_nodes, ssh_ports, exclude_daemons, debug,
               verbose):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes
    @type ssh_ports: list
    @param ssh_ports: List of SSH ports of online nodes, in the same order
        as C{online_nodes}
    @type exclude_daemons: list of string
    @param exclude_daemons: list of daemons that will be restarted on master
                            after all others are shutdown
    @type debug: boolean
    @param debug: show debug output
    @type verbose: boolean
    @param verbose: show verbose output

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes
    # Mapping of node name to its SSH port, for running remote commands
    self.ssh_ports = dict(zip(online_nodes, ssh_ports))

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    self.exclude_daemons = exclude_daemons
    self.debug = debug
    self.verbose = verbose

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; if None or equal to the master node,
        the command is run locally instead of via SSH
    @type cmd: list
    @param cmd: Command
    @raise errors.OpExecError: if the command fails

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, constants.SSH_LOGIN_USER,
                            utils.ShellQuoteArgs(cmd),
                            port=self.ssh_ports[node_name])

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    The daemons are stopped on all nodes (except those listed in
    C{exclude_daemons}, which are restarted on the master) and started
    again afterwards, master node last.

    @type fn: callable
    @param fn: Function to be called; receives this helper instance as its
        first argument, followed by C{args}
    @return: the return value of C{fn}

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [pathutils.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        for node_name in self.online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop-all"])
          # Starting any daemons listed as exception
          if node_name == self.master_node:
            for daemon in self.exclude_daemons:
              self.feedback_fn("Starting daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start", daemon])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          # Stopping any daemons listed as exception.
          # This might look unnecessary, but it makes sure that daemon-util
          # starts all daemons in the right order.
          if node_name == self.master_node:
            # NOTE: reverse() mutates the caller-provided exclude_daemons
            # list in place
            self.exclude_daemons.reverse()
            for daemon in self.exclude_daemons:
              self.feedback_fn("Stopping daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop", daemon])
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start-all"])

    finally:
      # Resume watcher
      watcher_block.Close()
1574
1575
def RunWhileDaemonsStopped(feedback_fn, exclude_daemons, fn, *args, **kwargs):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type exclude_daemons: list of string
  @param exclude_daemons: list of daemons that stopped, but immediately
                          restarted on the master to be available when calling
                          'fn'. If None, all daemons will be stopped and none
                          will be started before calling 'fn'.
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  client = GetClient()

  (cluster_name, master_node) = \
    client.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=client)
  ssh_ports = GetNodesSshPorts(online_nodes, client)

  # Don't keep a reference to the client. The master daemon will go away.
  del client

  assert master_node in online_nodes

  helper = _RunWhileDaemonsStoppedHelper(
    feedback_fn, cluster_name, master_node, online_nodes, ssh_ports,
    exclude_daemons if exclude_daemons is not None else [],
    kwargs.get("debug", False), kwargs.get("verbose", False))

  return helper.Call(fn, *args)
1614
1615
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Runs a function while all cluster daemons are stopped.

  Convenience wrapper around L{RunWhileDaemonsStopped} which keeps no
  daemon running while 'fn' executes.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  RunWhileDaemonsStopped(feedback_fn, None, fn, *args)
1626
1627
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list of strings
  @return: the formatted table lines; note that the rows in C{data} are
      modified in place (values replaced by their string representation)

  """
  if units is None:
    # human-readable by default, megabytes for machine-parseable output
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  # FieldSet provides a Matches() lookup over the given field names
  numfields = utils.FieldSet(*numfields) # pylint: disable=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable=W0142

  # Build the per-column printf-style conversions
  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      # numeric fields are right-aligned
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    # per-column maximum widths, filled in below
    mlens = [0 for name in fields]
    format_str = " ".join(format_fields)
  else:
    format_str = separator.replace("%", "%%").join(format_fields)

  # First pass: convert all values to strings (in place!) and, when no
  # separator is used, track the widest value per column
  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # not a number, leave the value as-is
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        # headers count towards the column width too
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    if fields and not numfields.Matches(fields[-1]):
      # the last field is left-aligned, so no padding is needed and we can
      # avoid trailing whitespace
      mlens[-1] = 0

  for line in data:
    args = []
    if line is None:
      line = ["-" for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
1734
1735
1736 def _FormatBool(value):
1737 """Formats a boolean value as a string.
1738
1739 """
1740 if value:
1741 return "Y"
1742 return "N"
1743
1744
#: Default formatting for query results, indexed by field type; each value
#: is a tuple of (formatting callback, align right). L{constants.QFT_UNIT}
#: is deliberately absent: its formatting depends on the requested unit and
#: is handled separately in L{_GetColumnFormatter}.
_DEFAULT_FORMAT_QUERY = {
  constants.QFT_TEXT: (str, False),
  constants.QFT_BOOL: (_FormatBool, False),
  constants.QFT_NUMBER: (str, True),
  constants.QFT_NUMBER_FLOAT: (str, True),
  constants.QFT_TIMESTAMP: (utils.FormatTime, False),
  constants.QFT_OTHER: (str, False),
  constants.QFT_UNKNOWN: (str, False),
  }
1755
1756
def _GetColumnFormatter(fdef, override, unit):
  """Returns formatting function for a field.

  @type fdef: L{objects.QueryFieldDefinition}
  @type override: dict
  @param override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT}
  @rtype: tuple; (callable, bool)
  @return: Returns the function to format a value (takes one parameter) and a
    boolean for aligning the value on the right-hand side

  """
  custom = override.get(fdef.name, None)
  if custom is not None:
    return custom

  assert constants.QFT_UNIT not in _DEFAULT_FORMAT_QUERY

  if fdef.kind == constants.QFT_UNIT:
    # The target unit is only known at runtime, so it cannot live in the
    # static dictionary
    return (lambda value: utils.FormatUnit(value, unit), True)

  default = _DEFAULT_FORMAT_QUERY.get(fdef.kind, None)
  if default is None:
    raise NotImplementedError("Can't format column type '%s'" % fdef.kind)

  return default
1786
1787
class _QueryColumnFormatter(object):
  """Callable class for formatting fields of a query.

  """
  def __init__(self, fn, status_fn, verbose):
    """Initializes this class.

    @type fn: callable
    @param fn: Formatting function
    @type status_fn: callable
    @param status_fn: Function to report fields' status
    @type verbose: boolean
    @param verbose: whether to use verbose field descriptions or not

    """
    self._fn = fn
    self._status_fn = status_fn
    self._verbose = verbose

  def __call__(self, data):
    """Returns a field's string representation.

    @param data: tuple of (result status, value)

    """
    (status, value) = data

    # Let the caller keep track of the per-field statuses
    self._status_fn(status)

    if status != constants.RS_NORMAL:
      # Abnormal statuses carry no value; render a status description
      assert value is None, \
        "Found value %r for abnormal status %s" % (value, status)
      return FormatResultError(status, self._verbose)

    return self._fn(value)
1823
1824
def FormatResultError(status, verbose):
  """Formats result status other than L{constants.RS_NORMAL}.

  @param status: The result status
  @type verbose: boolean
  @param verbose: Whether to return the verbose text
  @return: Text of result status
  @raise NotImplementedError: if the status has no known description

  """
  assert status != constants.RS_NORMAL, \
    "FormatResultError called with status equal to constants.RS_NORMAL"

  try:
    (verbose_text, normal_text) = constants.RSS_DESCRIPTION[status]
  except KeyError:
    raise NotImplementedError("Unknown status %s" % status)

  return verbose_text if verbose else normal_text
1844
1845
def FormatQueryResult(result, unit=None, format_override=None, separator=None,
                      header=False, verbose=False):
  """Formats data in L{objects.QueryResponse}.

  @type result: L{objects.QueryResponse}
  @param result: result of query operation
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT},
    see L{utils.text.FormatUnit}
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to output header row
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @rtype: tuple; (one of L{QR_NORMAL}, L{QR_UNKNOWN} or L{QR_INCOMPLETE},
    list of strings)
  @return: overall result status and the formatted table lines

  """
  if unit is None:
    # human-readable by default, megabytes for machine-parseable output
    if separator:
      unit = "m"
    else:
      unit = "h"

  if format_override is None:
    format_override = {}

  # Per-status counters; filled in as a side effect while FormatTable below
  # formats each value (see L{_QueryColumnFormatter.__call__})
  stats = dict.fromkeys(constants.RS_ALL, 0)

  def _RecordStatus(status):
    if status in stats:
      stats[status] += 1

  columns = []
  for fdef in result.fields:
    assert fdef.title and fdef.name
    (fn, align_right) = _GetColumnFormatter(fdef, format_override, unit)
    columns.append(TableColumn(fdef.title,
                               _QueryColumnFormatter(fn, _RecordStatus,
                                                     verbose),
                               align_right))

  table = FormatTable(result.data, columns, header, separator)

  # Collect statistics
  assert len(stats) == len(constants.RS_ALL)
  assert compat.all(count >= 0 for count in stats.values())

  # Determine overall status. If there was no data, unknown fields must be
  # detected via the field definitions.
  if (stats[constants.RS_UNKNOWN] or
      (not result.data and _GetUnknownFields(result.fields))):
    status = QR_UNKNOWN
  elif compat.any(count > 0 for key, count in stats.items()
                  if key != constants.RS_NORMAL):
    status = QR_INCOMPLETE
  else:
    status = QR_NORMAL

  return (status, table)
1908
1909
def _GetUnknownFields(fdefs):
  """Returns list of unknown fields included in C{fdefs}.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: list
  @return: all field definitions whose kind is L{constants.QFT_UNKNOWN}

  """
  return [fdef for fdef in fdefs if fdef.kind == constants.QFT_UNKNOWN]
1918
1919
def _WarnUnknownFields(fdefs):
  """Prints a warning to stderr if a query included unknown fields.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: bool
  @return: whether any unknown fields were found

  """
  unknown = _GetUnknownFields(fdefs)
  if not unknown:
    return False

  ToStderr("Warning: Queried for unknown fields %s",
           utils.CommaJoin(fdef.name for fdef in unknown))
  return True
1933
1934
def GenericList(resource, fields, names, unit, separator, header, cl=None,
                format_override=None, verbose=False, force_filter=False,
                namefield=None, qfilter=None, isnumeric=False):
  """Generic implementation for listing all items of a resource.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type names: list of strings
  @param names: Names of items to query for
  @type unit: string or None
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT} or
    None for automatic choice (human-readable for non-separator usage,
    otherwise megabytes); this is a one-letter string
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @type force_filter: bool
  @param force_filter: Whether to always treat names as filter
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @type namefield: string
  @param namefield: Name of field to use for simple filters (see
    L{qlang.MakeFilter} for details)
  @type qfilter: list or None
  @param qfilter: Query filter (in addition to names)
  @type isnumeric: bool
  @param isnumeric: Whether the namefield's type is numeric, and therefore
    any simple filters built by namefield should use integer values to
    reflect that
  @rtype: int
  @return: L{constants.EXIT_UNKNOWN_FIELD} if unknown fields were queried
    for, L{constants.EXIT_SUCCESS} otherwise

  """
  if not names:
    names = None

  namefilter = qlang.MakeFilter(names, force_filter, namefield=namefield,
                                isnumeric=isnumeric)

  # Combine the name-based filter with any explicitly given filter
  if namefilter is not None:
    if qfilter is None:
      qfilter = namefilter
    else:
      qfilter = [qlang.OP_AND, namefilter, qfilter]

  if cl is None:
    cl = GetClient()

  response = cl.Query(resource, fields, qfilter)

  found_unknown = _WarnUnknownFields(response.fields)

  (status, output) = FormatQueryResult(response, unit=unit,
                                       separator=separator, header=header,
                                       format_override=format_override,
                                       verbose=verbose)

  for line in output:
    ToStdout(line)

  # Unknown fields must have been warned about if and only if the overall
  # status reports them
  assert found_unknown == (status == QR_UNKNOWN)

  if status == QR_UNKNOWN:
    return constants.EXIT_UNKNOWN_FIELD

  # TODO: Should the list command fail if not all data could be collected?
  return constants.EXIT_SUCCESS
2005
2006
def _FieldDescValues(fdef):
  """Helper function for L{GenericListFields} to get query field description.

  @type fdef: L{objects.QueryFieldDefinition}
  @rtype: list
  @return: [name, human-readable kind, title, doc] for one field

  """
  # Fall back to the raw kind when no human-readable name is known
  kind_name = _QFT_NAMES.get(fdef.kind, fdef.kind)

  return [fdef.name, kind_name, fdef.title, fdef.doc]
2020
2021
def GenericListFields(resource, fields, separator, header, cl=None):
  """Generic implementation for listing fields for a resource.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @rtype: int
  @return: L{constants.EXIT_UNKNOWN_FIELD} if unknown fields were queried
    for, L{constants.EXIT_SUCCESS} otherwise

  """
  if cl is None:
    cl = GetClient()

  if not fields:
    fields = None

  response = cl.QueryFields(resource, fields)

  found_unknown = _WarnUnknownFields(response.fields)

  columns = [TableColumn(title, str, False)
             for title in ("Name", "Type", "Title", "Description")]

  rows = [_FieldDescValues(fdef) for fdef in response.fields]

  for line in FormatTable(rows, columns, header, separator):
    ToStdout(line)

  if found_unknown:
    return constants.EXIT_UNKNOWN_FIELD

  return constants.EXIT_SUCCESS
2060
2061
class TableColumn(object):
  """Describes a column for L{FormatTable}.

  """
  def __init__(self, title, fn, align_right):
    """Initializes this class.

    @type title: string
    @param title: Column title
    @type fn: callable
    @param fn: Formatting function
    @type align_right: bool
    @param align_right: Whether to align values on the right-hand side

    """
    # Public attributes, read directly by L{FormatTable}
    self.align_right = align_right
    self.format = fn
    self.title = title
2080
2081
2082 def _GetColFormatString(width, align_right):
2083 """Returns the format string for a field.
2084
2085 """
2086 if align_right:
2087 sign = ""
2088 else:
2089 sign = "-"
2090
2091 return "%%%s%ss" % (sign, width)
2092
2093
def FormatTable(rows, columns, header, separator):
  """Formats data as a table.

  @type rows: list of lists
  @param rows: Row data, one list per row
  @type columns: list of L{TableColumn}
  @param columns: Column descriptions
  @type header: bool
  @param header: Whether to show header row
  @type separator: string or None
  @param separator: String used to separate columns; if None, values are
    padded with spaces to a common per-column width instead
  @rtype: list of strings
  @return: the formatted table, one string per line

  """
  if header:
    data = [[col.title for col in columns]]
    colwidth = [len(col.title) for col in columns]
  else:
    data = []
    colwidth = [0 for _ in columns]

  # Format row data
  for row in rows:
    assert len(row) == len(columns)

    formatted = [col.format(value) for value, col in zip(row, columns)]

    if separator is None:
      # Track the widest value seen so far in each column
      for idx, value in enumerate(formatted):
        if len(value) > colwidth[idx]:
          colwidth[idx] = len(value)

    data.append(formatted)

  if separator is not None:
    # With an explicit separator no padding is necessary
    return [separator.join(row) for row in data]

  if columns and not columns[-1].align_right:
    # Avoid unnecessary spaces at end of line
    colwidth[-1] = 0

  # Build format string
  fmt = " ".join(_GetColFormatString(width, col.align_right)
                 for col, width in zip(columns, colwidth))

  return [fmt % tuple(row) for row in data]
2141
2142
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp, or "?" if the input is
      not a two-element sequence

  """
  if isinstance(ts, (tuple, list)) and len(ts) == 2:
    (sec, usecs) = ts
    return utils.FormatTime(sec, usecs=usecs)
  # Anything else is not a valid timeval pair
  return "?"
2158
2159
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: days
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @param value: the value to parse (converted to a string first)
  @rtype: int
  @return: the number of seconds
  @raise errors.OpPrereqError: if the value is empty or not a valid
      time specification

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed",
                               errors.ECODE_INVAL)
  suffix_map = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    }
  # Keep the original input so error messages don't show a mangled value
  orig_value = value
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" %
                                 orig_value, errors.ECODE_INVAL)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)", errors.ECODE_INVAL)
    try:
      # Report the full original input on failure, not the stripped rest
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" %
                                 orig_value, errors.ECODE_INVAL)
  return value
2203
2204
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False, nodegroup=None):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the online nodes.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)
  @type nodegroup: string
  @param nodegroup: If set, only return nodes in this node group

  """
  if cl is None:
    cl = GetClient()

  # Collect the individual filter conditions
  conditions = []

  if nodes:
    conditions.append(qlang.MakeSimpleFilter("name", nodes))

  if nodegroup is not None:
    # Accept either the group name or its UUID
    conditions.append([qlang.OP_OR, [qlang.OP_EQUAL, "group", nodegroup],
                       [qlang.OP_EQUAL, "group.uuid", nodegroup]])

  if filter_master:
    conditions.append([qlang.OP_NOT, [qlang.OP_TRUE, "master"]])

  # Combine them into a single query filter
  if not conditions:
    final_filter = None
  elif len(conditions) == 1:
    final_filter = conditions[0]
  else:
    final_filter = [qlang.OP_AND] + conditions

  result = cl.Query(constants.QR_NODE, ["name", "offline", "sip"], final_filter)

  # Each row contains (status, value) pairs for name, offline and sip
  def _IsOffline(row):
    (_, (_, offline), _) = row
    return offline

  def _GetName(row):
    ((_, name), _, _) = row
    return name

  def _GetSip(row):
    (_, _, (_, sip)) = row
    return sip

  (offline, online) = compat.partition(result.data, _IsOffline)

  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" %
             utils.CommaJoin(map(_GetName, offline)))

  if secondary_ips:
    return map(_GetSip, online)
  return map(_GetName, online)
2281
2282
def GetNodesSshPorts(nodes, cl):
  """Retrieves SSH ports of given nodes.

  @param nodes: the names of nodes
  @type nodes: a list of strings
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the list of SSH ports corresponding to the nodes
  @rtype: a list of tuples

  """
  # Each query row contains a single field, the port
  rows = cl.QueryNodes(names=nodes,
                       fields=["ndp/ssh_port"],
                       use_locking=False)
  return [row[0] for row in rows]
2298
2299
def GetNodeUUIDs(nodes, cl):
  """Retrieves the UUIDs of given nodes.

  @param nodes: the names of nodes
  @type nodes: a list of string
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the list of UUIDs corresponding to the nodes
  @rtype: a list of tuples

  """
  # Each query row contains a single field, the UUID
  rows = cl.QueryNodes(names=nodes,
                       fields=["uuid"],
                       use_locking=False)
  return [row[0] for row in rows]
2315
2316
2317 def _ToStream(stream, txt, *args):
2318 """Write a message to a stream, bypassing the logging system
2319
2320 @type stream: file object
2321 @param stream: the file to which we should write
2322 @type txt: str
2323 @param txt: the message
2324
2325 """
2326 try:
2327 if args:
2328 args = tuple(args)
2329 stream.write(txt % args)
2330 else:
2331 stream.write(txt)
2332 stream.write("\n")
2333 stream.flush()
2334 except IOError, err:
2335 if err.errno == errno.EPIPE:
2336 # our terminal went away, we'll exit
2337 sys.exit(constants.EXIT_FAILURE)
2338 else:
2339 raise
2340
2341
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message, optionally a %-format string for C{args}
  @param args: %-format arguments for C{txt}, if any

  """
  _ToStream(sys.stdout, txt, *args)
2352
2353
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  This is just a wrapper over _ToStream.

  @type txt: str
  @param txt: the message, optionally a %-format string for C{args}
  @param args: %-format arguments for C{txt}, if any

  """
  _ToStream(sys.stderr, txt, *args)
2364
2365
class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    """Initializes this class.

    @param cl: luxi client to use; a new one is created when C{None}
    @type verbose: bool
    @param verbose: whether to announce successfully submitted job IDs
        on stdout
    @param opts: parsed command-line options, applied to every queued
        opcode via L{SetGenericOpcodeOpts}
    @param feedback_fn: feedback function passed to L{PollJob}

    """
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn
    # Monotonic counter used to tag entries so that results can later be
    # sorted back into submission order
    self._counter = itertools.count()

  @staticmethod
  def _IfName(name, fmt):
    """Helper function for formatting name.

    @type name: string
    @param name: name to format, may be empty
    @type fmt: string
    @param fmt: %-format string applied to C{name}
    @return: C{fmt % name} if C{name} is non-empty, "" otherwise

    """
    if name:
      return fmt % name

    return ""

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet

    """
    SetGenericOpcodeOpts(ops, self.opts)
    # Entries are (index, name, opcodes); the index preserves ordering
    self.queue.append((self._counter.next(), name, ops))

  def AddJobId(self, name, status, job_id):
    """Adds a job ID to the internal queue.

    @param name: a description of the job
    @param status: the submission status of the job
    @param job_id: the ID of the already-submitted job

    """
    self.jobs.append((self._counter.next(), status, job_id, name))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    @type each: bool
    @param each: if True, submit jobs one by one via SubmitJob instead of
        a single SubmitManyJobs call

    """
    if each:
      results = []
      for (_, _, ops) in self.queue:
        # SubmitJob will remove the success status, but raise an exception if
        # the submission fails, so we'll notice that anyway.
        results.append([True, self.cl.SubmitJob(ops)[0]])
    else:
      results = self.cl.SubmitManyJobs([ops for (_, _, ops) in self.queue])
    # Pair each submission result with its queued entry; entries become
    # (index, status, job id or error, name)
    for ((status, data), (idx, name, _)) in zip(results, self.queue):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    @return: an entry removed from L{self.jobs}: preferably one that is
        already running (or lost), falling back to the first entry

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    # Only query a bounded number of jobs at a time
    result = self.cl.QueryJobs([i[2] for i in self.jobs[:_CHOOSE_BATCH]],
                               ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if (isinstance(status, list) and status and
          status[0] in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING)):
        # job is still present and waiting
        continue
      # good candidate found (either running job or lost job)
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job%s: %s", self._IfName(name, " for %s"), jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s%s ...", jid, self._IfName(name, " for %s"))
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except errors.JobLost, err:
        _, job_result = FormatError(err)
        ToStderr("Job %s%s has been archived, cannot check its result",
                 jid, self._IfName(name, " for %s"))
        success = False
      except (errors.GenericError, rpcerr.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s%s has failed: %s",
                 jid, self._IfName(name, " for %s"), job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    @return: job results when waiting, otherwise (status, result) pairs
        for all submitted jobs

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)
      return [row[1:3] for row in self.jobs]
2516
2517
def FormatParamsDictInfo(param_dict, actual, roman=False):
  """Formats a parameter dictionary.

  @type param_dict: dict
  @param param_dict: the own parameters
  @type actual: dict
  @param actual: the current parameter set (including defaults)
  @rtype: dict
  @return: dictionary where the value of each parameter is either a fully
      formatted string or a dictionary containing formatted strings

  """
  ret = {}
  for (key, data) in actual.items():
    if not (isinstance(data, dict) and data):
      # Leaf value: mark it as a default unless it is set explicitly
      fallback = "default (%s)" % compat.TryToRoman(data, roman)
      ret[key] = str(compat.TryToRoman(param_dict.get(key, fallback), roman))
    else:
      # Non-empty sub-dictionaries are formatted recursively
      ret[key] = FormatParamsDictInfo(param_dict.get(key, {}), data, roman)
  return ret
2538
2539
def _FormatListInfoDefault(data, def_data):
  """Renders a list of values, marking the fallback list as default.

  @param data: the custom values, or C{None} if unset
  @param def_data: the default values, used when C{data} is C{None}
  @rtype: string

  """
  if data is None:
    return "default (%s)" % utils.CommaJoin(def_data)
  return utils.CommaJoin(data)
2546
2547
def FormatPolicyInfo(custom_ipolicy, eff_ipolicy, iscluster, roman=False):
  """Formats an instance policy.

  @type custom_ipolicy: dict
  @param custom_ipolicy: own policy
  @type eff_ipolicy: dict
  @param eff_ipolicy: effective policy (including defaults); ignored for
      cluster
  @type iscluster: bool
  @param iscluster: the policy is at cluster level
  @type roman: bool
  @param roman: whether to print the values in roman numerals
  @rtype: list of pairs
  @return: formatted data, suitable for L{PrintGenericInfo}

  """
  if iscluster:
    # A cluster policy is its own effective policy
    eff_ipolicy = custom_ipolicy

  minmax_out = []
  custom_minmax = custom_ipolicy.get(constants.ISPECS_MINMAX)
  if custom_minmax:
    # Customized bounds: show them as explicitly set values
    for (k, minmax) in enumerate(custom_minmax):
      minmax_out.append([
        ("%s/%s" % (key, k),
         FormatParamsDictInfo(minmax[key], minmax[key], roman))
        for key in constants.ISPECS_MINMAX_KEYS
        ])
  else:
    # No customized bounds: show the effective values, all marked as
    # defaults (empty dict as the "own" parameters)
    for (k, minmax) in enumerate(eff_ipolicy[constants.ISPECS_MINMAX]):
      minmax_out.append([
        ("%s/%s" % (key, k),
         FormatParamsDictInfo({}, minmax[key], roman))
        for key in constants.ISPECS_MINMAX_KEYS
        ])
  ret = [("bounds specs", minmax_out)]

  if iscluster:
    # Standard specs are only shown at cluster level
    stdspecs = custom_ipolicy[constants.ISPECS_STD]
    ret.append(
      (constants.ISPECS_STD,
       FormatParamsDictInfo(stdspecs, stdspecs, roman))
      )

  ret.append(
    ("allowed disk templates",
     _FormatListInfoDefault(custom_ipolicy.get(constants.IPOLICY_DTS),
                            eff_ipolicy[constants.IPOLICY_DTS]))
    )
  to_roman = compat.TryToRoman
  # Remaining scalar policy parameters, falling back to marked defaults
  ret.extend([
    (key, str(to_roman(custom_ipolicy.get(key,
                                          "default (%s)" % eff_ipolicy[key]),
                       roman)))
    for key in constants.IPOLICY_PARAMETERS
    ])
  return ret
2605
2606
2607 def _PrintSpecsParameters(buf, specs):
2608 values = ("%s=%s" % (par, val) for (par, val) in sorted(specs.items()))
2609 buf.write(",".join(values))
2610
2611
def PrintIPolicyCommand(buf, ipolicy, isgroup):
  """Print the command option used to generate the given instance policy.

  Currently only the parts dealing with specs are supported.

  @type buf: StringIO
  @param buf: stream to write into
  @type ipolicy: dict
  @param ipolicy: instance policy
  @type isgroup: bool
  @param isgroup: whether the policy is at group level

  """
  if not isgroup:
    # Standard specs are only emitted for non-group policies
    stdspecs = ipolicy.get("std")
    if stdspecs:
      buf.write(" %s " % IPOLICY_STD_SPECS_STR)
      _PrintSpecsParameters(buf, stdspecs)
  first = True
  for minmax in ipolicy.get("minmax", []):
    minspecs = minmax.get("min")
    maxspecs = minmax.get("max")
    if not (minspecs and maxspecs):
      # Incomplete pairs are skipped entirely
      continue
    if first:
      buf.write(" %s " % IPOLICY_BOUNDS_SPECS_STR)
      first = False
    else:
      # Subsequent pairs are separated by "//"
      buf.write("//")
    buf.write("min:")
    _PrintSpecsParameters(buf, minspecs)
    buf.write("/max:")
    _PrintSpecsParameters(buf, maxspecs)
2645
2646
def ConfirmOperation(names, list_type, text, extra=""):
  """Ask the user to confirm an operation on a list of list_type.

  This function is used to request confirmation for doing an operation
  on a given list of list_type.

  @type names: list
  @param names: the list of names that we display when
      we ask for confirmation
  @type list_type: str
  @param list_type: Human readable name for elements in the list (e.g. nodes)
  @type text: str
  @param text: the operation that the user should confirm
  @rtype: boolean
  @return: True or False depending on user's confirmation.

  """
  item_count = len(names)
  prompt = ("The %s will operate on %d %s.\n%s"
            "Do you want to continue?" % (text, item_count, list_type, extra))
  details = (("\nAffected %s:\n" % list_type) +
             "\n".join(" %s" % name for name in names))

  options = [("y", True, "Yes, execute the %s" % text),
             ("n", False, "No, abort the %s" % text)]

  if item_count > 20:
    # For long lists, offer a separate "view" choice instead of flooding
    # the prompt with every name
    options.insert(1, ("v", "v", "View the list of affected %s" % list_type))
    answer = AskUser(prompt, options)
  else:
    answer = AskUser(prompt + details, options)

  if answer == "v":
    # The user wants to see the list: show it and ask again, this time
    # without the "view" choice
    options.pop(1)
    answer = AskUser(prompt + details, options)
  return answer
2684
2685
def _MaybeParseUnit(elements):
  """Parses and returns an array of potential values with units.

  @type elements: dict
  @param elements: mapping of keys to values that may carry a unit suffix
  @rtype: dict
  @return: the same keys, with each value either kept verbatim (for the
      special "default" value) or converted via L{utils.ParseUnit}

  """
  def _ParseOne(value):
    # The "default" placeholder must be passed through untouched
    if value == constants.VALUE_DEFAULT:
      return value
    return utils.ParseUnit(value)

  return dict((key, _ParseOne(value)) for (key, value) in elements.items())
2697
2698
def _InitISpecsFromSplitOpts(ipolicy, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all):
  """Fills C{ipolicy} from the old-style separate ("split") ispecs options.

  Each C{ispecs_*} argument is a dict keyed by min/max/std (any subset);
  memory and disk sizes may carry unit suffixes and are parsed with
  L{utils.ParseUnit} via L{_MaybeParseUnit}.

  @type ipolicy: dict
  @param ipolicy: policy dictionary to fill in place
  @type group_ipolicy: bool
  @param group_ipolicy: whether the policy is at group level (selects which
      type check is enforced on the values)
  @type fill_all: bool
  @param fill_all: whether missing spec values should be filled from the
      cluster-wide defaults
  @raise errors.OpPrereqError: if a size value cannot be parsed

  """
  try:
    if ispecs_mem_size:
      ispecs_mem_size = _MaybeParseUnit(ispecs_mem_size)
    if ispecs_disk_size:
      ispecs_disk_size = _MaybeParseUnit(ispecs_disk_size)
  except (TypeError, ValueError, errors.UnitParseError), err:
    raise errors.OpPrereqError("Invalid disk (%s) or memory (%s) size"
                               " in policy: %s" %
                               (ispecs_disk_size, ispecs_mem_size, err),
                               errors.ECODE_INVAL)

  # prepare ipolicy dict
  ispecs_transposed = {
    constants.ISPEC_MEM_SIZE: ispecs_mem_size,
    constants.ISPEC_CPU_COUNT: ispecs_cpu_count,
    constants.ISPEC_DISK_COUNT: ispecs_disk_count,
    constants.ISPEC_DISK_SIZE: ispecs_disk_size,
    constants.ISPEC_NIC_COUNT: ispecs_nic_count,
    }

  # first, check that the values given are correct
  if group_ipolicy:
    forced_type = TISPECS_GROUP_TYPES
  else:
    forced_type = TISPECS_CLUSTER_TYPES
  for specs in ispecs_transposed.values():
    assert type(specs) is dict
    utils.ForceDictType(specs, forced_type)

  # then transpose
  # from {param: {min/max/std: value}} to {min/max/std: {param: value}}
  ispecs = {
    constants.ISPECS_MIN: {},
    constants.ISPECS_MAX: {},
    constants.ISPECS_STD: {},
    }
  for (name, specs) in ispecs_transposed.iteritems():
    assert name in constants.ISPECS_PARAMETERS
    for key, val in specs.items(): # {min: .. ,max: .., std: ..}
      assert key in ispecs
      ispecs[key][name] = val
  minmax_out = {}
  for key in constants.ISPECS_MINMAX_KEYS:
    if fill_all:
      minmax_out[key] = \
        objects.FillDict(constants.ISPECS_MINMAX_DEFAULTS[key], ispecs[key])
    else:
      minmax_out[key] = ispecs[key]
  # The split options can only describe a single min/max pair
  ipolicy[constants.ISPECS_MINMAX] = [minmax_out]
  if fill_all:
    ipolicy[constants.ISPECS_STD] = \
      objects.FillDict(constants.IPOLICY_DEFAULTS[constants.ISPECS_STD],
                       ispecs[constants.ISPECS_STD])
  else:
    ipolicy[constants.ISPECS_STD] = ispecs[constants.ISPECS_STD]
2756
2757
2758 def _ParseSpecUnit(spec, keyname):
2759 ret = spec.copy()
2760 for k in [constants.ISPEC_DISK_SIZE, constants.ISPEC_MEM_SIZE]:
2761 if k in ret:
2762 try:
2763 ret[k] = utils.ParseUnit(ret[k])
2764 except (TypeError, ValueError, errors.UnitParseError), err:
2765 raise errors.OpPrereqError(("Invalid parameter %s (%s) in %s instance"
2766 " specs: %s" % (k, ret[k], keyname, err)),
2767 errors.ECODE_INVAL)
2768 return ret
2769
2770
def _ParseISpec(spec, keyname, required):
  """Parses and validates a single instance specs dictionary.

  @type spec: dict
  @param spec: the instance specs to parse
  @type keyname: string
  @param keyname: name of the spec group, used only in error messages
  @type required: bool
  @param required: whether all spec parameters must be present
  @rtype: dict
  @raise errors.OpPrereqError: on invalid values or (when C{required})
      missing parameters

  """
  ret = _ParseSpecUnit(spec, keyname)
  utils.ForceDictType(ret, constants.ISPECS_PARAMETER_TYPES)
  missing = constants.ISPECS_PARAMETERS - frozenset(ret.keys())
  if missing and required:
    raise errors.OpPrereqError("Missing parameters in ipolicy spec %s: %s" %
                               (keyname, utils.CommaJoin(missing)),
                               errors.ECODE_INVAL)
  return ret
2780
2781
2782 def _GetISpecsInAllowedValues(minmax_ispecs, allowed_values):
2783 ret = None
2784 if (minmax_ispecs and allowed_values and len(minmax_ispecs) == 1 and
2785 len(minmax_ispecs[0]) == 1):
2786 for (key, spec) in minmax_ispecs[0].items():
2787 # This loop is executed exactly once
2788 if key in allowed_values and not spec:
2789 ret = key
2790 return ret
2791
2792
def _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values):
  """Fills the min/max and std instance specs of a policy dictionary.

  @type ipolicy_out: dict
  @param ipolicy_out: policy dictionary to fill in place
  @param minmax_ispecs: list of min/max specs dicts, or C{None}
  @param std_ispecs: standard specs dict, or C{None}
  @type group_ipolicy: bool
  @param group_ipolicy: whether the policy is at group level
  @param allowed_values: special values accepted in place of real specs
  @raise errors.OpPrereqError: on invalid spec keys or values

  """
  found_allowed = _GetISpecsInAllowedValues(minmax_ispecs, allowed_values)
  if found_allowed is not None:
    # A special value replaces the whole min/max specification
    ipolicy_out[constants.ISPECS_MINMAX] = found_allowed
  elif minmax_ispecs is not None:
    minmax_out = []
    for mmpair in minmax_ispecs:
      mmpair_out = {}
      for (key, spec) in mmpair.items():
        if key not in constants.ISPECS_MINMAX_KEYS:
          raise errors.OpPrereqError("Invalid key in bounds instance"
                                     " specifications: %s" % key,
                                     errors.ECODE_INVAL)
        mmpair_out[key] = _ParseISpec(spec, key, True)
      minmax_out.append(mmpair_out)
    ipolicy_out[constants.ISPECS_MINMAX] = minmax_out
  if std_ispecs is not None:
    assert not group_ipolicy # This is not an option for gnt-group
    ipolicy_out[constants.ISPECS_STD] = _ParseISpec(std_ispecs, "std", False)
2812
2813
def CreateIPolicyFromOpts(ispecs_mem_size=None,
                          ispecs_cpu_count=None,
                          ispecs_disk_count=None,
                          ispecs_disk_size=None,
                          ispecs_nic_count=None,
                          minmax_ispecs=None,
                          std_ispecs=None,
                          ipolicy_disk_templates=None,
                          ipolicy_vcpu_ratio=None,
                          ipolicy_spindle_ratio=None,
                          group_ipolicy=False,
                          allowed_values=None,
                          fill_all=False):
  """Creation of instance policy based on command line options.

  The C{ispecs_*} arguments carry the old-style separate ("split") spec
  options, while C{minmax_ispecs}/C{std_ispecs} carry the newer
  --ipolicy-*-specs values; the two families are mutually exclusive.

  @param minmax_ispecs: list of min/max specs dicts, or C{None}
  @param std_ispecs: standard specs dict, or C{None}
  @param ipolicy_disk_templates: iterable of allowed disk templates,
      or C{None} to leave unset
  @param ipolicy_vcpu_ratio: value for the vcpu-ratio policy parameter,
      or C{None} to leave unset
  @param ipolicy_spindle_ratio: value for the spindle-ratio policy
      parameter, or C{None} to leave unset
  @type group_ipolicy: bool
  @param group_ipolicy: whether the policy is at group level
  @param allowed_values: special values accepted in place of real specs
  @param fill_all: whether for cluster policies we should ensure that
    all values are filled

  """
  # Filling from defaults and accepting special values are exclusive
  assert not (fill_all and allowed_values)

  split_specs = (ispecs_mem_size or ispecs_cpu_count or ispecs_disk_count or
                 ispecs_disk_size or ispecs_nic_count)
  if (split_specs and (minmax_ispecs is not None or std_ispecs is not None)):
    raise errors.OpPrereqError("A --specs-xxx option cannot be specified"
                               " together with any --ipolicy-xxx-specs option",
                               errors.ECODE_INVAL)

  ipolicy_out = objects.MakeEmptyIPolicy()
  if split_specs:
    assert fill_all
    _InitISpecsFromSplitOpts(ipolicy_out, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all)
  elif (minmax_ispecs is not None or std_ispecs is not None):
    _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values)

  if ipolicy_disk_templates is not None:
    # Special values are stored verbatim, real ones as a list
    if allowed_values and ipolicy_disk_templates in allowed_values:
      ipolicy_out[constants.IPOLICY_DTS] = ipolicy_disk_templates
    else:
      ipolicy_out[constants.IPOLICY_DTS] = list(ipolicy_disk_templates)
  if ipolicy_vcpu_ratio is not None:
    ipolicy_out[constants.IPOLICY_VCPU_RATIO] = ipolicy_vcpu_ratio
  if ipolicy_spindle_ratio is not None:
    ipolicy_out[constants.IPOLICY_SPINDLE_RATIO] = ipolicy_spindle_ratio

  assert not (frozenset(ipolicy_out.keys()) - constants.IPOLICY_ALL_KEYS)

  if not group_ipolicy and fill_all:
    ipolicy_out = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_out)

  return ipolicy_out
2868
2869
2870 def _NotAContainer(data):
2871 """ Checks whether the input is not a container data type.
2872
2873 @rtype: bool
2874
2875 """
2876 return not (isinstance(data, (list, dict, tuple)))
2877
2878
2879 def _GetAlignmentMapping(data):
2880 """ Returns info about alignment if present in an encoded ordered dictionary.
2881
2882 @type data: list of tuple
2883 @param data: The encoded ordered dictionary, as defined in
2884 L{_SerializeGenericInfo}.
2885 @rtype: dict of any to int
2886 @return: The dictionary mapping alignment groups to the maximum length of the
2887 dictionary key found in the group.
2888
2889 """
2890 alignment_map = {}
2891 for entry in data:
2892 if len(entry) > 2:
2893 group_key = entry[2]
2894 key_length = len(entry[0])
2895 if group_key in alignment_map:
2896 alignment_map[group_key] = max(alignment_map[group_key], key_length)
2897 else:
2898 alignment_map[group_key] = key_length
2899
2900 return alignment_map
2901
2902
def _SerializeGenericInfo(buf, data, level, afterkey=False):
  """Formatting core of L{PrintGenericInfo}.

  @param buf: (string) stream to accumulate the result into
  @param data: data to format
  @type level: int
  @param level: depth in the data hierarchy, used for indenting
  @type afterkey: bool
  @param afterkey: True when we are in the middle of a line after a key (used
      to properly add newlines or indentation)

  """
  # Two spaces of indentation per nesting level
  baseind = "  "
  if isinstance(data, dict):
    if not data:
      buf.write("\n")
    else:
      if afterkey:
        buf.write("\n")
        doindent = True
      else:
        doindent = False
      # Keys are emitted in sorted order, one "key: value" per line
      for key in sorted(data):
        if doindent:
          buf.write(baseind * level)
        else:
          # The first line continues the current line; indent from then on
          doindent = True
        buf.write(key)
        buf.write(": ")
        _SerializeGenericInfo(buf, data[key], level + 1, afterkey=True)
  elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], tuple):
    # list of tuples (an ordered dictionary)
    # the tuples may have two or three members - key, value, and alignment group
    # if the alignment group is present, align all values sharing the same group
    if afterkey:
      buf.write("\n")
      doindent = True
    else:
      doindent = False

    alignment_mapping = _GetAlignmentMapping(data)
    for entry in data:
      key, val = entry[0:2]
      if doindent:
        buf.write(baseind * level)
      else:
        doindent = True
      buf.write(key)
      buf.write(": ")
      if len(entry) > 2:
        # Pad with spaces so all values in this group start at the same column
        max_key_length = alignment_mapping[entry[2]]
        buf.write(" " * (max_key_length - len(key)))
      _SerializeGenericInfo(buf, val, level + 1, afterkey=True)
  elif isinstance(data, tuple) and all(map(_NotAContainer, data)):
    # tuples with simple content are serialized as inline lists
    buf.write("[%s]\n" % utils.CommaJoin(data))
  elif isinstance(data, list) or isinstance(data, tuple):
    # lists and tuples
    if not data:
      buf.write("\n")
    else:
      if afterkey:
        buf.write("\n")
        doindent = True
      else:
        doindent = False
      # Each item becomes a "- item" bullet line
      for item in data:
        if doindent:
          buf.write(baseind * level)
        else:
          doindent = True
        buf.write("-")
        buf.write(baseind[1:])
        _SerializeGenericInfo(buf, item, level + 1)
  else:
    # This branch should be only taken for strings, but it's practically
    # impossible to guarantee that no other types are produced somewhere
    buf.write(str(data))
    buf.write("\n")
2982
2983
def PrintGenericInfo(data):
  """Print information formatted according to the hierarchy.

  The output is a valid YAML string.

  @param data: the data to print. It's a hierarchical structure whose elements
      can be:
        - dictionaries, where keys are strings and values are of any of the
          types listed here
        - lists of tuples (key, value) or (key, value, alignment_group), where
          key is a string, value is of any of the types listed here, and
          alignment_group can be any hashable value; it's a way to encode
          ordered dictionaries; any entries sharing the same alignment group are
          aligned by appending whitespace before the value as needed
        - lists of any of the types listed here
        - strings

  """
  buf = StringIO()
  _SerializeGenericInfo(buf, data, 0)
  serialized = buf.getvalue()
  # Strip the final newline(s); ToStdout appends its own
  ToStdout(serialized.rstrip("\n"))