Merge branch 'stable-2.12' into stable-2.13
[ganeti-github.git] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module dealing with command line parsing"""
32
33
34 import sys
35 import textwrap
36 import os.path
37 import time
38 import logging
39 import errno
40 import itertools
41 import shlex
42 from cStringIO import StringIO
43
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti import constants
47 from ganeti import opcodes
48 import ganeti.rpc.errors as rpcerr
49 import ganeti.rpc.node as rpc
50 from ganeti import ssh
51 from ganeti import compat
52 from ganeti import netutils
53 from ganeti import qlang
54 from ganeti import objects
55 from ganeti import pathutils
56 from ganeti import serializer
57 import ganeti.cli_opts
58 # Import constants
59 from ganeti.cli_opts import * # pylint: disable=W0401
60
61 from ganeti.runtime import (GetClient)
62
63 from optparse import (OptionParser, TitledHelpFormatter)
64
65
66 __all__ = [
67 # Generic functions for CLI programs
68 "ConfirmOperation",
69 "CreateIPolicyFromOpts",
70 "GenericMain",
71 "GenericInstanceCreate",
72 "GenericList",
73 "GenericListFields",
74 "GetClient",
75 "GetOnlineNodes",
76 "GetNodesSshPorts",
77 "GetNodeUUIDs",
78 "JobExecutor",
79 "JobSubmittedException",
80 "ParseTimespec",
81 "RunWhileClusterStopped",
82 "RunWhileDaemonsStopped",
83 "SubmitOpCode",
84 "SubmitOpCodeToDrainedQueue",
85 "SubmitOrSend",
86 "UsesRPC",
87 # Formatting functions
88 "ToStderr", "ToStdout",
89 "ToStdoutAndLoginfo",
90 "FormatError",
91 "FormatQueryResult",
92 "FormatParamsDictInfo",
93 "FormatPolicyInfo",
94 "PrintIPolicyCommand",
95 "PrintGenericInfo",
96 "GenerateTable",
97 "AskUser",
98 "FormatTimestamp",
99 "FormatLogMessage",
100 # Tags functions
101 "ListTags",
102 "AddTags",
103 "RemoveTags",
104 # command line options support infrastructure
105 "ARGS_MANY_INSTANCES",
106 "ARGS_MANY_NODES",
107 "ARGS_MANY_GROUPS",
108 "ARGS_MANY_NETWORKS",
109 "ARGS_MANY_FILTERS",
110 "ARGS_NONE",
111 "ARGS_ONE_INSTANCE",
112 "ARGS_ONE_NODE",
113 "ARGS_ONE_GROUP",
114 "ARGS_ONE_OS",
115 "ARGS_ONE_NETWORK",
116 "ARGS_ONE_FILTER",
117 "ArgChoice",
118 "ArgCommand",
119 "ArgFile",
120 "ArgGroup",
121 "ArgHost",
122 "ArgInstance",
123 "ArgJobId",
124 "ArgNetwork",
125 "ArgNode",
126 "ArgOs",
127 "ArgExtStorage",
128 "ArgFilter",
129 "ArgSuggest",
130 "ArgUnknown",
131 "FixHvParams",
132 "SplitNodeOption",
133 "CalculateOSNames",
134 "ParseFields",
135 ] + ganeti.cli_opts.__all__ # Command line options
136
# Query result status for clients: normal result, field unknown to the
# server, or result incomplete (e.g. some nodes unreachable)
(QR_NORMAL,
 QR_UNKNOWN,
 QR_INCOMPLETE) = range(3)

#: Maximum batch size for ChooseJob
_CHOOSE_BATCH = 25


# constants used to create InstancePolicy dictionary
# (group-level policies have no "std" values, only the cluster does)
TISPECS_GROUP_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  }

TISPECS_CLUSTER_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  constants.ISPECS_STD: constants.VTYPE_INT,
  }

#: User-friendly names for query2 field types
_QFT_NAMES = {
  constants.QFT_UNKNOWN: "Unknown",
  constants.QFT_TEXT: "Text",
  constants.QFT_BOOL: "Boolean",
  constants.QFT_NUMBER: "Number",
  constants.QFT_NUMBER_FLOAT: "Floating-point number",
  constants.QFT_UNIT: "Storage size",
  constants.QFT_TIMESTAMP: "Timestamp",
  constants.QFT_OTHER: "Custom",
  }
169
170
class _Argument(object):
  """Base class describing a positional command line argument.

  Carries the minimum and maximum number of values the argument accepts.

  """
  def __init__(self, min=0, max=None): # pylint: disable=W0622
    self.min = min
    self.max = max

  def __repr__(self):
    return "<%s min=%s max=%s>" % (self.__class__.__name__,
                                   self.min, self.max)
179
180
class ArgSuggest(_Argument):
  """Suggesting argument.

  Any value is accepted; the values passed to the constructor are merely
  offered as suggestions.

  """
  # pylint: disable=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return "<%s min=%s max=%s choices=%r>" % (self.__class__.__name__,
                                              self.min, self.max,
                                              self.choices)
195
196
class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """
204
205
class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  Used when the kind of argument cannot be described statically.

  """
210
211
class ArgInstance(_Argument):
  """Instances argument.

  Positional argument naming one or more instances.

  """
216
217
class ArgNode(_Argument):
  """Node argument.

  Positional argument naming one or more nodes.

  """
222
223
class ArgNetwork(_Argument):
  """Network argument.

  Positional argument naming one or more networks.

  """
228
229
class ArgGroup(_Argument):
  """Node group argument.

  Positional argument naming one or more node groups.

  """
234
235
class ArgJobId(_Argument):
  """Job ID argument.

  Positional argument holding one or more job IDs.

  """
240
241
class ArgFile(_Argument):
  """File path argument.

  Positional argument holding one or more file system paths.

  """
246
247
class ArgCommand(_Argument):
  """Command argument.

  Positional argument holding a command (e.g. for remote execution).

  """
252
253
class ArgHost(_Argument):
  """Host argument.

  Positional argument naming one or more hosts.

  """
258
259
class ArgOs(_Argument):
  """OS argument.

  Positional argument naming an operating system definition.

  """
264
265
class ArgExtStorage(_Argument):
  """ExtStorage argument.

  Positional argument naming an external storage provider.

  """
270
271
class ArgFilter(_Argument):
  """Filter UUID argument.

  Positional argument holding one or more filter UUIDs.

  """
276
277
# Ready-made argument specifications for command definitions.
# "MANY" variants accept any number of values (including zero) ...
ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NETWORKS = [ArgNetwork()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_MANY_GROUPS = [ArgGroup()]
ARGS_MANY_FILTERS = [ArgFilter()]
# ... while "ONE" variants require exactly one value
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NETWORK = [ArgNetwork(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_GROUP = [ArgGroup(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
ARGS_ONE_FILTER = [ArgFilter(min=1, max=1)]
290
291
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  @return: tuple of (tag kind, object name); for cluster tags the name
      is the empty string

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")

  kind = opts.tag_type

  if kind == constants.TAG_CLUSTER:
    # the cluster is a singleton, no name needed
    return kind, ""

  if kind in (constants.TAG_NODEGROUP,
              constants.TAG_NODE,
              constants.TAG_NETWORK,
              constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command",
                                 errors.ECODE_INVAL)
    # consume the object name from the argument list
    return kind, args.pop(0)

  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
315
316
def _ExtendTags(opts, args):
  """Extend the args if a source file has been given.

  This function will extend the tags with the contents of the file
  passed in the 'tags_source' attribute of the opts parameter. A file
  named '-' will be replaced by stdin.

  @param opts: option object with a C{tags_source} attribute (file name
      or None)
  @param args: list of tags, extended in place with one tag per input line

  """
  fname = opts.tags_source
  if fname is None:
    return
  if fname == "-":
    new_fh = sys.stdin
  else:
    new_fh = open(fname, "r")
  new_data = []
  try:
    # we don't use the nice 'new_data = [line.strip() for line in fh]'
    # because of python bug 1633941
    while True:
      line = new_fh.readline()
      if not line:
        break
      new_data.append(line.strip())
  finally:
    # NOTE: when reading from "-" this also closes sys.stdin
    new_fh.close()
  args.extend(new_data)
344
345
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  (kind, name) = _ExtractTagsObject(opts, args)
  cl = GetClient()
  # print the tags sorted for stable output
  for tag in sorted(cl.QueryTags(kind, name)):
    ToStdout(tag)
362
363
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  (kind, name) = _ExtractTagsObject(opts, args)
  # optionally pull additional tags from a file given on the command line
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added", errors.ECODE_INVAL)
  SubmitOrSend(opcodes.OpTagsSet(kind=kind, name=name, tags=args), opts)
379
380
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  (kind, name) = _ExtractTagsObject(opts, args)
  # optionally pull additional tags from a file given on the command line
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed", errors.ECODE_INVAL)
  SubmitOrSend(opcodes.OpTagsDel(kind=kind, name=name, tags=args), opts)
396
397
class _ShowUsage(Exception):
  """Exception raised by L{_ParseArgs} to request the usage text.

  """
  def __init__(self, exit_error):
    """Initializes instances of this class.

    @type exit_error: bool
    @param exit_error: Whether to report failure on exit

    """
    super(_ShowUsage, self).__init__()
    self.exit_error = exit_error
411
412
class _ShowVersion(Exception):
  """Exception class for L{_ParseArgs}.

  Raised when the version should be printed instead of running a command.

  """
417
418
def _ParseArgs(binary, argv, commands, aliases, env_override):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param binary: Script name
  @param argv: Command line arguments
  @param commands: Dictionary containing command definitions
  @param aliases: dictionary with command aliases {"alias": "target", ...}
  @param env_override: list of env variables allowed for default args
  @raise _ShowUsage: If usage description should be shown
  @raise _ShowVersion: If version should be shown

  """
  assert not (env_override - set(commands))
  assert not (set(aliases) & set(commands))

  try:
    cmd = argv[1]
  except IndexError:
    # No option or command given
    raise _ShowUsage(exit_error=True)

  if cmd == "--version":
    raise _ShowVersion()
  if cmd == "--help":
    raise _ShowUsage(exit_error=False)
  if cmd not in commands and cmd not in aliases:
    raise _ShowUsage(exit_error=True)

  # get command, unalias it, and look it up in commands
  if cmd in aliases:
    if aliases[cmd] not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, aliases[cmd]))
    cmd = aliases[cmd]

  # allow default arguments from the environment for whitelisted commands
  if cmd in env_override:
    args_env_name = ("%s_%s" % (binary.replace("-", "_"), cmd)).upper()
    env_args = os.environ.get(args_env_name)
    if env_args:
      argv = utils.InsertAtPos(argv, 2, shlex.split(env_args))

  (func, args_def, parser_opts, usage, description) = commands[cmd]
  parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  (options, args) = parser.parse_args(args=argv[2:])

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
476
477
def _FormatUsage(binary, commands):
  """Generates a nice description of all commands.

  @param binary: Script name
  @param commands: Dictionary containing command definitions
  @return: generator yielding the usage text line by line

  """
  # column width for the command names, capped at 60 characters
  width = min(60, max(map(len, commands)))

  yield "Usage: %s {command} [options...] [argument...]" % binary
  yield "%s <command> --help to see details, or man %s" % (binary, binary)
  yield ""
  yield "Commands:"

  # and format a nice command list
  for (cmd, definition) in sorted(commands.items()):
    help_text = definition[4]
    wrapped = textwrap.wrap(help_text, 79 - 3 - width)
    yield " %-*s - %s" % (width, cmd, wrapped.pop(0))
    for extra in wrapped:
      yield " %-*s   %s" % (width, "", extra)

  yield ""
501
502
def _CheckArguments(cmd, args_def, args):
  """Verifies the arguments using the argument definition.

  Algorithm:

  1. Abort with error if values specified by user but none expected.

  1. For each argument in definition

    1. Keep running count of minimum number of values (min_count)
    1. Keep running count of maximum number of values (max_count)
    1. If it has an unlimited number of values

      1. Abort with error if it's not the last argument in the definition

  1. If last argument has limited number of values

    1. Abort with error if number of values doesn't match or is too large

  1. Abort with error if user didn't pass enough values (min_count)

  """
  if args and not args_def:
    ToStderr("Error: Command %s expects no arguments", cmd)
    return False

  required = None
  allowed = None
  enforce_max = None

  final_idx = len(args_def) - 1

  for idx, arg in enumerate(args_def):
    # accumulate the minimum number of values across all argument specs
    if required is None:
      required = arg.min
    elif arg.min is not None:
      required += arg.min

    # accumulate the maximum number of values across all argument specs
    if allowed is None:
      allowed = arg.max
    elif arg.max is not None:
      allowed += arg.max

    if idx == final_idx:
      enforce_max = (arg.max is not None)
    elif arg.max is None:
      raise errors.ProgrammerError("Only the last argument can have max=None")

  if enforce_max:
    # Command with exact number of arguments
    if (required is not None and allowed is not None and
        required == allowed and len(args) != required):
      ToStderr("Error: Command %s expects %d argument(s)", cmd, required)
      return False

    # Command with limited number of arguments
    if allowed is not None and len(args) > allowed:
      ToStderr("Error: Command %s expects only %d argument(s)",
               cmd, allowed)
      return False

  # Command with some required arguments
  if required is not None and len(args) < required:
    ToStderr("Error: Command %s expects at least %d argument(s)",
             cmd, required)
    return False

  return True
572
573
def SplitNodeOption(value):
  """Splits the value of a --node option.

  @return: a (node, disk) pair; the second element is None when no
      colon-separated part is present

  """
  if not value or ":" not in value:
    return (value, None)
  return value.split(":", 1)
582
583
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if not os_variants:
    # no variants: the plain name is the only valid one
    return [os_name]
  return ["%s+%s" % (os_name, variant) for variant in os_variants]
599
600
def ParseFields(selected, default):
  """Parses the values of "--field"-like options.

  @type selected: string or None
  @param selected: User-selected options
  @type default: list
  @param default: Default fields
  @return: the default list when nothing was selected; otherwise the
      comma-separated user selection, appended to the defaults when
      prefixed with "+"

  """
  if selected is None:
    return default

  # a leading "+" means "extend the defaults" rather than "replace them"
  if selected.startswith("+"):
    return default + selected[1:].split(",")

  return selected.split(",")
617
618
#: Alias for C{rpc.RunWithRPC}; used as a decorator by CLI commands that
#: need the RPC layer (see the rpc module for the exact semantics)
UsesRPC = rpc.RunWithRPC
620
621
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry from the list

  """
  if choices is None:
    choices = [("y", True, "Perform the operation"),
               ("n", False, "Do not perform the operation")]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == "?":
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # default answer, also returned when no tty is available
  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # use open() instead of the deprecated 'file()' builtin (removed in
    # Python 3); behavior is identical
    f = open("/dev/tty", "a+")
  except IOError:
    return answer
  try:
    chars = [entry[0] for entry in choices]
    # mark the last (default) choice with brackets
    chars[-1] = "[%s]" % chars[-1]
    chars.append("?")
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write("\n")
      f.write("/".join(chars))
      f.write(": ")
      # read at most two characters (answer plus newline)
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == "?":
        # print the description of every choice as inline help
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
677
678
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """
688
689
def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the job ID assigned by the master

  """
  if cl is None:
    cl = GetClient()
  return cl.SubmitJob(ops)
706
707
def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks
  @return: the C{opresult} list of the job if it finished successfully
  @raise errors.JobLost: if the job cannot be found anymore
  @raise errors.OpExecError: if the job failed or was canceled

  """
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        # remember the highest serial seen so that already-reported log
        # messages are not requested again on the next iteration
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  # the job has finished; fetch its final status and per-opcode results
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  # the job failed: find the first failed opcode and report it
  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      errors.MaybeRaise(msg)

      if has_ok:
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)
784
785
class JobPollCbBase(object):
  """Base class for L{GenericPollJob} callbacks.

  Subclasses must override L{WaitForJobChangeOnce} and L{QueryJobs}.

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()
812
813
class JobPollReportCbBase(object):
  """Base class for L{GenericPollJob} reporting callbacks.

  Subclasses must override L{ReportLogMessage} and L{ReportNotChanged}.

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called for if a job hasn't changed in a while.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()
839
840
class _LuxiJobPollCb(JobPollCbBase):
  """L{JobPollCbBase} implementation delegating to a luxi client.

  """
  def __init__(self, cl):
    """Initializes this class.

    @param cl: luxi client used for all queries

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    return self.cl.WaitForJobChangeOnce(job_id, fields,
                                        prev_job_info, prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    """
    return self.cl.QueryJobs(job_ids, fields)
862
863
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks forwarding log messages to a feedback function.

  """
  def __init__(self, feedback_fn):
    """Initializes this class.

    @param feedback_fn: callable invoked with a (timestamp, log_type,
        log_msg) tuple for every log message

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    self.feedback_fn((timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # Ignore
886
887
class StdioJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks that write to standard output and error.

  """
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    # only notify once per state to avoid flooding the terminal
    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             FormatLogMessage(log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      self.notified_queued = True
      ToStderr("Job %s is waiting in queue", job_id)
      return

    if status == constants.JOB_STATUS_WAITING and not self.notified_waitlock:
      self.notified_waitlock = True
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
919
920
def FormatLogMessage(log_type, log_msg):
  """Formats a job message according to its type.

  """
  # non-message entries are arbitrary objects; stringify them first
  if log_type == constants.ELOG_MESSAGE:
    text = log_msg
  else:
    text = str(log_msg)
  return utils.SafeEncode(text)
929
930
def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  if reporter is not None:
    # a reporter and a feedback function are mutually exclusive
    if feedback_fn:
      raise errors.ProgrammerError("Can't specify reporter and feedback"
                                   " function")
  elif feedback_fn:
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
  else:
    reporter = StdioJobPollReportCb()

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
953
954
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  @return: the result of the first (and only) opcode of the job

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl=cl)
  if hasattr(opts, "print_jobid") and opts.print_jobid:
    ToStdout("%d" % job_id)

  results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
                    reporter=reporter)
  return results[0]
976
977
def SubmitOpCodeToDrainedQueue(op):
  """Forcefully insert a job in the queue, even if it is drained.

  @return: the result of the first (and only) opcode of the job

  """
  cl = GetClient()
  job_id = cl.SubmitJobToDrainedQueue([op])
  return PollJob(job_id, cl=cl)[0]
986
987
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending the via SendJob
  (otherwise SubmitOpCode does it).

  """
  if not (opts and opts.submit_only):
    # synchronous path: submit and wait for the result
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  job = [op]
  SetGenericOpcodeOpts(job, opts)
  job_id = SendJob(job, cl=cl)
  if opts.print_jobid:
    ToStdout("%d" % job_id)
  raise JobSubmittedException(job_id)
1009
1010
def _InitReasonTrail(op, opts):
  """Builds the first part of the reason trail

  Builds the initial part of the reason trail, adding the user provided reason
  (if it exists) and the name of the command starting the operation.

  @param op: the opcode the reason trail will be added to
  @param opts: the command line options selected by the user

  """
  assert len(sys.argv) >= 2

  trail = []
  if opts.reason:
    trail.append((constants.OPCODE_REASON_SRC_USER, opts.reason,
                  utils.EpochNano()))

  # always record which client binary and command started the operation
  executable = os.path.basename(sys.argv[0])
  src = "%s:%s" % (constants.OPCODE_REASON_SRC_CLIENT, executable)
  trail.append((src, sys.argv[1], utils.EpochNano()))
  op.reason = trail
1034
1035
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for opcode in opcode_list:
    opcode.debug_level = options.debug
    if hasattr(options, "dry_run"):
      opcode.dry_run = options.dry_run
    priority = getattr(options, "priority", None)
    if priority is not None:
      opcode.priority = priority
    _InitReasonTrail(opcode, options)
1056
1057
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  @param err: exception instance to describe
  @rtype: tuple (int, string)
  @return: recommended exit code and the formatted message

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    # exit code 2 distinguishes configuration corruption from other failures
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = netutils.Hostname.GetSysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, rpcerr.NoMasterError):
    # point the user to the daemon matching the socket that failed
    if err.args[0] == pathutils.MASTER_SOCKET:
      daemon = "the master daemon"
    elif err.args[0] == pathutils.QUERY_SOCKET:
      daemon = "the config daemon"
    else:
      daemon = "socket '%s'" % str(err.args[0])
    obuf.write("Cannot communicate with %s.\nIs the process running"
               " and listening for connections?" % daemon)
  elif isinstance(err, rpcerr.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Jobs might have"
               " been submitted and will continue to run even if the call"
               " timed out. Useful commands in this situation are \"gnt-job"
               " list\", \"gnt-job cancel\" and \"gnt-job watch\". Error:\n")
    obuf.write(msg)
  elif isinstance(err, rpcerr.PermissionError):
    obuf.write("It seems you don't have permissions to connect to the"
               " master daemon.\nPlease retry as a different user.")
  elif isinstance(err, rpcerr.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.JobLost):
    obuf.write("Error checking job status: %s" % msg)
  elif isinstance(err, errors.QueryFilterParseError):
    obuf.write("Error while parsing query filter: %s\n" % err.args[0])
    obuf.write("\n".join(err.GetDetails()))
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    # not an error: the job was submitted via --submit, report its ID
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip("\n")
1150
1151
def GenericMain(commands, override=None, aliases=None,
                env_override=frozenset()):
  """Generic main function for all the gnt-* commands.

  Parses the command line, sets up logging, runs the selected command
  function and converts any raised exceptions into an exit code.

  @param commands: a dictionary with a special structure, see the design doc
                   for command line handling.
  @param override: if not None, we expect a dictionary with keys that will
                   override command line options; this can be used to pass
                   options from the scripts to generic functions
  @param aliases: dictionary with command aliases {'alias': 'target', ...}
  @param env_override: list of environment names which are allowed to submit
                       default args for commands
  @rtype: int
  @return: the exit code of the command function, or a generic error code

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0])
    if not binary:
      binary = sys.argv[0]

    if len(sys.argv) >= 2:
      # log under "<binary> <subcommand>" when a subcommand was given
      logname = utils.ShellQuoteArgs([binary, sys.argv[1]])
    else:
      logname = binary

    cmdline = utils.ShellQuoteArgs([binary] + sys.argv[1:])
  else:
    # sys.argv can be empty, e.g. with an embedded interpreter
    binary = "<unknown program>"
    cmdline = "<unknown>"

  if aliases is None:
    aliases = {}

  try:
    (func, options, args) = _ParseArgs(binary, sys.argv, commands, aliases,
                                       env_override)
  except _ShowVersion:
    # --version: print and exit successfully
    ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
             constants.RELEASE_VERSION)
    return constants.EXIT_SUCCESS
  except _ShowUsage, err:
    # --help or usage error: print usage, exit code depends on the cause
    for line in _FormatUsage(binary, commands):
      ToStdout(line)

    if err.exit_error:
      return constants.EXIT_FAILURE
    else:
      return constants.EXIT_SUCCESS
  except errors.ParameterError, err:
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    # script-supplied values take precedence over command line options
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(pathutils.LOG_COMMANDS, logname, debug=options.debug,
                     stderr_logging=True)

  logging.debug("Command line: %s", cmdline)

  try:
    result = func(options, args)
  except (errors.GenericError, rpcerr.ProtocolError,
          JobSubmittedException), err:
    # known errors are formatted nicely for the user
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)
  except KeyboardInterrupt:
    result = constants.EXIT_FAILURE
    ToStderr("Aborted. Note that if the operation created any jobs, they"
             " might have been submitted and"
             " will continue to run in the background.")
  except IOError, err:
    if err.errno == errno.EPIPE:
      # our terminal went away, we'll exit
      sys.exit(constants.EXIT_FAILURE)
    else:
      raise

  return result
1237
1238
1239 def ParseNicOption(optvalue):
1240 """Parses the value of the --net option(s).
1241
1242 """
1243 try:
1244 nic_max = max(int(nidx[0]) + 1 for nidx in optvalue)
1245 except (TypeError, ValueError), err:
1246 raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err),
1247 errors.ECODE_INVAL)
1248
1249 nics = [{}] * nic_max
1250 for nidx, ndict in optvalue:
1251 nidx = int(nidx)
1252
1253 if not isinstance(ndict, dict):
1254 raise errors.OpPrereqError("Invalid nic/%d value: expected dict,"
1255 " got %s" % (nidx, ndict), errors.ECODE_INVAL)
1256
1257 utils.ForceDictType(ndict, constants.INIC_PARAMS_TYPES)
1258
1259 nics[nidx] = ndict
1260
1261 return nics
1262
1263
def FixHvParams(hvparams):
  """Canonicalizes the separator used in the usb_devices hvparam.

  In Ganeti 2.8.4 the separator for the usb_devices hvparam was changed from
  comma to space, because commas already separate different hvparams on the
  command line.  RAPI still accepts commas for backwards compatibility, so
  spaces are converted into commas here and the old parsing logic is kept
  everywhere else.

  @type hvparams: dict
  @param hvparams: hypervisor parameters, modified in place

  """
  # Nothing to do when no usb_devices value is present
  if constants.HV_USB_DEVICES in hvparams:
    hvparams[constants.HV_USB_DEVICES] = \
      hvparams[constants.HV_USB_DEVICES].replace(" ", ",")
1277
1278
1279 def GenericInstanceCreate(mode, opts, args):
1280 """Add an instance to the cluster via either creation or import.
1281
1282 @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
1283 @param opts: the command line options selected by the user
1284 @type args: list
1285 @param args: should contain only one element, the new instance name
1286 @rtype: int
1287 @return: the desired exit code
1288
1289 """
1290 instance = args[0]
1291
1292 (pnode, snode) = SplitNodeOption(opts.node)
1293
1294 hypervisor = None
1295 hvparams = {}
1296 if opts.hypervisor:
1297 hypervisor, hvparams = opts.hypervisor
1298
1299 if opts.nics:
1300 nics = ParseNicOption(opts.nics)
1301 elif opts.no_nics:
1302 # no nics
1303 nics = []
1304 elif mode == constants.INSTANCE_CREATE:
1305 # default of one nic, all auto
1306 nics = [{}]
1307 else:
1308 # mode == import
1309 nics = []
1310
1311 if opts.disk_template == constants.DT_DISKLESS:
1312 if opts.disks or opts.sd_size is not None:
1313 raise errors.OpPrereqError("Diskless instance but disk"
1314 " information passed", errors.ECODE_INVAL)
1315 disks = []
1316 else:
1317 if (not opts.disks and not opts.sd_size
1318 and mode == constants.INSTANCE_CREATE):
1319 raise errors.OpPrereqError("No disk information specified",
1320 errors.ECODE_INVAL)
1321 if opts.disks and opts.sd_size is not None:
1322 raise errors.OpPrereqError("Please use either the '--disk' or"
1323 " '-s' option", errors.ECODE_INVAL)
1324 if opts.sd_size is not None:
1325 opts.disks = [(0, {constants.IDISK_SIZE: opts.sd_size})]
1326
1327 if opts.disks:
1328 try:
1329 disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
1330 except ValueError, err:
1331 raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err),
1332 errors.ECODE_INVAL)
1333 disks = [{}] * disk_max
1334 else:
1335 disks = []
1336 for didx, ddict in opts.disks:
1337 didx = int(didx)
1338 if not isinstance(ddict, dict):
1339 msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
1340 raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
1341 elif constants.IDISK_SIZE in ddict:
1342 if constants.IDISK_ADOPT in ddict:
1343 raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
1344 " (disk %d)" % didx, errors.ECODE_INVAL)
1345 try:
1346 ddict[constants.IDISK_SIZE] = \
1347 utils.ParseUnit(ddict[constants.IDISK_SIZE])
1348 except ValueError, err:
1349 raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
1350 (didx, err), errors.ECODE_INVAL)
1351 elif constants.IDISK_ADOPT in ddict:
1352 if constants.IDISK_SPINDLES in ddict:
1353 raise errors.OpPrereqError("spindles is not a valid option when"
1354 " adopting a disk", errors.ECODE_INVAL)
1355 if mode == constants.INSTANCE_IMPORT:
1356 raise errors.OpPrereqError("Disk adoption not allowed for instance"
1357 " import", errors.ECODE_INVAL)
1358 ddict[constants.IDISK_SIZE] = 0
1359 else:
1360 raise errors.OpPrereqError("Missing size or adoption source for"
1361 " disk %d" % didx, errors.ECODE_INVAL)
1362 if constants.IDISK_SPINDLES in ddict:
1363 ddict[constants.IDISK_SPINDLES] = int(ddict[constants.IDISK_SPINDLES])
1364
1365 disks[didx] = ddict
1366
1367 if opts.tags is not None:
1368 tags = opts.tags.split(",")
1369 else:
1370 tags = []
1371
1372 utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_COMPAT)
1373 utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
1374 FixHvParams(hvparams)
1375
1376 osparams_private = opts.osparams_private or serializer.PrivateDict()
1377 osparams_secret = opts.osparams_secret or serializer.PrivateDict()
1378
1379 helper_startup_timeout = opts.helper_startup_timeout
1380 helper_shutdown_timeout = opts.helper_shutdown_timeout
1381
1382 if mode == constants.INSTANCE_CREATE:
1383 start = opts.start
1384 os_type = opts.os
1385 force_variant = opts.force_variant
1386 src_node = None
1387 src_path = None
1388 no_install = opts.no_install
1389 identify_defaults = False
1390 compress = constants.IEC_NONE
1391 if opts.instance_communication is None:
1392 instance_communication = False
1393 else:
1394 instance_communication = opts.instance_communication
1395 elif mode == constants.INSTANCE_IMPORT:
1396 start = False
1397 os_type = None
1398 force_variant = False
1399 src_node = opts.src_node
1400 src_path = opts.src_dir
1401 no_install = None
1402 identify_defaults = opts.identify_defaults
1403 compress = opts.compress
1404 instance_communication = False
1405 else:
1406 raise errors.ProgrammerError("Invalid creation mode %s" % mode)
1407
1408 op = opcodes.OpInstanceCreate(
1409 instance_name=instance,
1410 disks=disks,
1411 disk_template=opts.disk_template,
1412 group_name=opts.nodegroup,
1413 nics=nics,
1414 conflicts_check=opts.conflicts_check,
1415 pnode=pnode, snode=snode,
1416 ip_check=opts.ip_check,
1417 name_check=opts.name_check,
1418 wait_for_sync=opts.wait_for_sync,
1419 file_storage_dir=opts.file_storage_dir,
1420 file_driver=opts.file_driver,
1421 iallocator=opts.iallocator,
1422 hypervisor=hypervisor,
1423 hvparams=hvparams,
1424 beparams=opts.beparams,
1425 osparams=opts.osparams,
1426 osparams_private=osparams_private,
1427 osparams_secret=osparams_secret,
1428 mode=mode,
1429 opportunistic_locking=opts.opportunistic_locking,
1430 start=start,
1431 os_type=os_type,
1432 force_variant=force_variant,
1433 src_node=src_node,
1434 src_path=src_path,
1435 compress=compress,
1436 tags=tags,
1437 no_install=no_install,
1438 identify_defaults=identify_defaults,
1439 ignore_ipolicy=opts.ignore_ipolicy,
1440 instance_communication=instance_communication,
1441 helper_startup_timeout=helper_startup_timeout,
1442 helper_shutdown_timeout=helper_shutdown_timeout)
1443
1444 SubmitOrSend(op, opts)
1445 return 0
1446
1447
class _RunWhileDaemonsStoppedHelper(object):
  """Helper class for L{RunWhileDaemonsStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node,
               online_nodes, ssh_ports, exclude_daemons, debug,
               verbose):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes
    @type ssh_ports: list
    @param ssh_ports: List of SSH ports of online nodes, in the same order
        as C{online_nodes}
    @type exclude_daemons: list of string
    @param exclude_daemons: list of daemons that will be restarted on master
                            after all others are shutdown
    @type debug: boolean
    @param debug: show debug output
    @type verbose: boolean
    @param verbose: show verbose output

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes
    # Map node name -> SSH port for the remote invocations in _RunCmd
    self.ssh_ports = dict(zip(online_nodes, ssh_ports))

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    self.exclude_daemons = exclude_daemons
    # NOTE(review): debug/verbose are stored but not read inside this class;
    # presumably available to the invoked function -- confirm with callers
    self.debug = debug
    self.verbose = verbose

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; C{None} or the master node name runs
        the command locally, any other name runs it via SSH
    @type cmd: list
    @param cmd: Command

    @raise errors.OpExecError: if the command fails

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, constants.SSH_LOGIN_USER,
                            utils.ShellQuoteArgs(cmd),
                            port=self.ssh_ports[node_name])

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    The watcher is blocked first, then all daemons are stopped (master
    daemons first, then per-node daemons with the excluded ones restarted
    on the master), C{fn} is invoked, and finally all daemons are started
    again, master node last.

    @type fn: callable
    @param fn: Function to be called; receives this helper instance as its
        first argument, followed by C{args}
    @return: the return value of C{fn}

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [pathutils.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes, master node first
        online_nodes = [self.master_node] + [n for n in self.online_nodes
                                             if n != self.master_node]
        for node_name in online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop-all"])
          # Starting any daemons listed as exception
          if node_name == self.master_node:
            for daemon in self.exclude_daemons:
              self.feedback_fn("Starting daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start", daemon])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          # Stopping any daemons listed as exception.
          # This might look unnecessary, but it makes sure that daemon-util
          # starts all daemons in the right order.
          if node_name == self.master_node:
            self.exclude_daemons.reverse()
            for daemon in self.exclude_daemons:
              self.feedback_fn("Stopping daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop", daemon])
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start-all"])

    finally:
      # Resume watcher
      watcher_block.Close()
1577
1578
def RunWhileDaemonsStopped(feedback_fn, exclude_daemons, fn, *args, **kwargs):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type exclude_daemons: list of string
  @param exclude_daemons: list of daemons that are stopped, but immediately
                          restarted on the master to be available when calling
                          'fn'. If None, all daemons will be stopped and none
                          will be started before calling 'fn'.
  @type fn: callable
  @param fn: Function to be called when daemons are stopped
  @return: the return value of 'fn'

  """
  feedback_fn("Gathering cluster information")

  # Talking to the master daemon also ensures we're actually running on the
  # master node
  client = GetClient()

  (cluster_name, master_node) = \
    client.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=client)
  ssh_ports = GetNodesSshPorts(online_nodes, client)

  # Don't keep a reference to the client. The master daemon will go away.
  del client

  assert master_node in online_nodes

  helper = _RunWhileDaemonsStoppedHelper(
    feedback_fn, cluster_name, master_node, online_nodes, ssh_ports,
    [] if exclude_daemons is None else exclude_daemons,
    kwargs.get("debug", False), kwargs.get("verbose", False))

  return helper.Call(fn, *args)
1617
1618
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function with every cluster daemon stopped.

  Convenience wrapper around L{RunWhileDaemonsStopped} that keeps no daemon
  running on the master while 'fn' executes.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  RunWhileDaemonsStopped(feedback_fn, None, fn, *args)
1629
1630
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output;
      note that the row values are converted to strings in place
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list of strings
  @return: the formatted table, one string per line

  """
  if units is None:
    # megabytes when machine-parseable output was requested, human-readable
    # otherwise
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields) # pylint: disable=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable=W0142

  # Build one %-style placeholder per column
  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      # numeric fields are right-aligned
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    # mlens tracks the maximum width seen per column
    mlens = [0 for name in fields]
    format_str = " ".join(format_fields)
  else:
    # escape "%" in the separator so it survives the later "%"-formatting
    format_str = separator.replace("%", "%%").join(format_fields)

  # First pass: convert all values to strings (in place) and record widths
  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    # don't pad the last column unless it's right-aligned, to avoid
    # trailing whitespace
    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  # Second pass: emit the (already stringified) rows
  for line in data:
    args = []
    if line is None:
      line = ["-" for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
1737
1738
1739 def _FormatBool(value):
1740 """Formats a boolean value as a string.
1741
1742 """
1743 if value:
1744 return "Y"
1745 return "N"
1746
1747
#: Default formatting for query results, indexed by field type; each value
#: is a tuple of (formatting callback, align right). Numbers are
#: right-aligned. L{constants.QFT_UNIT} is deliberately absent: it is
#: handled at runtime by L{_GetColumnFormatter} because formatting depends
#: on the requested unit.
_DEFAULT_FORMAT_QUERY = {
  constants.QFT_TEXT: (str, False),
  constants.QFT_BOOL: (_FormatBool, False),
  constants.QFT_NUMBER: (str, True),
  constants.QFT_NUMBER_FLOAT: (str, True),
  constants.QFT_TIMESTAMP: (utils.FormatTime, False),
  constants.QFT_OTHER: (str, False),
  constants.QFT_UNKNOWN: (str, False),
  }
1758
1759
def _GetColumnFormatter(fdef, override, unit):
  """Returns formatting function for a field.

  @type fdef: L{objects.QueryFieldDefinition}
  @type override: dict
  @param override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT}
  @rtype: tuple; (callable, bool)
  @return: Returns the function to format a value (takes one parameter) and a
    boolean for aligning the value on the right-hand side
  @raise NotImplementedError: if the field type has no known formatting

  """
  # Caller-supplied overrides win over everything else
  custom = override.get(fdef.name, None)
  if custom is not None:
    return custom

  assert constants.QFT_UNIT not in _DEFAULT_FORMAT_QUERY

  if fdef.kind == constants.QFT_UNIT:
    # Can't keep this information in the static dictionary, as the unit is
    # only known at runtime
    return (lambda value: utils.FormatUnit(value, unit), True)

  try:
    return _DEFAULT_FORMAT_QUERY[fdef.kind]
  except KeyError:
    raise NotImplementedError("Can't format column type '%s'" % fdef.kind)
1789
1790
class _QueryColumnFormatter(object):
  """Callable class for formatting fields of a query.

  """
  def __init__(self, fn, status_fn, verbose):
    """Initializes this class.

    @type fn: callable
    @param fn: Formatting function
    @type status_fn: callable
    @param status_fn: Function to report fields' status
    @type verbose: boolean
    @param verbose: whether to use verbose field descriptions or not

    """
    self._fn = fn
    self._status_fn = status_fn
    self._verbose = verbose

  def __call__(self, data):
    """Returns a field's string representation.

    @type data: tuple
    @param data: (status, value) pair as returned by a query

    """
    (status, value) = data

    # Let the caller know about this field's status
    self._status_fn(status)

    if status != constants.RS_NORMAL:
      assert value is None, \
             "Found value %r for abnormal status %s" % (value, status)
      return FormatResultError(status, self._verbose)

    return self._fn(value)
1826
1827
def FormatResultError(status, verbose):
  """Formats result status other than L{constants.RS_NORMAL}.

  @param status: The result status
  @type verbose: boolean
  @param verbose: Whether to return the verbose text
  @return: Text of result status
  @raise NotImplementedError: if the status has no known description

  """
  assert status != constants.RS_NORMAL, \
         "FormatResultError called with status equal to constants.RS_NORMAL"
  try:
    (verbose_text, normal_text) = constants.RSS_DESCRIPTION[status]
  except KeyError:
    raise NotImplementedError("Unknown status %s" % status)

  return verbose_text if verbose else normal_text
1847
1848
def FormatQueryResult(result, unit=None, format_override=None, separator=None,
                      header=False, verbose=False):
  """Formats data in L{objects.QueryResponse}.

  @type result: L{objects.QueryResponse}
  @param result: result of query operation
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT},
    see L{utils.text.FormatUnit}
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to output header row
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @rtype: tuple
  @return: (one of L{QR_NORMAL}, L{QR_UNKNOWN} or L{QR_INCOMPLETE},
    list of formatted table lines)

  """
  if unit is None:
    # megabytes when machine-parseable output was requested, human-readable
    # otherwise
    if separator:
      unit = "m"
    else:
      unit = "h"

  if format_override is None:
    format_override = {}

  # Per-status counters, filled in by the formatters via _RecordStatus
  stats = dict.fromkeys(constants.RS_ALL, 0)

  def _RecordStatus(status):
    if status in stats:
      stats[status] += 1

  columns = []
  for fdef in result.fields:
    assert fdef.title and fdef.name
    (fn, align_right) = _GetColumnFormatter(fdef, format_override, unit)
    columns.append(TableColumn(fdef.title,
                               _QueryColumnFormatter(fn, _RecordStatus,
                                                     verbose),
                               align_right))

  table = FormatTable(result.data, columns, header, separator)

  # Collect statistics
  assert len(stats) == len(constants.RS_ALL)
  assert compat.all(count >= 0 for count in stats.values())

  # Determine overall status. If there was no data, unknown fields must be
  # detected via the field definitions.
  if (stats[constants.RS_UNKNOWN] or
      (not result.data and _GetUnknownFields(result.fields))):
    status = QR_UNKNOWN
  elif compat.any(count > 0 for key, count in stats.items()
                  if key != constants.RS_NORMAL):
    status = QR_INCOMPLETE
  else:
    status = QR_NORMAL

  return (status, table)
1911
1912
def _GetUnknownFields(fdefs):
  """Returns list of unknown fields included in C{fdefs}.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: list
  @return: all definitions whose kind is L{constants.QFT_UNKNOWN}

  """
  unknown = []
  for fdef in fdefs:
    if fdef.kind == constants.QFT_UNKNOWN:
      unknown.append(fdef)
  return unknown
1921
1922
def _WarnUnknownFields(fdefs):
  """Prints a warning to stderr if a query included unknown fields.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: bool
  @return: whether any unknown fields were queried for

  """
  unknown_fields = _GetUnknownFields(fdefs)
  if not unknown_fields:
    return False

  ToStderr("Warning: Queried for unknown fields %s",
           utils.CommaJoin(fdef.name for fdef in unknown_fields))
  return True
1936
1937
def GenericList(resource, fields, names, unit, separator, header, cl=None,
                format_override=None, verbose=False, force_filter=False,
                namefield=None, qfilter=None, isnumeric=False):
  """Generic implementation for listing all items of a resource.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type names: list of strings
  @param names: Names of items to query for
  @type unit: string or None
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT} or
    None for automatic choice (human-readable for non-separator usage,
    otherwise megabytes); this is a one-letter string
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @type force_filter: bool
  @param force_filter: Whether to always treat names as filter
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @type namefield: string
  @param namefield: Name of field to use for simple filters (see
    L{qlang.MakeFilter} for details)
  @type qfilter: list or None
  @param qfilter: Query filter (in addition to names)
  @type isnumeric: bool
  @param isnumeric: Whether the namefield's type is numeric, and therefore
    any simple filters built by namefield should use integer values to
    reflect that
  @rtype: int
  @return: L{constants.EXIT_SUCCESS} or L{constants.EXIT_UNKNOWN_FIELD}

  """
  # An empty name list means "no name filtering"
  if not names:
    names = None

  name_filter = qlang.MakeFilter(names, force_filter, namefield=namefield,
                                 isnumeric=isnumeric)

  # Combine the name filter with any externally supplied filter
  if qfilter is None:
    qfilter = name_filter
  elif name_filter is not None:
    qfilter = [qlang.OP_AND, name_filter, qfilter]

  if cl is None:
    cl = GetClient()

  response = cl.Query(resource, fields, qfilter)

  found_unknown = _WarnUnknownFields(response.fields)

  (status, output) = FormatQueryResult(response, unit=unit,
                                       separator=separator, header=header,
                                       format_override=format_override,
                                       verbose=verbose)

  for line in output:
    ToStdout(line)

  assert ((found_unknown and status == QR_UNKNOWN) or
          (not found_unknown and status != QR_UNKNOWN))

  if status == QR_UNKNOWN:
    return constants.EXIT_UNKNOWN_FIELD

  # TODO: Should the list command fail if not all data could be collected?
  return constants.EXIT_SUCCESS
2008
2009
def _FieldDescValues(fdef):
  """Helper function for L{GenericListFields} to get query field description.

  @type fdef: L{objects.QueryFieldDefinition}
  @rtype: list
  @return: name, human-readable type, title and description of the field

  """
  # Fall back to the raw kind if no friendly name is known for it
  kind_name = _QFT_NAMES.get(fdef.kind, fdef.kind)

  return [fdef.name, kind_name, fdef.title, fdef.doc]
2023
2024
def GenericListFields(resource, fields, separator, header, cl=None):
  """Generic implementation for listing fields for a resource.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @rtype: int
  @return: L{constants.EXIT_SUCCESS} or L{constants.EXIT_UNKNOWN_FIELD}

  """
  if cl is None:
    cl = GetClient()

  # An empty field list means "all fields"
  if not fields:
    fields = None

  response = cl.QueryFields(resource, fields)

  found_unknown = _WarnUnknownFields(response.fields)

  columns = [
    TableColumn("Name", str, False),
    TableColumn("Type", str, False),
    TableColumn("Title", str, False),
    TableColumn("Description", str, False),
    ]

  rows = [_FieldDescValues(fdef) for fdef in response.fields]

  for line in FormatTable(rows, columns, header, separator):
    ToStdout(line)

  if found_unknown:
    return constants.EXIT_UNKNOWN_FIELD

  return constants.EXIT_SUCCESS
2063
2064
class TableColumn(object):
  """Describes one column for L{FormatTable}.

  """
  def __init__(self, title, fn, align_right):
    """Initializes this class.

    @type title: string
    @param title: Column title
    @type fn: callable
    @param fn: Formatting function for a single cell value
    @type align_right: bool
    @param align_right: Whether to align values on the right-hand side

    """
    self.align_right = align_right
    self.format = fn
    self.title = title
2083
2084
2085 def _GetColFormatString(width, align_right):
2086 """Returns the format string for a field.
2087
2088 """
2089 if align_right:
2090 sign = ""
2091 else:
2092 sign = "-"
2093
2094 return "%%%s%ss" % (sign, width)
2095
2096
def FormatTable(rows, columns, header, separator):
  """Formats data as a table.

  @type rows: list of lists
  @param rows: Row data, one list per row
  @type columns: list of L{TableColumn}
  @param columns: Column descriptions
  @type header: bool
  @param header: Whether to show header row
  @type separator: string or None
  @param separator: String used to separate columns
  @rtype: list of strings
  @return: the formatted table, one string per line

  """
  if header:
    lines = [[col.title for col in columns]]
    widths = [len(col.title) for col in columns]
  else:
    lines = []
    widths = [0] * len(columns)

  # Format all cell values; track per-column widths only when padding is
  # actually needed (i.e. no explicit separator)
  for row in rows:
    assert len(row) == len(columns)

    values = [col.format(value) for value, col in zip(row, columns)]

    if separator is None:
      widths = [max(cur, len(value))
                for cur, value in zip(widths, values)]

    lines.append(values)

  if separator is not None:
    # With an explicit separator no padding is needed
    return [separator.join(line) for line in lines]

  if columns and not columns[-1].align_right:
    # Avoid unnecessary spaces at end of line
    widths[-1] = 0

  # Build the per-line format string from the final column widths
  fmt = " ".join(_GetColFormatString(width, col.align_right)
                 for col, width in zip(columns, widths))

  return [fmt % tuple(line) for line in lines]
2144
2145
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp, or C{"?"} if the input is
      not a two-element tuple/list

  """
  if isinstance(ts, (tuple, list)) and len(ts) == 2:
    (sec, usecs) = ts
    return utils.FormatTime(sec, usecs=usecs)
  return "?"
2161
2162
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixes will be recognized:

  - s: seconds
  - m: minutes
  - h: hours
  - d: days
  - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @param value: the time specification, e.g. C{30}, C{"30"} or C{"2h"}
  @rtype: int
  @return: the number of seconds represented by the specification
  @raise errors.OpPrereqError: if the specification is empty or invalid

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed",
                               errors.ECODE_INVAL)
  suffix_map = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    }
  # Keep the unmodified input around so that error messages show exactly what
  # the user passed, not the suffix-stripped remainder (previously "abch" was
  # reported as "abc").
  orig_value = value
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" %
                                 orig_value, errors.ECODE_INVAL)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)", errors.ECODE_INVAL)
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" %
                                 orig_value, errors.ECODE_INVAL)
  return value
2206
2207
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False, nodegroup=None):
  """Returns the names of online nodes.

  Unless disabled, a note listing the skipped (offline) nodes is also
  printed on stderr.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)
  @type nodegroup: string
  @param nodegroup: If set, only return nodes in this node group

  """
  if cl is None:
    cl = GetClient()

  # Build up the individual filter conditions
  conditions = []

  if nodes:
    conditions.append(qlang.MakeSimpleFilter("name", nodes))

  if nodegroup is not None:
    # Match the group either by name or by UUID
    conditions.append([qlang.OP_OR, [qlang.OP_EQUAL, "group", nodegroup],
                       [qlang.OP_EQUAL, "group.uuid", nodegroup]])

  if filter_master:
    conditions.append([qlang.OP_NOT, [qlang.OP_TRUE, "master"]])

  if not conditions:
    final_filter = None
  elif len(conditions) == 1:
    final_filter = conditions[0]
  else:
    final_filter = [qlang.OP_AND] + conditions

  result = cl.Query(constants.QR_NODE, ["name", "offline", "sip"], final_filter)

  # Each row holds (status, value) pairs for the three requested fields,
  # in query order: name, offline, sip
  def _IsOffline(row):
    return row[1][1]

  (offline, online) = compat.partition(result.data, _IsOffline)

  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" %
             utils.CommaJoin([row[0][1] for row in offline]))

  if secondary_ips:
    return [row[2][1] for row in online]
  return [row[0][1] for row in online]
2284
2285
def GetNodesSshPorts(nodes, cl):
  """Retrieves SSH ports of given nodes.

  @param nodes: the names of nodes
  @type nodes: a list of strings
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the list of SSH ports corresponding to the nodes
  @rtype: a list of tuples

  """
  rows = cl.QueryNodes(names=nodes, fields=["ndp/ssh_port"],
                       use_locking=False)
  return [row[0] for row in rows]
2301
2302
def GetNodeUUIDs(nodes, cl):
  """Retrieves the UUIDs of given nodes.

  @param nodes: the names of nodes
  @type nodes: a list of string
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the list of UUIDs corresponding to the nodes
  @rtype: a list of tuples

  """
  rows = cl.QueryNodes(names=nodes, fields=["uuid"],
                       use_locking=False)
  return [row[0] for row in rows]
2318
2319
2320 def _ToStream(stream, txt, *args):
2321 """Write a message to a stream, bypassing the logging system
2322
2323 @type stream: file object
2324 @param stream: the file to which we should write
2325 @type txt: str
2326 @param txt: the message
2327
2328 """
2329 try:
2330 if args:
2331 args = tuple(args)
2332 stream.write(txt % args)
2333 else:
2334 stream.write(txt)
2335 stream.write("\n")
2336 stream.flush()
2337 except IOError, err:
2338 if err.errno == errno.EPIPE:
2339 # our terminal went away, we'll exit
2340 sys.exit(constants.EXIT_FAILURE)
2341 else:
2342 raise
2343
2344
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system.

  Thin wrapper around L{_ToStream}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)
2355
2356
def ToStdoutAndLoginfo(txt, *args):
  """Write a message to stdout and record it in the log at INFO level.

  @type txt: str
  @param txt: the message

  """
  ToStdout(txt, *args)
  logging.info(txt, *args)
2361
2362
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system.

  Thin wrapper around L{_ToStream}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)
2373
2374
class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    """Initializes this class.

    @param cl: luxi client to use; a new one is obtained via L{GetClient}
      when C{None}
    @type verbose: bool
    @param verbose: whether to announce submitted job IDs on stdout
    @param opts: if not None, options applied to every queued opcode via
      L{SetGenericOpcodeOpts}
    @param feedback_fn: feedback function passed through to L{PollJob}

    """
    # jobs not yet submitted, as tuples (index, name, ops)
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    # submitted jobs, as tuples (index, submit status, job id or error, name)
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn
    # monotonic counter; the indices it produces preserve submission order
    # when results are sorted in GetResults()
    self._counter = itertools.count()

  @staticmethod
  def _IfName(name, fmt):
    """Helper function for formatting name.

    @return: I{fmt} interpolated with I{name} if the latter is non-empty,
      otherwise the empty string

    """
    if name:
      return fmt % name

    return ""

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet

    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((self._counter.next(), name, ops))

  def AddJobId(self, name, status, job_id):
    """Adds a job ID to the internal queue.

    @param status: the submit status of the already-submitted job
    @param job_id: the ID of the job

    """
    self.jobs.append((self._counter.next(), status, job_id, name))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    @type each: bool
    @param each: if True, jobs are submitted one at a time via SubmitJob;
      otherwise a single SubmitManyJobs call is used

    """
    if each:
      results = []
      for (_, _, ops) in self.queue:
        # SubmitJob will remove the success status, but raise an exception if
        # the submission fails, so we'll notice that anyway.
        results.append([True, self.cl.SubmitJob(ops)[0]])
    else:
      results = self.cl.SubmitManyJobs([ops for (_, _, ops) in self.queue])
    # pair each submission result with its queue entry, keeping the index
    for ((status, data), (idx, name, _)) in zip(results, self.queue):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    Removes the chosen entry from C{self.jobs} and returns it; entries that
    are still queued/waiting are skipped so polling does not block on jobs
    that have not started yet.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs[:_CHOOSE_BATCH]],
                               ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if (isinstance(status, list) and status and
          status[0] in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING)):
        # job is still present and waiting
        continue
      # good candidate found (either running job or lost job)
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job%s: %s", self._IfName(name, " for %s"), jid)
      results.append((idx, False, jid))

    # poll the remaining jobs, preferring ones that have already started
    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s%s ...", jid, self._IfName(name, " for %s"))
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except errors.JobLost, err:
        _, job_result = FormatError(err)
        ToStderr("Job %s%s has been archived, cannot check its result",
                 jid, self._IfName(name, " for %s"))
        success = False
      except (errors.GenericError, rpcerr.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s%s has failed: %s",
                 jid, self._IfName(name, " for %s"), job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)
      return [row[1:3] for row in self.jobs]
2525
2526
def FormatParamsDictInfo(param_dict, actual, roman=False):
  """Formats a parameter dictionary.

  @type param_dict: dict
  @param param_dict: the own parameters
  @type actual: dict
  @param actual: the current parameter set (including defaults)
  @rtype: dict
  @return: dictionary where each value is either a fully formatted string or
    a nested dictionary of formatted strings

  """
  ret = {}
  for (key, data) in actual.items():
    if isinstance(data, dict) and data:
      # recurse into non-empty sub-dictionaries
      ret[key] = FormatParamsDictInfo(param_dict.get(key, {}), data, roman)
      continue
    # own value if present, otherwise mark the effective value as a default
    fallback = "default (%s)" % compat.TryToRoman(data, roman)
    ret[key] = str(compat.TryToRoman(param_dict.get(key, fallback), roman))
  return ret
2547
2548
def _FormatListInfoDefault(data, def_data):
  """Formats a list of values, falling back to the defaults when unset.

  """
  if data is None:
    return "default (%s)" % utils.CommaJoin(def_data)
  return utils.CommaJoin(data)
2555
2556
def FormatPolicyInfo(custom_ipolicy, eff_ipolicy, iscluster, roman=False):
  """Formats an instance policy.

  @type custom_ipolicy: dict
  @param custom_ipolicy: own policy
  @type eff_ipolicy: dict
  @param eff_ipolicy: effective policy (including defaults); ignored for
      cluster
  @type iscluster: bool
  @param iscluster: the policy is at cluster level
  @type roman: bool
  @param roman: whether to print the values in roman numerals
  @rtype: list of pairs
  @return: formatted data, suitable for L{PrintGenericInfo}

  """
  if iscluster:
    # a cluster-level policy is complete, so it is also the effective one
    eff_ipolicy = custom_ipolicy

  minmax_out = []
  custom_minmax = custom_ipolicy.get(constants.ISPECS_MINMAX)
  if custom_minmax:
    # show the custom bounds, comparing each spec against itself
    for (k, minmax) in enumerate(custom_minmax):
      minmax_out.append([
        ("%s/%s" % (key, k),
         FormatParamsDictInfo(minmax[key], minmax[key], roman))
        for key in constants.ISPECS_MINMAX_KEYS
        ])
  else:
    # no custom bounds: show the effective ones, with no own values
    for (k, minmax) in enumerate(eff_ipolicy[constants.ISPECS_MINMAX]):
      minmax_out.append([
        ("%s/%s" % (key, k),
         FormatParamsDictInfo({}, minmax[key], roman))
        for key in constants.ISPECS_MINMAX_KEYS
        ])
  ret = [("bounds specs", minmax_out)]

  if iscluster:
    # std specs exist only at cluster level
    stdspecs = custom_ipolicy[constants.ISPECS_STD]
    ret.append(
      (constants.ISPECS_STD,
       FormatParamsDictInfo(stdspecs, stdspecs, roman))
      )

  ret.append(
    ("allowed disk templates",
     _FormatListInfoDefault(custom_ipolicy.get(constants.IPOLICY_DTS),
                            eff_ipolicy[constants.IPOLICY_DTS]))
    )
  # remaining scalar policy parameters, marking effective values as defaults
  to_roman = compat.TryToRoman
  ret.extend([
    (key, str(to_roman(custom_ipolicy.get(key,
                                          "default (%s)" % eff_ipolicy[key]),
                       roman)))
    for key in constants.IPOLICY_PARAMETERS
    ])
  return ret
2614
2615
2616 def _PrintSpecsParameters(buf, specs):
2617 values = ("%s=%s" % (par, val) for (par, val) in sorted(specs.items()))
2618 buf.write(",".join(values))
2619
2620
def PrintIPolicyCommand(buf, ipolicy, isgroup):
  """Print the command option used to generate the given instance policy.

  Currently only the parts dealing with specs are supported.

  @type buf: StringIO
  @param buf: stream to write into
  @type ipolicy: dict
  @param ipolicy: instance policy
  @type isgroup: bool
  @param isgroup: whether the policy is at group level

  """
  if not isgroup:
    # std specs only exist above group level
    stdspecs = ipolicy.get("std")
    if stdspecs:
      buf.write(" %s " % IPOLICY_STD_SPECS_STR)
      _PrintSpecsParameters(buf, stdspecs)

  first = True
  for minmax in ipolicy.get("minmax", []):
    minspecs = minmax.get("min")
    maxspecs = minmax.get("max")
    if not (minspecs and maxspecs):
      continue
    if first:
      # option name before the first pair, "//" between subsequent pairs
      buf.write(" %s " % IPOLICY_BOUNDS_SPECS_STR)
      first = False
    else:
      buf.write("//")
    buf.write("min:")
    _PrintSpecsParameters(buf, minspecs)
    buf.write("/max:")
    _PrintSpecsParameters(buf, maxspecs)
2654
2655
def ConfirmOperation(names, list_type, text, extra=""):
  """Ask the user to confirm an operation on a list of list_type.

  This function is used to request confirmation for doing an operation
  on a given list of list_type.

  @type names: list
  @param names: the list of names that we display when
      we ask for confirmation
  @type list_type: str
  @param list_type: Human readable name for elements in the list (e.g. nodes)
  @type text: str
  @param text: the operation that the user should confirm
  @rtype: boolean
  @return: True or False depending on user's confirmation.

  """
  count = len(names)
  prompt = ("The %s will operate on %d %s.\n%s"
            "Do you want to continue?" % (text, count, list_type, extra))
  listing = "\nAffected %s:\n%s" % (list_type,
                                    "\n".join(" %s" % name for name in names))

  options = [("y", True, "Yes, execute the %s" % text),
             ("n", False, "No, abort the %s" % text)]

  if count > 20:
    # long lists are hidden initially; offer a "view" choice instead
    options.insert(1, ("v", "v", "View the list of affected %s" % list_type))
    answer = AskUser(prompt, options)
  else:
    answer = AskUser(prompt + listing, options)

  if answer == "v":
    # the user asked to see the list; ask again, this time showing it
    options.pop(1)
    answer = AskUser(prompt + listing, options)
  return answer
2693
2694
def _MaybeParseUnit(elements):
  """Parses and returns an array of potential values with units.

  Values equal to L{constants.VALUE_DEFAULT} are kept verbatim; everything
  else is run through L{utils.ParseUnit}.

  """
  return dict((key, value if value == constants.VALUE_DEFAULT
               else utils.ParseUnit(value))
              for (key, value) in elements.items())
2706
2707
def _InitISpecsFromSplitOpts(ipolicy, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all):
  """Fills an instance policy from the old-style C{--specs-...} options.

  @type ipolicy: dict
  @param ipolicy: policy dict, modified in place
  @param ispecs_mem_size: dict mapping "min"/"max"/"std" to a memory size,
      possibly carrying a unit suffix (may be empty)
  @param ispecs_cpu_count: as above, for the CPU count
  @param ispecs_disk_count: as above, for the disk count
  @param ispecs_disk_size: as above, for the disk size (may carry units)
  @param ispecs_nic_count: as above, for the NIC count
  @type group_ipolicy: bool
  @param group_ipolicy: whether the policy is at group level
  @type fill_all: bool
  @param fill_all: whether missing values should be filled from the defaults
  @raise errors.OpPrereqError: if a size value cannot be parsed

  """
  try:
    if ispecs_mem_size:
      ispecs_mem_size = _MaybeParseUnit(ispecs_mem_size)
    if ispecs_disk_size:
      ispecs_disk_size = _MaybeParseUnit(ispecs_disk_size)
  except (TypeError, ValueError, errors.UnitParseError), err:
    raise errors.OpPrereqError("Invalid disk (%s) or memory (%s) size"
                               " in policy: %s" %
                               (ispecs_disk_size, ispecs_mem_size, err),
                               errors.ECODE_INVAL)

  # prepare ipolicy dict
  ispecs_transposed = {
    constants.ISPEC_MEM_SIZE: ispecs_mem_size,
    constants.ISPEC_CPU_COUNT: ispecs_cpu_count,
    constants.ISPEC_DISK_COUNT: ispecs_disk_count,
    constants.ISPEC_DISK_SIZE: ispecs_disk_size,
    constants.ISPEC_NIC_COUNT: ispecs_nic_count,
    }

  # first, check that the values given are correct
  if group_ipolicy:
    forced_type = TISPECS_GROUP_TYPES
  else:
    forced_type = TISPECS_CLUSTER_TYPES
  for specs in ispecs_transposed.values():
    assert type(specs) is dict
    utils.ForceDictType(specs, forced_type)

  # then transpose
  # (from {param: {min/max/std: val}} to {min/max/std: {param: val}})
  ispecs = {
    constants.ISPECS_MIN: {},
    constants.ISPECS_MAX: {},
    constants.ISPECS_STD: {},
    }
  for (name, specs) in ispecs_transposed.iteritems():
    assert name in constants.ISPECS_PARAMETERS
    for key, val in specs.items(): # {min: .. ,max: .., std: ..}
      assert key in ispecs
      ispecs[key][name] = val
  minmax_out = {}
  for key in constants.ISPECS_MINMAX_KEYS:
    if fill_all:
      minmax_out[key] = \
        objects.FillDict(constants.ISPECS_MINMAX_DEFAULTS[key], ispecs[key])
    else:
      minmax_out[key] = ispecs[key]
  ipolicy[constants.ISPECS_MINMAX] = [minmax_out]
  if fill_all:
    ipolicy[constants.ISPECS_STD] = \
      objects.FillDict(constants.IPOLICY_DEFAULTS[constants.ISPECS_STD],
                       ispecs[constants.ISPECS_STD])
  else:
    ipolicy[constants.ISPECS_STD] = ispecs[constants.ISPECS_STD]
2765
2766
2767 def _ParseSpecUnit(spec, keyname):
2768 ret = spec.copy()
2769 for k in [constants.ISPEC_DISK_SIZE, constants.ISPEC_MEM_SIZE]:
2770 if k in ret:
2771 try:
2772 ret[k] = utils.ParseUnit(ret[k])
2773 except (TypeError, ValueError, errors.UnitParseError), err:
2774 raise errors.OpPrereqError(("Invalid parameter %s (%s) in %s instance"
2775 " specs: %s" % (k, ret[k], keyname, err)),
2776 errors.ECODE_INVAL)
2777 return ret
2778
2779
def _ParseISpec(spec, keyname, required):
  """Parses and type-checks a single instance spec.

  @type required: bool
  @param required: whether all spec parameters must be present
  @raise errors.OpPrereqError: if I{required} is set and parameters are
    missing

  """
  parsed = _ParseSpecUnit(spec, keyname)
  utils.ForceDictType(parsed, constants.ISPECS_PARAMETER_TYPES)
  missing = constants.ISPECS_PARAMETERS - frozenset(parsed.keys())
  if missing and required:
    raise errors.OpPrereqError("Missing parameters in ipolicy spec %s: %s" %
                               (keyname, utils.CommaJoin(missing)),
                               errors.ECODE_INVAL)
  return parsed
2789
2790
2791 def _GetISpecsInAllowedValues(minmax_ispecs, allowed_values):
2792 ret = None
2793 if (minmax_ispecs and allowed_values and len(minmax_ispecs) == 1 and
2794 len(minmax_ispecs[0]) == 1):
2795 for (key, spec) in minmax_ispecs[0].items():
2796 # This loop is executed exactly once
2797 if key in allowed_values and not spec:
2798 ret = key
2799 return ret
2800
2801
def _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values):
  """Fills an ipolicy dict from the C{--ipolicy-...-specs} style options.

  @type ipolicy_out: dict
  @param ipolicy_out: policy dict, modified in place
  @raise errors.OpPrereqError: if an invalid bounds key is encountered

  """
  found_allowed = _GetISpecsInAllowedValues(minmax_ispecs, allowed_values)
  if found_allowed is not None:
    # a bare allowed value (e.g. "default") is stored as-is
    ipolicy_out[constants.ISPECS_MINMAX] = found_allowed
  elif minmax_ispecs is not None:
    minmax_out = []
    for mmpair in minmax_ispecs:
      parsed_pair = {}
      for (key, spec) in mmpair.items():
        if key not in constants.ISPECS_MINMAX_KEYS:
          msg = "Invalid key in bounds instance specifications: %s" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        parsed_pair[key] = _ParseISpec(spec, key, True)
      minmax_out.append(parsed_pair)
    ipolicy_out[constants.ISPECS_MINMAX] = minmax_out
  if std_ispecs is not None:
    assert not group_ipolicy # This is not an option for gnt-group
    ipolicy_out[constants.ISPECS_STD] = _ParseISpec(std_ispecs, "std", False)
2821
2822
def CreateIPolicyFromOpts(ispecs_mem_size=None,
                          ispecs_cpu_count=None,
                          ispecs_disk_count=None,
                          ispecs_disk_size=None,
                          ispecs_nic_count=None,
                          minmax_ispecs=None,
                          std_ispecs=None,
                          ipolicy_disk_templates=None,
                          ipolicy_vcpu_ratio=None,
                          ipolicy_spindle_ratio=None,
                          group_ipolicy=False,
                          allowed_values=None,
                          fill_all=False):
  """Creation of instance policy based on command line options.

  The C{ispecs_...} arguments come from the old-style C{--specs-...}
  options, while C{minmax_ispecs}/C{std_ispecs} come from the
  C{--ipolicy-...-specs} options; the two families are mutually exclusive.

  @param fill_all: whether for cluster policies we should ensure that
    all values are filled
  @rtype: dict
  @return: the newly built instance policy
  @raise errors.OpPrereqError: if both option families are given together,
    or a value cannot be parsed

  """
  assert not (fill_all and allowed_values)

  split_specs = (ispecs_mem_size or ispecs_cpu_count or ispecs_disk_count or
                 ispecs_disk_size or ispecs_nic_count)
  if (split_specs and (minmax_ispecs is not None or std_ispecs is not None)):
    raise errors.OpPrereqError("A --specs-xxx option cannot be specified"
                               " together with any --ipolicy-xxx-specs option",
                               errors.ECODE_INVAL)

  ipolicy_out = objects.MakeEmptyIPolicy()
  if split_specs:
    assert fill_all
    _InitISpecsFromSplitOpts(ipolicy_out, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all)
  elif (minmax_ispecs is not None or std_ispecs is not None):
    _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values)

  if ipolicy_disk_templates is not None:
    # an allowed value (e.g. "default") is stored verbatim, otherwise the
    # templates are stored as a list
    if allowed_values and ipolicy_disk_templates in allowed_values:
      ipolicy_out[constants.IPOLICY_DTS] = ipolicy_disk_templates
    else:
      ipolicy_out[constants.IPOLICY_DTS] = list(ipolicy_disk_templates)
  if ipolicy_vcpu_ratio is not None:
    ipolicy_out[constants.IPOLICY_VCPU_RATIO] = ipolicy_vcpu_ratio
  if ipolicy_spindle_ratio is not None:
    ipolicy_out[constants.IPOLICY_SPINDLE_RATIO] = ipolicy_spindle_ratio

  assert not (frozenset(ipolicy_out.keys()) - constants.IPOLICY_ALL_KEYS)

  if not group_ipolicy and fill_all:
    ipolicy_out = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_out)

  return ipolicy_out
2877
2878
2879 def _NotAContainer(data):
2880 """ Checks whether the input is not a container data type.
2881
2882 @rtype: bool
2883
2884 """
2885 return not (isinstance(data, (list, dict, tuple)))
2886
2887
2888 def _GetAlignmentMapping(data):
2889 """ Returns info about alignment if present in an encoded ordered dictionary.
2890
2891 @type data: list of tuple
2892 @param data: The encoded ordered dictionary, as defined in
2893 L{_SerializeGenericInfo}.
2894 @rtype: dict of any to int
2895 @return: The dictionary mapping alignment groups to the maximum length of the
2896 dictionary key found in the group.
2897
2898 """
2899 alignment_map = {}
2900 for entry in data:
2901 if len(entry) > 2:
2902 group_key = entry[2]
2903 key_length = len(entry[0])
2904 if group_key in alignment_map:
2905 alignment_map[group_key] = max(alignment_map[group_key], key_length)
2906 else:
2907 alignment_map[group_key] = key_length
2908
2909 return alignment_map
2910
2911
def _SerializeGenericInfo(buf, data, level, afterkey=False):
  """Formatting core of L{PrintGenericInfo}.

  @param buf: (string) stream to accumulate the result into
  @param data: data to format
  @type level: int
  @param level: depth in the data hierarchy, used for indenting
  @type afterkey: bool
  @param afterkey: True when we are in the middle of a line after a key (used
    to properly add newlines or indentation)

  """
  baseind = "  "
  if isinstance(data, dict):
    if not data:
      buf.write("\n")
    else:
      # when continuing after a key, the first entry starts on a new,
      # indented line; otherwise it continues the current line
      if afterkey:
        buf.write("\n")
        doindent = True
      else:
        doindent = False
      for key in sorted(data):
        if doindent:
          buf.write(baseind * level)
        else:
          doindent = True
        buf.write(key)
        buf.write(": ")
        _SerializeGenericInfo(buf, data[key], level + 1, afterkey=True)
  elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], tuple):
    # list of tuples (an ordered dictionary)
    # the tuples may have two or three members - key, value, and alignment group
    # if the alignment group is present, align all values sharing the same group
    if afterkey:
      buf.write("\n")
      doindent = True
    else:
      doindent = False

    alignment_mapping = _GetAlignmentMapping(data)
    for entry in data:
      key, val = entry[0:2]
      if doindent:
        buf.write(baseind * level)
      else:
        doindent = True
      buf.write(key)
      buf.write(": ")
      if len(entry) > 2:
        # pad so that values in the same alignment group line up
        max_key_length = alignment_mapping[entry[2]]
        buf.write(" " * (max_key_length - len(key)))
      _SerializeGenericInfo(buf, val, level + 1, afterkey=True)
  elif isinstance(data, tuple) and all(map(_NotAContainer, data)):
    # tuples with simple content are serialized as inline lists
    buf.write("[%s]\n" % utils.CommaJoin(data))
  elif isinstance(data, list) or isinstance(data, tuple):
    # lists and tuples
    if not data:
      buf.write("\n")
    else:
      if afterkey:
        buf.write("\n")
        doindent = True
      else:
        doindent = False
      for item in data:
        if doindent:
          buf.write(baseind * level)
        else:
          doindent = True
        # YAML-style "- " list item marker
        buf.write("-")
        buf.write(baseind[1:])
        _SerializeGenericInfo(buf, item, level + 1)
  else:
    # This branch should be only taken for strings, but it's practically
    # impossible to guarantee that no other types are produced somewhere
    buf.write(str(data))
    buf.write("\n")
2991
2992
def PrintGenericInfo(data):
  """Print information formatted according to the hierarchy.

  The output is a valid YAML string.

  @param data: the data to print, a hierarchical structure whose elements
    can be:
      - dictionaries, where keys are strings and values are of any of the
        types listed here
      - lists of tuples (key, value) or (key, value, alignment_group), where
        key is a string, value is of any of the types listed here, and
        alignment_group can be any hashable value; it's a way to encode
        ordered dictionaries; any entries sharing the same alignment group
        are aligned by appending whitespace before the value as needed
      - lists of any of the types listed here
      - strings

  """
  out = StringIO()
  _SerializeGenericInfo(out, data, 0)
  # drop the trailing newline; ToStdout adds its own
  ToStdout(out.getvalue().rstrip("\n"))