Merge branch 'stable-2.14' into stable-2.15
[ganeti-github.git] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module dealing with command line parsing"""
32
33
34 import sys
35 import textwrap
36 import os.path
37 import time
38 import logging
39 import errno
40 import itertools
41 import shlex
42 from cStringIO import StringIO
43
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti import constants
47 from ganeti import opcodes
48 import ganeti.rpc.errors as rpcerr
49 import ganeti.rpc.node as rpc
50 from ganeti import ssh
51 from ganeti import compat
52 from ganeti import netutils
53 from ganeti import qlang
54 from ganeti import objects
55 from ganeti import pathutils
56 from ganeti import serializer
57 import ganeti.cli_opts
58 # Import constants
59 from ganeti.cli_opts import * # pylint: disable=W0401
60
61 from ganeti.runtime import (GetClient)
62
63 from optparse import (OptionParser, TitledHelpFormatter)
64
65
# Public names of this module; the names exported by ganeti.cli_opts are
# appended at the end of the list.
__all__ = [
  # Generic functions for CLI programs
  "ConfirmOperation",
  "CreateIPolicyFromOpts",
  "GenericMain",
  "GenericInstanceCreate",
  "GenericList",
  "GenericListFields",
  "GetClient",
  "GetOnlineNodes",
  "GetNodesSshPorts",
  "GetNodeUUIDs",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "RunWhileDaemonsStopped",
  "SubmitOpCode",
  "SubmitOpCodeToDrainedQueue",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "ToStdoutAndLoginfo",
  "FormatError",
  "FormatQueryResult",
  "FormatParamsDictInfo",
  "FormatPolicyInfo",
  "PrintIPolicyCommand",
  "PrintGenericInfo",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  "FormatLogMessage",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_MANY_GROUPS",
  "ARGS_MANY_NETWORKS",
  "ARGS_MANY_FILTERS",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_GROUP",
  "ARGS_ONE_OS",
  "ARGS_ONE_NETWORK",
  "ARGS_ONE_FILTER",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgGroup",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNetwork",
  "ArgNode",
  "ArgOs",
  "ArgExtStorage",
  "ArgFilter",
  "ArgSuggest",
  "ArgUnknown",
  "FixHvParams",
  "SplitNodeOption",
  "CalculateOSNames",
  "ParseFields",
  ] + ganeti.cli_opts.__all__ # Command line options

# Query result status for clients (three distinct integers: 0, 1, 2)
(QR_NORMAL,
 QR_UNKNOWN,
 QR_INCOMPLETE) = range(3)

#: Maximum batch size for ChooseJob
_CHOOSE_BATCH = 25


# constants used to create InstancePolicy dictionary
# Group variant: only the "min" and "max" keys, both of integer value type
TISPECS_GROUP_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  }

# Cluster variant: same as the group variant plus the "std" key
TISPECS_CLUSTER_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  constants.ISPECS_STD: constants.VTYPE_INT,
  }

#: User-friendly names for query2 field types
_QFT_NAMES = {
  constants.QFT_UNKNOWN: "Unknown",
  constants.QFT_TEXT: "Text",
  constants.QFT_BOOL: "Boolean",
  constants.QFT_NUMBER: "Number",
  constants.QFT_NUMBER_FLOAT: "Floating-point number",
  constants.QFT_UNIT: "Storage size",
  constants.QFT_TIMESTAMP: "Timestamp",
  constants.QFT_OTHER: "Custom",
  }
169
170
171 class _Argument(object):
172 def __init__(self, min=0, max=None): # pylint: disable=W0622
173 self.min = min
174 self.max = max
175
176 def __repr__(self):
177 return ("<%s min=%s max=%s>" %
178 (self.__class__.__name__, self.min, self.max))
179
180
class ArgSuggest(_Argument):
  """Argument with a list of suggested values.

  The values passed as C{choices} to the constructor are stored on the
  instance; this class itself does not validate anything against them.

  """
  # pylint: disable=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    details = (type(self).__name__, self.min, self.max, self.choices)
    return "<%s min=%s max=%s choices=%r>" % details
195
196
class ArgChoice(ArgSuggest):
  """Choice argument.

  Like L{ArgSuggest}, but the value must be one of the choices passed to
  the constructor. The class adds no code of its own and only serves as a
  distinct type.

  """
204
205
class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  Marker type only; no behaviour beyond L{_Argument}.

  """
210
211
class ArgInstance(_Argument):
  """Instances argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
216
217
class ArgNode(_Argument):
  """Node argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
222
223
class ArgNetwork(_Argument):
  """Network argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
228
229
class ArgGroup(_Argument):
  """Node group argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
234
235
class ArgJobId(_Argument):
  """Job ID argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
240
241
class ArgFile(_Argument):
  """File path argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
246
247
class ArgCommand(_Argument):
  """Command argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
252
253
class ArgHost(_Argument):
  """Host argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
258
259
class ArgOs(_Argument):
  """OS argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
264
265
class ArgExtStorage(_Argument):
  """ExtStorage argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
270
271
class ArgFilter(_Argument):
  """Filter UUID argument.

  Marker type only; no behaviour beyond L{_Argument}.

  """
276
277
# Ready-made argument definitions for command tables.
# ARGS_NONE: the command takes no positional arguments.
ARGS_NONE = []
# ARGS_MANY_*: one definition with the default bounds (min=0, max=None),
# i.e. any number of values of the given kind.
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NETWORKS = [ArgNetwork()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_MANY_GROUPS = [ArgGroup()]
ARGS_MANY_FILTERS = [ArgFilter()]
# ARGS_ONE_*: exactly one value of the given kind (min=1, max=1).
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NETWORK = [ArgNetwork(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_GROUP = [ArgGroup(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
ARGS_ONE_FILTER = [ArgFilter(min=1, max=1)]
290
291
def _ExtractTagsObject(opts, args):
  """Determine which object a tag operation applies to.

  Note that this function will modify its args parameter: for every tag
  kind except the cluster one, the first element is removed and used as the
  object name.

  @param opts: options object; must have a C{tag_type} attribute
  @type args: list
  @param args: positional arguments of the command
  @rtype: tuple
  @return: (kind, name); name is the empty string for cluster tags
  @raise errors.ProgrammerError: if C{tag_type} is missing or not handled
  @raise errors.OpPrereqError: if an object name is needed but C{args} is
      empty

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")

  kind = opts.tag_type

  # Cluster tags are not attached to a named object
  if kind == constants.TAG_CLUSTER:
    return (kind, "")

  named_kinds = (constants.TAG_NODEGROUP,
                 constants.TAG_NODE,
                 constants.TAG_NETWORK,
                 constants.TAG_INSTANCE)
  if kind not in named_kinds:
    raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)

  if not args:
    raise errors.OpPrereqError("no arguments passed to the command",
                               errors.ECODE_INVAL)

  return (kind, args.pop(0))
315
316
317 def _ExtendTags(opts, args):
318 """Extend the args if a source file has been given.
319
320 This function will extend the tags with the contents of the file
321 passed in the 'tags_source' attribute of the opts parameter. A file
322 named '-' will be replaced by stdin.
323
324 """
325 fname = opts.tags_source
326 if fname is None:
327 return
328 if fname == "-":
329 new_fh = sys.stdin
330 else:
331 new_fh = open(fname, "r")
332 new_data = []
333 try:
334 # we don't use the nice 'new_data = [line.strip() for line in fh]'
335 # because of python bug 1633941
336 while True:
337 line = new_fh.readline()
338 if not line:
339 break
340 new_data.append(line.strip())
341 finally:
342 new_fh.close()
343 args.extend(new_data)
344
345
def ListTags(opts, args):
  """Print the tags of a given object, sorted, one per line.

  This is a generic implementation for all tag kinds handled by
  L{_ExtractTagsObject}. The opts argument is expected to contain a
  tag_type field denoting what object type we work on; the object name (if
  the kind needs one) is taken from args.

  """
  (kind, name) = _ExtractTagsObject(opts, args)
  client = GetClient()
  for tag in sorted(client.QueryTags(kind, name)):
    ToStdout(tag)
362
363
def AddTags(opts, args):
  """Add tags to a given object.

  This is a generic implementation for all tag kinds handled by
  L{_ExtractTagsObject}. The opts argument is expected to contain a
  tag_type field denoting what object type we work on. After the object
  name has been taken from args, the remaining arguments (plus any tags read
  via L{_ExtendTags}) are the tags to add.

  @raise errors.OpPrereqError: if there are no tags to add

  """
  (kind, name) = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)

  if args:
    SubmitOrSend(opcodes.OpTagsSet(kind=kind, name=name, tags=args), opts)
  else:
    raise errors.OpPrereqError("No tags to be added", errors.ECODE_INVAL)
379
380
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation for all tag kinds handled by
  L{_ExtractTagsObject}. The opts argument is expected to contain a
  tag_type field denoting what object type we work on. After the object
  name has been taken from args, the remaining arguments (plus any tags read
  via L{_ExtendTags}) are the tags to remove.

  @raise errors.OpPrereqError: if there are no tags to remove

  """
  (kind, name) = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)

  if args:
    SubmitOrSend(opcodes.OpTagsDel(kind=kind, name=name, tags=args), opts)
  else:
    raise errors.OpPrereqError("No tags to be removed", errors.ECODE_INVAL)
396
397
398 class _ShowUsage(Exception):
399 """Exception class for L{_ParseArgs}.
400
401 """
402 def __init__(self, exit_error):
403 """Initializes instances of this class.
404
405 @type exit_error: bool
406 @param exit_error: Whether to report failure on exit
407
408 """
409 Exception.__init__(self)
410 self.exit_error = exit_error
411
412
class _ShowVersion(Exception):
  """Exception class for L{_ParseArgs}.

  Raised when "--version" is given in place of a command.

  """
417
418
def _ParseArgs(binary, argv, commands, aliases, env_override):
  """Parser for the command line arguments.

  Looks up the command named in C{argv[1]} (resolving aliases), optionally
  inserts default arguments from the environment, and parses the remaining
  command line with the command's option list.

  @param binary: Script name
  @param argv: Command line arguments
  @param commands: Dictionary containing command definitions
  @param aliases: dictionary with command aliases {"alias": "target", ...}
  @param env_override: list of env variables allowed for default args
  @rtype: tuple
  @return: (function, options, arguments) for the selected command, or
      (None, None, None) if the positional arguments failed the check
  @raise _ShowUsage: If usage description should be shown
  @raise _ShowVersion: If version should be shown
  @raise errors.ProgrammerError: If an alias points to an unknown command

  """
  assert not (env_override - set(commands))
  assert not (set(aliases.keys()) & set(commands.keys()))

  if len(argv) <= 1:
    # No option or command given
    raise _ShowUsage(exit_error=True)

  cmd = argv[1]

  if cmd == "--version":
    raise _ShowVersion()
  if cmd == "--help":
    raise _ShowUsage(exit_error=False)
  if cmd not in commands and cmd not in aliases:
    raise _ShowUsage(exit_error=True)

  # Replace an alias by the command it stands for
  if cmd in aliases:
    target = aliases[cmd]
    if target not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, target))
    cmd = target

  if cmd in env_override:
    # Variable name is <BINARY>_<COMMAND>, upper-cased, dashes in the binary
    # name replaced by underscores
    env_name = ("%s_%s" % (binary.replace("-", "_"), cmd)).upper()
    extra_args = os.environ.get(env_name)
    if extra_args:
      argv = utils.InsertAtPos(argv, 2, shlex.split(extra_args))

  (func, args_def, parser_opts, usage, description) = commands[cmd]
  parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  (options, args) = parser.parse_args(args=argv[2:])

  if _CheckArguments(cmd, args_def, args):
    return (func, options, args)

  return (None, None, None)
476
477
478 def _FormatUsage(binary, commands):
479 """Generates a nice description of all commands.
480
481 @param binary: Script name
482 @param commands: Dictionary containing command definitions
483
484 """
485 # compute the max line length for cmd + usage
486 mlen = min(60, max(map(len, commands)))
487
488 yield "Usage: %s {command} [options...] [argument...]" % binary
489 yield "%s <command> --help to see details, or man %s" % (binary, binary)
490 yield ""
491 yield "Commands:"
492
493 # and format a nice command list
494 for (cmd, (_, _, _, _, help_text)) in sorted(commands.items()):
495 help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
496 yield " %-*s - %s" % (mlen, cmd, help_lines.pop(0))
497 for line in help_lines:
498 yield " %-*s %s" % (mlen, "", line)
499
500 yield ""
501
502
503 def _CheckArguments(cmd, args_def, args):
504 """Verifies the arguments using the argument definition.
505
506 Algorithm:
507
508 1. Abort with error if values specified by user but none expected.
509
510 1. For each argument in definition
511
512 1. Keep running count of minimum number of values (min_count)
513 1. Keep running count of maximum number of values (max_count)
514 1. If it has an unlimited number of values
515
516 1. Abort with error if it's not the last argument in the definition
517
518 1. If last argument has limited number of values
519
520 1. Abort with error if number of values doesn't match or is too large
521
522 1. Abort with error if user didn't pass enough values (min_count)
523
524 """
525 if args and not args_def:
526 ToStderr("Error: Command %s expects no arguments", cmd)
527 return False
528
529 min_count = None
530 max_count = None
531 check_max = None
532
533 last_idx = len(args_def) - 1
534
535 for idx, arg in enumerate(args_def):
536 if min_count is None:
537 min_count = arg.min
538 elif arg.min is not None:
539 min_count += arg.min
540
541 if max_count is None:
542 max_count = arg.max
543 elif arg.max is not None:
544 max_count += arg.max
545
546 if idx == last_idx:
547 check_max = (arg.max is not None)
548
549 elif arg.max is None:
550 raise errors.ProgrammerError("Only the last argument can have max=None")
551
552 if check_max:
553 # Command with exact number of arguments
554 if (min_count is not None and max_count is not None and
555 min_count == max_count and len(args) != min_count):
556 ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
557 return False
558
559 # Command with limited number of arguments
560 if max_count is not None and len(args) > max_count:
561 ToStderr("Error: Command %s expects only %d argument(s)",
562 cmd, max_count)
563 return False
564
565 # Command with some required arguments
566 if min_count is not None and len(args) < min_count:
567 ToStderr("Error: Command %s expects at least %d argument(s)",
568 cmd, min_count)
569 return False
570
571 return True
572
573
def SplitNodeOption(value):
  """Splits the value of a --node option.

  @param value: the option value, possibly of the form "first:second"
  @return: the two parts around the first colon (as a list) if the value
      contains a colon; otherwise the tuple (value, None)

  """
  if not value or ":" not in value:
    return (value, None)

  return value.split(":", 1)
582
583
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  With variants, each name has the form "name+variant"; without, the base
  name is the only valid name.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if not os_variants:
    return [os_name]

  names = []
  for variant in os_variants:
    names.append("%s+%s" % (os_name, variant))
  return names
599
600
def ParseFields(selected, default):
  """Parses the values of "--field"-like options.

  @type selected: string or None
  @param selected: User-selected options, comma-separated; a leading "+"
      means the fields are added to the default ones
  @type default: list
  @param default: Default fields
  @rtype: list
  @return: the default list if nothing was selected, the default list
      extended by the selection if it starts with "+", otherwise just the
      selection

  """
  if selected is None:
    return default

  if not selected.startswith("+"):
    return selected.split(",")

  extra = selected[1:].split(",")
  return default + extra
617
618
# Re-export of rpc.RunWithRPC under the name listed in __all__
UsesRPC = rpc.RunWithRPC
620
621
def AskUser(text, choices=None):
  """Ask the user a question.

  The question is written to and the answer read from /dev/tty. Input is
  stripped and lower-cased before being compared with the input characters
  of the choices; the question is repeated until a known choice is entered.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do not perform the
      operation')]; note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. /dev/tty cannot be opened), we return the return
      value of the last entry from the list
  @raise errors.ProgrammerError: if choices is not a non-empty list of
      tuples with at least three elements, or an entry uses '?'

  """
  if choices is None:
    choices = [("y", True, "Perform the operation"),
               ("n", False, "Do not perform the operation")]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == "?":
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  # The last choice is the default, used when no terminal is available
  answer = choices[-1][1]

  # Wrap each line of the question separately to keep its line breaks
  text = "\n".join(textwrap.fill(line, 70, replace_whitespace=False)
                   for line in text.splitlines())

  try:
    # open() rather than the file() constructor: it is the documented way
    # to open files and file() no longer exists in Python 3
    f = open("/dev/tty", "a+")
  except IOError:
    return answer

  try:
    chars = [entry[0] for entry in choices]
    # Mark the default choice in the prompt
    chars[-1] = "[%s]" % chars[-1]
    chars.append("?")
    maps = dict((entry[0], entry[1]) for entry in choices)
    while True:
      f.write(text)
      f.write("\n")
      f.write("/".join(chars))
      f.write(": ")
      # At most two characters are read: the choice and the newline
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == "?":
        # Show the description of every choice, then ask again
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
  finally:
    f.close()
  return answer
677
678
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.
  L{FormatError} reports it with exit code 0.

  """
688
689
def SendJob(ops, cl=None):
  """Submit a list of opcodes as a job without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @return: the job ID, as returned by the client's SubmitJob

  """
  client = GetClient() if cl is None else cl
  return client.SubmitJob(ops)
706
707
def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  Waits until the job reaches a final status, passing log messages and
  "not changed" notifications to C{report_cbs} on the way, then queries the
  job's result and evaluates it.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks

  @return: the opresult of the job
  @raise errors.JobLost: If job can't be found
  @raise errors.OpExecError: If job didn't succeed

  """
  prev_job_info = None
  prev_logmsg_serial = None

  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        # Remember the highest serial seen so far. The None case is handled
        # explicitly instead of via max(None, serial), which depends on
        # CPython 2's ordering of None and fails on Python 3.
        if prev_logmsg_serial is None or serial > prev_logmsg_serial:
          prev_logmsg_serial = serial

    # The final status is only acted upon in a round without new log entries
    # TODO: Handle canceled and archived jobs
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  # Look for the first failed opcode; has_ok records whether any opcode
  # before it succeeded
  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      # NOTE(review): presumably re-raises the original exception if msg
      # encodes one - confirm in errors.MaybeRaise
      errors.MaybeRaise(msg)

      if has_ok:
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)
788
789
class JobPollCbBase(object):
  """Interface of the data callbacks used by L{GenericPollJob}.

  Both methods must be provided by subclasses.

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    Not implemented in the base class.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    Not implemented in the base class.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()
816
817
class JobPollReportCbBase(object):
  """Interface of the reporting callbacks used by L{GenericPollJob}.

  Both methods must be provided by subclasses.

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Not implemented in the base class.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Not implemented in the base class.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()
843
844
class _LuxiJobPollCb(JobPollCbBase):
  """Data callbacks forwarding every request to a client object.

  """
  def __init__(self, cl):
    """Initializes this class.

    @param cl: client providing WaitForJobChangeOnce and QueryJobs

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    client = self.cl
    return client.WaitForJobChangeOnce(job_id, fields,
                                       prev_job_info, prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    """
    client = self.cl
    return client.QueryJobs(job_ids, fields)
866
867
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks passing log messages to a feedback function.

  """
  def __init__(self, feedback_fn):
    """Initializes this class.

    @param feedback_fn: callable taking one (timestamp, log_type, log_msg)
        tuple

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    Job ID and serial are not passed on.

    """
    entry = (timestamp, log_type, log_msg)
    self.feedback_fn(entry)

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # Ignore
890
891
class StdioJobPollReportCb(JobPollReportCbBase):
  """Reporting callbacks writing to stdout and stderr.

  """
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    # Each of the two notices below is printed at most once
    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    The message is written to stdout, prefixed with its timestamp.

    """
    when = time.ctime(utils.MergeTime(timestamp))
    ToStdout("%s %s", when, FormatLogMessage(log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    Tells the user on stderr, once per kind, that the job is queued or
    waiting for locks.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      ToStderr("Job %s is waiting in queue", job_id)
      self.notified_queued = True
      return

    if status == constants.JOB_STATUS_WAITING and not self.notified_waitlock:
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
      self.notified_waitlock = True
923
924
def FormatLogMessage(log_type, log_msg):
  """Formats a job message according to its type.

  Messages of a type other than C{constants.ELOG_MESSAGE} are converted
  with str() first; the result is passed through utils.SafeEncode.

  """
  if log_type == constants.ELOG_MESSAGE:
    text = log_msg
  else:
    text = str(log_msg)

  return utils.SafeEncode(text)
933
934
def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
  """Function to poll for the result of a job.

  @type job_id: job identifier
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
             if None, a new client will be created
  @param feedback_fn: if given (and no reporter is), log messages are passed
      to this function via L{FeedbackFnJobPollReportCb}
  @param reporter: reporting callbacks to use; if neither this nor
      feedback_fn is given, L{StdioJobPollReportCb} is used
  @return: the result of L{GenericPollJob}
  @raise errors.ProgrammerError: if both reporter and feedback_fn are given

  """
  if cl is None:
    cl = GetClient()

  if reporter is not None:
    if feedback_fn:
      raise errors.ProgrammerError("Can't specify reporter and feedback function")
  elif feedback_fn:
    reporter = FeedbackFnJobPollReportCb(feedback_fn)
  else:
    reporter = StdioJobPollReportCb()

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
957
958
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
  """Legacy function to submit an opcode.

  The opcode is submitted as a job of its own and polled until it has
  finished. If opts has a true C{print_jobid} attribute, the job ID is
  printed on stdout right after submission.

  @param op: the opcode to submit
  @param cl: client to use; if None, a new client will be created
  @param feedback_fn: passed on to L{PollJob}
  @param opts: command line options (or None), applied to the opcode via
      L{SetGenericOpcodeOpts}
  @param reporter: passed on to L{PollJob}
  @return: the first element of the job's results

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl=cl)
  if getattr(opts, "print_jobid", False):
    ToStdout("%d" % job_id)

  return PollJob(job_id, cl=cl, feedback_fn=feedback_fn,
                 reporter=reporter)[0]
980
981
def SubmitOpCodeToDrainedQueue(op):
  """Forcefully insert a job in the queue, even if it is drained.

  The job is polled until it has finished.

  @param op: the opcode to submit
  @return: the first element of the job's results

  """
  client = GetClient()
  job_id = client.SubmitJobToDrainedQueue([op])
  return PollJob(job_id, cl=client)[0]
990
991
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending them via SendJob
  (otherwise SubmitOpCode does it).

  @raise JobSubmittedException: if the job was only sent (opts.submit_only
      is true); the exception carries the job ID

  """
  if not (opts and opts.submit_only):
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  ops = [op]
  SetGenericOpcodeOpts(ops, opts)
  job_id = SendJob(ops, cl=cl)
  if opts.print_jobid:
    ToStdout("%d" % job_id)
  raise JobSubmittedException(job_id)
1013
1014
def _InitReasonTrail(op, opts):
  """Builds the first part of the reason trail

  The trail is a list of (source, reason, timestamp) tuples. It starts with
  the user provided reason (if there is one), followed by an entry naming
  the client binary and the command taken from C{sys.argv}. The list is
  stored in the C{reason} attribute of the opcode.

  @param op: the opcode the reason trail will be added to
  @param opts: the command line options selected by the user

  """
  assert len(sys.argv) >= 2

  entries = []

  user_reason = opts.reason
  if user_reason:
    entries.append((constants.OPCODE_REASON_SRC_USER,
                    user_reason,
                    utils.EpochNano()))

  client_source = "%s:%s" % (constants.OPCODE_REASON_SRC_CLIENT,
                             os.path.basename(sys.argv[0]))
  entries.append((client_source, sys.argv[1], utils.EpochNano()))

  op.reason = entries
1038
1039
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.) and initialises their reason
  trail. Nothing is done if no options are given.

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return

  for opcode in opcode_list:
    opcode.debug_level = options.debug

    # dry_run and priority are only set if the command has these options
    if hasattr(options, "dry_run"):
      opcode.dry_run = options.dry_run

    priority = getattr(options, "priority", None)
    if priority is not None:
      opcode.priority = priority

    _InitReasonTrail(opcode, options)
1060
1061
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  The exit code is 1, except for configuration errors (2) and
  L{JobSubmittedException} (0). Configuration errors are additionally
  logged via the logging module.

  @param err: the exception instance to describe
  @rtype: tuple
  @return: (exit code, message)

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  # NOTE(review): the order of the isinstance checks presumably matters where
  # one error class derives from another (e.g. the generic Ganeti error near
  # the end acts as catch-all) - confirm against the errors module before
  # reordering
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    # the first exception argument is a sequence of (node, script, output)
    for node, script, out in err.args[0]:
      if out:
        obuf.write(" node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write(" node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    # distinguish failure to resolve our own name from other host names
    this_host = netutils.Hostname.GetSysName()
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    # with two arguments, the second one is shown as the error type
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, rpcerr.NoMasterError):
    # the first exception argument is the socket that could not be reached
    if err.args[0] == pathutils.MASTER_SOCKET:
      daemon = "the master daemon"
    elif err.args[0] == pathutils.QUERY_SOCKET:
      daemon = "the config daemon"
    else:
      daemon = "socket '%s'" % str(err.args[0])
    obuf.write("Cannot communicate with %s.\nIs the process running"
               " and listening for connections?" % daemon)
  elif isinstance(err, rpcerr.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Jobs might have"
               " been submitted and will continue to run even if the call"
               " timed out. Useful commands in this situation are \"gnt-job"
               " list\", \"gnt-job cancel\" and \"gnt-job watch\". Error:\n")
    obuf.write(msg)
  elif isinstance(err, rpcerr.PermissionError):
    obuf.write("It seems you don't have permissions to connect to the"
               " master daemon.\nPlease retry as a different user.")
  elif isinstance(err, rpcerr.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.JobLost):
    obuf.write("Error checking job status: %s" % msg)
  elif isinstance(err, errors.QueryFilterParseError):
    obuf.write("Error while parsing query filter: %s\n" % err.args[0])
    obuf.write("\n".join(err.GetDetails()))
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    # not an error: report the job ID and exit successfully
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  # trailing newlines are removed so that the message is not
  # newline-terminated
  return retcode, obuf.getvalue().rstrip("\n")
1154
1155
def GenericMain(commands, override=None, aliases=None,
                env_override=frozenset()):
  """Generic main function for all the gnt-* commands.

  Parses C{sys.argv}, sets up logging, runs the selected command function
  and turns the exceptions handled here into a message on stderr.

  @param commands: a dictionary with a special structure, see the design doc
                   for command line handling.
  @param override: if not None, we expect a dictionary with keys that will
                   override command line options; this can be used to pass
                   options from the scripts to generic functions
  @param aliases: dictionary with command aliases {'alias': 'target', ...}
  @param env_override: list of environment names which are allowed to submit
                       default args for commands
  @return: the exit code; either the value returned by the command function
           or an error code determined here

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0])
    if not binary:
      binary = sys.argv[0]

    if len(sys.argv) >= 2:
      logname = utils.ShellQuoteArgs([binary, sys.argv[1]])
    else:
      logname = binary

    cmdline = utils.ShellQuoteArgs([binary] + sys.argv[1:])
  else:
    binary = "<unknown program>"
    # logname is needed by the SetupLogging call below; leaving it unset
    # here would result in a NameError
    logname = binary
    cmdline = "<unknown>"

  if aliases is None:
    aliases = {}

  try:
    (func, options, args) = _ParseArgs(binary, sys.argv, commands, aliases,
                                       env_override)
  except _ShowVersion:
    ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
             constants.RELEASE_VERSION)
    return constants.EXIT_SUCCESS
  except _ShowUsage as err:
    for line in _FormatUsage(binary, commands):
      ToStdout(line)

    if err.exit_error:
      return constants.EXIT_FAILURE
    else:
      return constants.EXIT_SUCCESS
  except errors.ParameterError as err:
    # Only the message is of interest, the exit code is fixed
    (_, err_msg) = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  if override is not None:
    for (key, val) in override.items():
      setattr(options, key, val)

  utils.SetupLogging(pathutils.LOG_COMMANDS, logname, debug=options.debug,
                     stderr_logging=True)

  logging.debug("Command line: %s", cmdline)

  try:
    result = func(options, args)
  except (errors.GenericError, rpcerr.ProtocolError,
          JobSubmittedException) as err:
    (result, err_msg) = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)
  except KeyboardInterrupt:
    result = constants.EXIT_FAILURE
    ToStderr("Aborted. Note that if the operation created any jobs, they"
             " might have been submitted and"
             " will continue to run in the background.")
  except IOError as err:
    if err.errno == errno.EPIPE:
      # our terminal went away, we'll exit
      sys.exit(constants.EXIT_FAILURE)
    else:
      raise

  return result
1241
1242
def ParseNicOption(optvalue):
  """Parses the value of the --net option(s).

  @param optvalue: sequence of C{(index, settings)} pairs; the index must be
      convertible to a non-negative integer and the settings must be a dict
  @rtype: list of dict
  @return: list with one dict per NIC, positioned by NIC index; indices which
      were not given are filled with empty dicts
  @raise errors.OpPrereqError: if an index is invalid or the settings of a
      NIC are not a dict

  """
  try:
    indices = [int(entry[0]) for entry in optvalue]
    nic_max = max(indices) + 1
  except (TypeError, ValueError) as err:
    raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err),
                               errors.ECODE_INVAL)

  if min(indices) < 0:
    # A negative index would address the list from its end
    raise errors.OpPrereqError("Invalid NIC index passed: %d" % min(indices),
                               errors.ECODE_INVAL)

  # One distinct dict per slot; "[{}] * nic_max" would make all unfilled
  # slots refer to the very same dict object
  nics = [{} for _ in range(nic_max)]
  for (nidx, ndict) in optvalue:
    nidx = int(nidx)

    if not isinstance(ndict, dict):
      raise errors.OpPrereqError("Invalid nic/%d value: expected dict,"
                                 " got %s" % (nidx, ndict), errors.ECODE_INVAL)

    utils.ForceDictType(ndict, constants.INIC_PARAMS_TYPES)

    nics[nidx] = ndict

  return nics
1266
1267
def FixHvParams(hvparams):
  """Replaces spaces by commas in the usb_devices hypervisor parameter.

  The dictionary is modified in place; nothing is returned.

  @type hvparams: dict
  @param hvparams: hypervisor parameters, possibly containing
      L{constants.HV_USB_DEVICES}

  """
  # In Ganeti 2.8.4 the separator for the usb_devices hvparam was changed from
  # comma to space because commas cannot be accepted on the command line
  # (they already act as the separator between different hvparams). Still,
  # RAPI should be able to accept commas for backwards compatibility.
  # Therefore, we convert spaces into commas here, and we keep the old
  # parsing logic everywhere else.
  usb_devices = hvparams.get(constants.HV_USB_DEVICES)
  if usb_devices is None:
    # No usb_devices (missing or set to None), no modification required;
    # calling replace() on None would raise an AttributeError
    return

  hvparams[constants.HV_USB_DEVICES] = usb_devices.replace(" ", ",")
1281
1282
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  Validates the NIC and disk options, builds an L{opcodes.OpInstanceCreate}
  opcode and hands it to L{SubmitOrSend}.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the given options are invalid or
      contradict each other
  @raise errors.ProgrammerError: if C{mode} is not a known creation mode

  """
  instance = args[0]
  forthcoming = opts.ensure_value("forthcoming", False)
  commit = opts.ensure_value("commit", False)

  if forthcoming and commit:
    raise errors.OpPrereqError("Creating an instance only forthcoming and"
                               " commiting it are mutally exclusive",
                               errors.ECODE_INVAL)

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    nics = ParseNicOption(opts.nics)
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed", errors.ECODE_INVAL)
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified",
                                 errors.ECODE_INVAL)
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option", errors.ECODE_INVAL)
    if opts.sd_size is not None:
      opts.disks = [(0, {constants.IDISK_SIZE: opts.sd_size})]

    if opts.disks:
      try:
        disk_indices = [int(entry[0]) for entry in opts.disks]
      except (TypeError, ValueError) as err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err),
                                   errors.ECODE_INVAL)
      if min(disk_indices) < 0:
        # A negative index would address the list from its end
        raise errors.OpPrereqError("Invalid disk index passed: %d" %
                                   min(disk_indices), errors.ECODE_INVAL)
      # One distinct dict per slot; "[{}] * n" would make all unfilled slots
      # refer to the very same dict object
      disks = [{} for _ in range(max(disk_indices) + 1)]
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
      elif constants.IDISK_SIZE in ddict:
        if constants.IDISK_ADOPT in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx, errors.ECODE_INVAL)
        try:
          ddict[constants.IDISK_SIZE] = \
            utils.ParseUnit(ddict[constants.IDISK_SIZE])
        except ValueError as err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err), errors.ECODE_INVAL)
      elif constants.IDISK_ADOPT in ddict:
        if constants.IDISK_SPINDLES in ddict:
          raise errors.OpPrereqError("spindles is not a valid option when"
                                     " adopting a disk", errors.ECODE_INVAL)
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import", errors.ECODE_INVAL)
        ddict[constants.IDISK_SIZE] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx, errors.ECODE_INVAL)
      if constants.IDISK_SPINDLES in ddict:
        try:
          ddict[constants.IDISK_SPINDLES] = \
            int(ddict[constants.IDISK_SPINDLES])
        except (TypeError, ValueError) as err:
          # Report bad user input like the other disk parameters instead of
          # letting the conversion error escape
          raise errors.OpPrereqError("Invalid spindles for disk %d: %s" %
                                     (didx, err), errors.ECODE_INVAL)

      disks[didx] = ddict

  if opts.tags is not None:
    tags = opts.tags.split(",")
  else:
    tags = []

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_COMPAT)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
  FixHvParams(hvparams)

  osparams_private = opts.osparams_private or serializer.PrivateDict()
  osparams_secret = opts.osparams_secret or serializer.PrivateDict()

  helper_startup_timeout = opts.helper_startup_timeout
  helper_shutdown_timeout = opts.helper_shutdown_timeout

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    force_variant = opts.force_variant
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
    compress = constants.IEC_NONE
    if opts.instance_communication is None:
      instance_communication = False
    else:
      instance_communication = opts.instance_communication
  elif mode == constants.INSTANCE_IMPORT:
    if forthcoming:
      raise errors.OpPrereqError("forthcoming instances can only be created,"
                                 " not imported", errors.ECODE_INVAL)
    start = False
    os_type = None
    force_variant = False
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
    compress = opts.compress
    instance_communication = False
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpInstanceCreate(
    forthcoming=forthcoming,
    commit=commit,
    instance_name=instance,
    disks=disks,
    disk_template=opts.disk_template,
    group_name=opts.nodegroup,
    nics=nics,
    conflicts_check=opts.conflicts_check,
    pnode=pnode, snode=snode,
    ip_check=opts.ip_check,
    name_check=opts.name_check,
    wait_for_sync=opts.wait_for_sync,
    file_storage_dir=opts.file_storage_dir,
    file_driver=opts.file_driver,
    iallocator=opts.iallocator,
    hypervisor=hypervisor,
    hvparams=hvparams,
    beparams=opts.beparams,
    osparams=opts.osparams,
    osparams_private=osparams_private,
    osparams_secret=osparams_secret,
    mode=mode,
    opportunistic_locking=opts.opportunistic_locking,
    start=start,
    os_type=os_type,
    force_variant=force_variant,
    src_node=src_node,
    src_path=src_path,
    compress=compress,
    tags=tags,
    no_install=no_install,
    identify_defaults=identify_defaults,
    ignore_ipolicy=opts.ignore_ipolicy,
    instance_communication=instance_communication,
    helper_startup_timeout=helper_startup_timeout,
    helper_shutdown_timeout=helper_shutdown_timeout)

  SubmitOrSend(op, opts)
  return 0
1462
1463
class _RunWhileDaemonsStoppedHelper(object):
  """Helper class for L{RunWhileDaemonsStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node,
               online_nodes, ssh_ports, exclude_daemons, debug,
               verbose):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes
    @type ssh_ports: list
    @param ssh_ports: List of SSH ports of online nodes, in the same order
        as C{online_nodes}
    @type exclude_daemons: list of string
    @param exclude_daemons: list of daemons that will be restarted on master
        after all others are shutdown
    @type debug: boolean
    @param debug: show debug output
    @type verbose: boolean
    @param verbose: show verbose output

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes
    # Node name to SSH port
    self.ssh_ports = dict(zip(online_nodes, ssh_ports))

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    self.exclude_daemons = exclude_daemons
    self.debug = debug
    self.verbose = verbose

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name; C{None} or the master node's name run
        the command locally
    @type cmd: list
    @param cmd: Command
    @raise errors.OpExecError: if the command failed

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, constants.SSH_LOGIN_USER,
                            utils.ShellQuoteArgs(cmd),
                            port=self.ssh_ports[node_name])

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called; it receives this helper object as its
        first argument, followed by C{args}
    @return: whatever C{fn} returns

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [pathutils.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        online_nodes = [self.master_node] + [n for n in self.online_nodes
                                             if n != self.master_node]
        for node_name in online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop-all"])
          # Starting any daemons listed as exception
          if node_name == self.master_node:
            for daemon in self.exclude_daemons:
              self.feedback_fn("Starting daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start", daemon])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception as err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        # NOTE(review): a failing command raises from _RunCmd and thereby
        # skips the remaining nodes of this loop
        for node_name in self.nonmaster_nodes + [self.master_node]:
          # Stopping any daemons listed as exception.
          # This might look unnecessary, but it makes sure that daemon-util
          # starts all daemons in the right order.
          if node_name == self.master_node:
            # Iterate in reverse without reversing the list in place: the
            # list belongs to the caller and the order must stay the same
            # for any further invocation of this method
            for daemon in reversed(self.exclude_daemons):
              self.feedback_fn("Stopping daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop", daemon])
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start-all"])

    finally:
      # Resume watcher
      watcher_block.Close()
1593
1594
def RunWhileDaemonsStopped(feedback_fn, exclude_daemons, fn, *args, **kwargs):
  """Runs a function while the daemons of the cluster are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type exclude_daemons: list of string
  @param exclude_daemons: list of daemons that are stopped, but immediately
      restarted on the master to be available when calling 'fn'. If None,
      all daemons will be stopped and none will be started before
      calling 'fn'.
  @type fn: callable
  @param fn: Function to be called when daemons are stopped
  @keyword debug: passed on to the helper, defaults to False
  @keyword verbose: passed on to the helper, defaults to False
  @return: the result of the helper's C{Call} method

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  client = GetClient()

  config_values = client.QueryConfigValues(["cluster_name", "master_node"])
  (cluster_name, master_node) = config_values

  online_nodes = GetOnlineNodes([], cl=client)
  ssh_ports = GetNodesSshPorts(online_nodes, client)

  # Don't keep a reference to the client. The master daemon will go away.
  del client

  assert master_node in online_nodes
  if exclude_daemons is None:
    exclude_daemons = []

  helper = _RunWhileDaemonsStoppedHelper(feedback_fn, cluster_name,
                                         master_node, online_nodes, ssh_ports,
                                         exclude_daemons,
                                         kwargs.get("debug", False),
                                         kwargs.get("verbose", False))

  return helper.Call(fn, *args)
1633
1634
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Calls a function while all cluster daemons are stopped.

  This is L{RunWhileDaemonsStopped} without any daemons being excluded.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped
  @return: the value returned by L{RunWhileDaemonsStopped}

  """
  # Pass the result on instead of silently dropping it
  return RunWhileDaemonsStopped(feedback_fn, None, fn, *args)
1645
1646
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Formats a table with headers and different fields.

  Neither C{headers} nor C{data} are modified.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table; if empty or None, no header line is generated
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output;
      a row of None is shown as a line of dashes
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list of strings
  @return: the lines of the table

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  numfields = utils.FieldSet(*numfields)   # pylint: disable=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable=W0142

  if headers:
    # Work on a private copy, so that the titles added below for unknown
    # fields don't show up in the caller's dictionary
    headers = dict(headers)

  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    mlens = [0 for _ in fields]
    format_str = " ".join(format_fields)
  else:
    # The separator becomes part of a format string
    format_str = separator.replace("%", "%%").join(format_fields)

  # Convert all values to strings; the results are collected in new lists
  # (instead of being written back into the rows), which also allows rows to
  # be tuples
  formatted = []
  for row in data:
    if row is None:
      formatted.append(None)
      continue
    cells = []
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # Not a number, shown as is
          pass
        else:
          val = utils.FormatUnit(val, units)
      val = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))
      cells.append(val)
    formatted.append(cells)

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    # No padding after the last column, unless it's right-aligned
    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  for cells in formatted:
    args = []
    if cells is None:
      cells = ["-" for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(cells[idx])
    result.append(format_str % tuple(args))

  return result
1753
1754
1755 def _FormatBool(value):
1756 """Formats a boolean value as a string.
1757
1758 """
1759 if value:
1760 return "Y"
1761 return "N"
1762
1763
#: Default formatting for query results, indexed by query field type; each
#: value is a tuple of (callback, align right). L{constants.QFT_UNIT} is not
#: included as its callback depends on the requested unit, see
#: L{_GetColumnFormatter}.
_DEFAULT_FORMAT_QUERY = {
  constants.QFT_TEXT: (str, False),
  constants.QFT_BOOL: (_FormatBool, False),
  constants.QFT_NUMBER: (str, True),
  constants.QFT_NUMBER_FLOAT: (str, True),
  constants.QFT_TIMESTAMP: (utils.FormatTime, False),
  constants.QFT_OTHER: (str, False),
  constants.QFT_UNKNOWN: (str, False),
  }
1774
1775
def _GetColumnFormatter(fdef, override, unit):
  """Determines how the values of a query field are formatted.

  An entry in C{override} takes precedence, followed by the special handling
  of L{constants.QFT_UNIT} and finally L{_DEFAULT_FORMAT_QUERY}.

  @type fdef: L{objects.QueryFieldDefinition}
  @type override: dict
  @param override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT}
  @rtype: tuple; (callable, bool)
  @return: Returns the function to format a value (takes one parameter) and a
    boolean for aligning the value on the right-hand side
  @raise NotImplementedError: if no formatting is known for the field's type

  """
  custom = override.get(fdef.name, None)
  if custom is not None:
    return custom

  assert constants.QFT_UNIT not in _DEFAULT_FORMAT_QUERY

  kind = fdef.kind

  if kind == constants.QFT_UNIT:
    # Can't keep this information in the static dictionary
    def _FormatWithUnit(value):
      return utils.FormatUnit(value, unit)

    return (_FormatWithUnit, True)

  default = _DEFAULT_FORMAT_QUERY.get(kind, None)
  if default is None:
    raise NotImplementedError("Can't format column type '%s'" % kind)

  return default
1805
1806
1807 class _QueryColumnFormatter(object):
1808 """Callable class for formatting fields of a query.
1809
1810 """
1811 def __init__(self, fn, status_fn, verbose):
1812 """Initializes this class.
1813
1814 @type fn: callable
1815 @param fn: Formatting function
1816 @type status_fn: callable
1817 @param status_fn: Function to report fields' status
1818 @type verbose: boolean
1819 @param verbose: whether to use verbose field descriptions or not
1820
1821 """
1822 self._fn = fn
1823 self._status_fn = status_fn
1824 self._verbose = verbose
1825
1826 def __call__(self, data):
1827 """Returns a field's string representation.
1828
1829 """
1830 (status, value) = data
1831
1832 # Report status
1833 self._status_fn(status)
1834
1835 if status == constants.RS_NORMAL:
1836 return self._fn(value)
1837
1838 assert value is None, \
1839 "Found value %r for abnormal status %s" % (value, status)
1840
1841 return FormatResultError(status, self._verbose)
1842
1843
def FormatResultError(status, verbose):
  """Returns the text for a result status other than L{constants.RS_NORMAL}.

  @param status: The result status
  @type verbose: boolean
  @param verbose: Whether to return the verbose text
  @return: Text of result status
  @raise NotImplementedError: if the status has no entry in
    L{constants.RSS_DESCRIPTION}

  """
  assert status != constants.RS_NORMAL, \
         "FormatResultError called with status equal to constants.RS_NORMAL"

  try:
    texts = constants.RSS_DESCRIPTION[status]
  except KeyError:
    raise NotImplementedError("Unknown status %s" % status)

  (verbose_text, normal_text) = texts

  return verbose_text if verbose else normal_text
1863
1864
def FormatQueryResult(result, unit=None, format_override=None, separator=None,
                      header=False, verbose=False):
  """Formats data in L{objects.QueryResponse}.

  @type result: L{objects.QueryResponse}
  @param result: result of query operation
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT},
    see L{utils.text.FormatUnit}; if None, "m" is used with a separator and
    "h" without
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to output header row
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @rtype: tuple
  @return: overall status (one of C{QR_NORMAL}, C{QR_UNKNOWN} and
    C{QR_INCOMPLETE}) and the formatted table as returned by L{FormatTable}

  """
  if unit is None:
    unit = "m" if separator else "h"

  if format_override is None:
    format_override = {}

  # Number of fields seen per result status
  counters = dict.fromkeys(constants.RS_ALL, 0)

  def _CountStatus(status):
    if status in counters:
      counters[status] += 1

  columns = []
  for fdef in result.fields:
    assert fdef.title and fdef.name
    (fmt_fn, right) = _GetColumnFormatter(fdef, format_override, unit)
    formatter = _QueryColumnFormatter(fmt_fn, _CountStatus, verbose)
    columns.append(TableColumn(fdef.title, formatter, right))

  # Formatting the table fills the counters as a side effect
  table = FormatTable(result.data, columns, header, separator)

  assert len(counters) == len(constants.RS_ALL)
  assert compat.all(count >= 0 for count in counters.values())

  # Determine overall status. If there was no data, unknown fields must be
  # detected via the field definitions.
  if counters[constants.RS_UNKNOWN]:
    return (QR_UNKNOWN, table)

  if not result.data and _GetUnknownFields(result.fields):
    return (QR_UNKNOWN, table)

  if compat.any(count > 0 for (key, count) in counters.items()
                if key != constants.RS_NORMAL):
    return (QR_INCOMPLETE, table)

  return (QR_NORMAL, table)
1927
1928
1929 def _GetUnknownFields(fdefs):
1930 """Returns list of unknown fields included in C{fdefs}.
1931
1932 @type fdefs: list of L{objects.QueryFieldDefinition}
1933
1934 """
1935 return [fdef for fdef in fdefs
1936 if fdef.kind == constants.QFT_UNKNOWN]
1937
1938
def _WarnUnknownFields(fdefs):
  """Writes a warning to stderr in case a query included unknown fields.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: bool
  @return: whether there were any unknown fields

  """
  unknown = _GetUnknownFields(fdefs)

  if not unknown:
    return False

  names = utils.CommaJoin(fdef.name for fdef in unknown)
  ToStderr("Warning: Queried for unknown fields %s", names)

  return True
1952
1953
def GenericList(resource, fields, names, unit, separator, header, cl=None,
                format_override=None, verbose=False, force_filter=False,
                namefield=None, qfilter=None, isnumeric=False):
  """Queries for the items of a resource and prints them as a table.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type names: list of strings
  @param names: Names of items to query for
  @type unit: string or None
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT} or
    None for automatic choice (human-readable for non-separator usage,
    otherwise megabytes); this is a one-letter string
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @param cl: Client to use for the query; if None, one is obtained via
    L{GetClient}
  @type force_filter: bool
  @param force_filter: Whether to always treat names as filter
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @type namefield: string
  @param namefield: Name of field to use for simple filters (see
    L{qlang.MakeFilter} for details)
  @type qfilter: list or None
  @param qfilter: Query filter (in addition to names)
  @type isnumeric: bool
  @param isnumeric: Whether the namefield's type is numeric, and therefore
    any simple filters built by namefield should use integer values to
    reflect that
  @return: L{constants.EXIT_UNKNOWN_FIELD} if unknown fields were queried,
    L{constants.EXIT_SUCCESS} otherwise

  """
  # An empty list of names means "no restriction by name"
  names = names or None

  name_filter = qlang.MakeFilter(names, force_filter, namefield=namefield,
                                 isnumeric=isnumeric)

  # Combine the name-based filter with the one passed in, if both exist
  if qfilter is None:
    qfilter = name_filter
  elif name_filter is not None:
    qfilter = [qlang.OP_AND, name_filter, qfilter]

  if cl is None:
    cl = GetClient()

  response = cl.Query(resource, fields, qfilter)

  found_unknown = _WarnUnknownFields(response.fields)

  (status, lines) = FormatQueryResult(response, unit=unit,
                                      separator=separator, header=header,
                                      format_override=format_override,
                                      verbose=verbose)

  for line in lines:
    ToStdout(line)

  # Both ways of detecting unknown fields must agree
  assert bool(found_unknown) == (status == QR_UNKNOWN)

  if status == QR_UNKNOWN:
    return constants.EXIT_UNKNOWN_FIELD

  # TODO: Should the list command fail if not all data could be collected?
  return constants.EXIT_SUCCESS
2024
2025
def _FieldDescValues(fdef):
  """Builds the table row describing a query field.

  Helper function for L{GenericListFields}.

  @type fdef: L{objects.QueryFieldDefinition}
  @rtype: list
  @return: name, type, title and description of the field; the type is
    given as is if it has no entry in C{_QFT_NAMES}

  """
  name = fdef.name
  kind = fdef.kind

  return [name, _QFT_NAMES.get(kind, kind), fdef.title, fdef.doc]
2039
2040
def GenericListFields(resource, fields, separator, header, cl=None):
  """Queries for the fields of a resource and prints their description.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @param cl: Client to use for the query; if None, one is obtained via
    L{GetClient}
  @return: L{constants.EXIT_UNKNOWN_FIELD} if unknown fields were queried,
    L{constants.EXIT_SUCCESS} otherwise

  """
  if cl is None:
    cl = GetClient()

  # An empty list of fields is passed on as None
  response = cl.QueryFields(resource, fields or None)

  found_unknown = _WarnUnknownFields(response.fields)

  columns = [TableColumn(title, str, False)
             for title in ["Name", "Type", "Title", "Description"]]

  rows = [_FieldDescValues(fdef) for fdef in response.fields]

  for line in FormatTable(rows, columns, header, separator):
    ToStdout(line)

  if found_unknown:
    return constants.EXIT_UNKNOWN_FIELD

  return constants.EXIT_SUCCESS
2079
2080
class TableColumn(object):
  """Column description as used by L{FormatTable}.

  """
  def __init__(self, title, fn, align_right):
    """Initializes this class.

    @type title: string
    @param title: Column title
    @type fn: callable
    @param fn: Formatting function, available as attribute C{format}
    @type align_right: bool
    @param align_right: Whether to align values on the right-hand side

    """
    (self.title, self.format, self.align_right) = (title, fn, align_right)
2099
2100
2101 def _GetColFormatString(width, align_right):
2102 """Returns the format string for a field.
2103
2104 """
2105 if align_right:
2106 sign = ""
2107 else:
2108 sign = "-"
2109
2110 return "%%%s%ss" % (sign, width)
2111
2112
def FormatTable(rows, columns, header, separator):
  """Formats data as a table.

  With a separator the formatted cells of each row are simply joined;
  without one the columns are padded to a common width.

  @type rows: list of lists
  @param rows: Row data, one list per row
  @type columns: list of L{TableColumn}
  @param columns: Column descriptions
  @type header: bool
  @param header: Whether to show header row
  @type separator: string or None
  @param separator: String used to separate columns
  @rtype: list of strings
  @return: one string per output line (header first, if requested)

  """
  lines = []
  widths = [0 for _ in columns]
  if header:
    titles = [col.title for col in columns]
    lines.append(titles)
    widths = [len(title) for title in titles]

  for row in rows:
    assert len(row) == len(columns)

    cells = [col.format(value) for (value, col) in zip(row, columns)]

    if separator is None:
      # Widths only matter for the padded layout
      widths = [max(width, len(cell))
                for (width, cell) in zip(widths, cells)]

    lines.append(cells)

  if separator is not None:
    return [separator.join(cells) for cells in lines]

  if columns and not columns[-1].align_right:
    # Avoid unnecessary spaces at end of line
    widths[-1] = 0

  fmt = " ".join([_GetColFormatString(width, col.align_right)
                  for (col, width) in zip(columns, widths)])

  return [fmt % tuple(cells) for cells in lines]
2160
2161
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds
  @rtype: string
  @return: the timestamp formatted by L{utils.FormatTime}, or C{"?"} if
    C{ts} is not a two-element tuple or list

  """
  if isinstance(ts, (tuple, list)) and len(ts) == 2:
    (seconds, microseconds) = ts
    return utils.FormatTime(seconds, usecs=microseconds)

  return "?"
2177
2178
def ParseTimespec(value):
  """Parse a time specification into a number of seconds.

  The following suffixes will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: day
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @param value: the time specification; converted with C{str} first
  @rtype: int
  @return: the number of seconds
  @raise errors.OpPrereqError: if the specification is empty, consists of
    a suffix only, or its numeric part is not an integer

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed",
                               errors.ECODE_INVAL)

  suffix_map = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    }

  suffix = value[-1]
  if suffix in suffix_map:
    multiplier = suffix_map[suffix]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)", errors.ECODE_INVAL)
  else:
    multiplier = 1

  try:
    return int(value) * multiplier
  except (TypeError, ValueError):
    raise errors.OpPrereqError("Invalid time specification '%s'" % value,
                               errors.ECODE_INVAL)
2222
2223
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False, nodegroup=None):
  """Returns the names of online nodes.

  Unless C{nowarn} is set, this function also writes a note to stderr
  naming the offline nodes that are being skipped.

  @param nodes: if not empty, use only this subset of nodes (minus the
      offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
      offline nodes that are skipped; if this parameter is True the
      note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
      names, useful for doing network traffic over the replication interface
      (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
      (useful in coordination with secondary_ips where we cannot check our
      node name against the list)
  @type nodegroup: string
  @param nodegroup: If set, only return nodes in this node group (matched
      against both the group name and the group UUID)
  @rtype: list
  @return: names, or secondary IPs, of the nodes not marked offline

  """
  if cl is None:
    cl = GetClient()

  conditions = []

  if nodes:
    conditions.append(qlang.MakeSimpleFilter("name", nodes))

  if nodegroup is not None:
    conditions.append([qlang.OP_OR,
                       [qlang.OP_EQUAL, "group", nodegroup],
                       [qlang.OP_EQUAL, "group.uuid", nodegroup]])

  if filter_master:
    conditions.append([qlang.OP_NOT, [qlang.OP_TRUE, "master"]])

  # Combine the conditions: none -> no filter, one -> used as is,
  # several -> all of them have to match
  if not conditions:
    final_filter = None
  elif len(conditions) == 1:
    final_filter = conditions[0]
  else:
    final_filter = [qlang.OP_AND] + conditions

  result = cl.Query(constants.QR_NODE, ["name", "offline", "sip"], final_filter)

  # Each row holds one two-element entry per requested field, of which the
  # second element is the value
  def _OfflineOf(row):
    (_, (_, is_offline), _) = row
    return is_offline

  def _NameOf(row):
    ((_, node_name), _, _) = row
    return node_name

  def _SipOf(row):
    (_, _, (_, secondary_ip)) = row
    return secondary_ip

  (offline, online) = compat.partition(result.data, _OfflineOf)

  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" %
             utils.CommaJoin([_NameOf(row) for row in offline]))

  extract = _SipOf if secondary_ips else _NameOf

  return [extract(row) for row in online]
2300
2301
def GetNodesSshPorts(nodes, cl):
  """Retrieves SSH ports of given nodes.

  Queries the C{ndp/ssh_port} field without locking.

  @param nodes: the names of nodes
  @type nodes: a list of strings
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the SSH ports, one per row returned by the query
  @rtype: list

  """
  rows = cl.QueryNodes(names=nodes,
                       fields=["ndp/ssh_port"],
                       use_locking=False)
  # Only one field was requested, hence it is the first entry of each row
  return [row[0] for row in rows]
2317
2318
def GetNodeUUIDs(nodes, cl):
  """Retrieves the UUIDs of given nodes.

  Queries the C{uuid} field without locking.

  @param nodes: the names of nodes
  @type nodes: a list of string
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the UUIDs, one per row returned by the query
  @rtype: list

  """
  rows = cl.QueryNodes(names=nodes,
                       fields=["uuid"],
                       use_locking=False)
  # Only one field was requested, hence it is the first entry of each row
  return [row[0] for row in rows]
2334
2335
2336 def _ToStream(stream, txt, *args):
2337 """Write a message to a stream, bypassing the logging system
2338
2339 @type stream: file object
2340 @param stream: the file to which we should write
2341 @type txt: str
2342 @param txt: the message
2343
2344 """
2345 try:
2346 if args:
2347 args = tuple(args)
2348 stream.write(txt % args)
2349 else:
2350 stream.write(txt)
2351 stream.write("\n")
2352 stream.flush()
2353 except IOError, err:
2354 if err.errno == errno.EPIPE:
2355 # our terminal went away, we'll exit
2356 sys.exit(constants.EXIT_FAILURE)
2357 else:
2358 raise
2359
2360
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  Thin wrapper passing C{sys.stdout} to L{_ToStream}.

  @type txt: str
  @param txt: the message
  @param args: optional values to format into C{txt}

  """
  _ToStream(sys.stdout, txt, *args)
2371
2372
def ToStdoutAndLoginfo(txt, *args):
  """Write a message to stdout and additionally log it at INFO level

  @type txt: str
  @param txt: the message
  @param args: optional values to format into C{txt}

  """
  ToStdout(txt, *args)
  logging.info(txt, *args)
2377
2378
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  Thin wrapper passing C{sys.stderr} to L{_ToStream}.

  @type txt: str
  @param txt: the message
  @param args: optional values to format into C{txt}

  """
  _ToStream(sys.stderr, txt, *args)
2389
2390
class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Jobs are first recorded with L{QueueJob} (or registered by ID with
  L{AddJobId}), then submitted and waited for via L{GetResults} or
  L{WaitOrShow}.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    """Initializes this class.

    @param cl: client to use; if None, one is obtained via L{GetClient}
    @param verbose: whether L{GetResults} announces the submitted job IDs
    @param opts: options applied to queued opcodes via
      L{SetGenericOpcodeOpts}
    @param feedback_fn: passed through to L{PollJob}

    """
    # Entries: (index, name, ops) for jobs not yet submitted
    self.queue = []
    # Entries: (index, submit status, job ID or error, name)
    self.jobs = []
    self.cl = GetClient() if cl is None else cl
    self.verbose = verbose
    self.opts = opts
    self.feedback_fn = feedback_fn
    # Source of the indices used to restore submission order in the results
    self._counter = itertools.count()

  @staticmethod
  def _IfName(name, fmt):
    """Helper function for formatting name.

    @return: C{fmt % name} if the name is set, an empty string otherwise

    """
    return fmt % name if name else ""

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet
    @param ops: the opcodes making up the job

    """
    SetGenericOpcodeOpts(ops, self.opts)
    idx = next(self._counter)
    self.queue.append((idx, name, ops))

  def AddJobId(self, name, status, job_id):
    """Adds a job ID to the internal queue.

    """
    idx = next(self._counter)
    self.jobs.append((idx, status, job_id, name))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    @type each: boolean
    @param each: if True, submit the jobs one by one instead of with a
      single call

    """
    pending_ops = [ops for (_, _, ops) in self.queue]

    if each:
      # SubmitJob will remove the success status, but raise an exception if
      # the submission fails, so we'll notice that anyway.
      results = [[True, self.cl.SubmitJob(ops)[0]] for ops in pending_ops]
    else:
      results = self.cl.SubmitManyJobs(pending_ops)

    for ((status, data), (idx, name, _)) in zip(results, self.queue):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    The chosen entry is removed from C{self.jobs}.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    batch_ids = [job[2] for job in self.jobs[:_CHOOSE_BATCH]]
    result = self.cl.QueryJobs(batch_ids, ["status"])
    assert result

    not_started = (constants.JOB_STATUS_QUEUED,
                   constants.JOB_STATUS_WAITING,
                   constants.JOB_STATUS_CANCELING)

    for (job_data, status) in zip(self.jobs, result):
      if isinstance(status, list) and status and status[0] in not_started:
        # job is still present and waiting
        continue
      # good candidate found (either running job or lost job)
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()

    if self.verbose:
      submitted = [job_id for (_, status, job_id, _) in self.jobs if status]
      if submitted:
        ToStdout("Submitted jobs %s", utils.CommaJoin(submitted))

    results = []

    # first, remove any non-submitted jobs
    (self.jobs, failures) = compat.partition(self.jobs, lambda job: job[1])
    for (idx, _, jid, name) in failures:
      ToStderr("Failed to submit job%s: %s", self._IfName(name, " for %s"), jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      for_name = self._IfName(name, " for %s")
      ToStdout("Waiting for job %s%s ...", jid, for_name)
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except errors.JobLost as err:
        (_, job_result) = FormatError(err)
        ToStderr("Job %s%s has been archived, cannot check its result",
                 jid, for_name)
        success = False
      except (errors.GenericError, rpcerr.ProtocolError) as err:
        (_, job_result) = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s%s has failed: %s", jid, for_name, job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    return [entry[1:] for entry in sorted(results)]

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not
    @return: the value of L{GetResults} when waiting; otherwise the
      (status, job ID or error) part of every job entry

    """
    if wait:
      return self.GetResults()

    if not self.jobs:
      self.SubmitPending()

    for (_, status, result, name) in self.jobs:
      if status:
        ToStdout("%s: %s", result, name)
      else:
        ToStderr("Failure for %s: %s", name, result)

    return [row[1:3] for row in self.jobs]
2541
2542
def FormatParamsDictInfo(param_dict, actual, roman=False):
  """Formats a parameter dictionary.

  A parameter missing from C{param_dict} is shown as C{default (...)} with
  the value taken from C{actual}. Non-empty dictionaries are formatted
  recursively.

  @type param_dict: dict
  @param param_dict: the own parameters
  @type actual: dict
  @param actual: the current parameter set (including defaults)
  @type roman: bool
  @param roman: passed on to L{compat.TryToRoman} for every value
  @rtype: dict
  @return: dictionary where the value of each parameter is either a fully
      formatted string or a dictionary containing formatted strings

  """
  formatted = {}
  for (key, data) in actual.items():
    if isinstance(data, dict) and data:
      own_sub = param_dict.get(key, {})
      formatted[key] = FormatParamsDictInfo(own_sub, data, roman)
      continue

    default_str = "default (%s)" % compat.TryToRoman(data, roman)
    own_value = param_dict.get(key, default_str)
    formatted[key] = str(compat.TryToRoman(own_value, roman))

  return formatted
2563
2564
def _FormatListInfoDefault(data, def_data):
  """Format a list value, falling back to a marked default.

  @param data: the own list, or None if not set
  @param def_data: the list to show as default when C{data} is None
  @rtype: string
  @return: the comma-joined list, wrapped in C{default (...)} when the
    default is used

  """
  if data is None:
    return "default (%s)" % utils.CommaJoin(def_data)

  return utils.CommaJoin(data)
2571
2572
def FormatPolicyInfo(custom_ipolicy, eff_ipolicy, iscluster, roman=False):
  """Formats an instance policy.

  @type custom_ipolicy: dict
  @param custom_ipolicy: own policy
  @type eff_ipolicy: dict
  @param eff_ipolicy: effective policy (including defaults); ignored for
      cluster
  @type iscluster: bool
  @param iscluster: the policy is at cluster level
  @type roman: bool
  @param roman: whether to print the values in roman numerals
  @rtype: list of pairs
  @return: formatted data, suitable for L{PrintGenericInfo}

  """
  if iscluster:
    eff_ipolicy = custom_ipolicy

  def _FormatMinMax(idx, minmax, is_own):
    """Format one min/max pair; own values are not marked as defaults."""
    pairs = []
    for key in constants.ISPECS_MINMAX_KEYS:
      spec = minmax[key]
      if is_own:
        own = spec
      else:
        own = {}
      pairs.append(("%s/%s" % (key, idx),
                    FormatParamsDictInfo(own, spec, roman)))
    return pairs

  custom_minmax = custom_ipolicy.get(constants.ISPECS_MINMAX)
  if custom_minmax:
    minmax_out = [_FormatMinMax(idx, minmax, True)
                  for (idx, minmax) in enumerate(custom_minmax)]
  else:
    minmax_out = [_FormatMinMax(idx, minmax, False)
                  for (idx, minmax)
                  in enumerate(eff_ipolicy[constants.ISPECS_MINMAX])]

  ret = [("bounds specs", minmax_out)]

  if iscluster:
    stdspecs = custom_ipolicy[constants.ISPECS_STD]
    std_info = FormatParamsDictInfo(stdspecs, stdspecs, roman)
    ret.append((constants.ISPECS_STD, std_info))

  templates_info = \
    _FormatListInfoDefault(custom_ipolicy.get(constants.IPOLICY_DTS),
                           eff_ipolicy[constants.IPOLICY_DTS])
  ret.append(("allowed disk templates", templates_info))

  for key in constants.IPOLICY_PARAMETERS:
    value = custom_ipolicy.get(key, "default (%s)" % eff_ipolicy[key])
    ret.append((key, str(compat.TryToRoman(value, roman))))

  return ret
2630
2631
2632 def _PrintSpecsParameters(buf, specs):
2633 values = ("%s=%s" % (par, val) for (par, val) in sorted(specs.items()))
2634 buf.write(",".join(values))
2635
2636
def PrintIPolicyCommand(buf, ipolicy, isgroup):
  """Print the command option used to generate the given instance policy.

  Currently only the parts dealing with specs are supported.

  @type buf: StringIO
  @param buf: stream to write into
  @type ipolicy: dict
  @param ipolicy: instance policy
  @type isgroup: bool
  @param isgroup: whether the policy is at group level; the standard specs
    are only written when this is false

  """
  if not isgroup:
    stdspecs = ipolicy.get("std")
    if stdspecs:
      buf.write(" %s " % IPOLICY_STD_SPECS_STR)
      _PrintSpecsParameters(buf, stdspecs)

  written = 0
  for minmax in ipolicy.get("minmax", []):
    minspecs = minmax.get("min")
    maxspecs = minmax.get("max")
    if not (minspecs and maxspecs):
      # Incomplete pairs are left out
      continue

    if written:
      # Separator between consecutive min/max pairs
      buf.write("//")
    else:
      buf.write(" %s " % IPOLICY_BOUNDS_SPECS_STR)
    written += 1

    buf.write("min:")
    _PrintSpecsParameters(buf, minspecs)
    buf.write("/max:")
    _PrintSpecsParameters(buf, maxspecs)
2670
2671
def ConfirmOperation(names, list_type, text, extra=""):
  """Ask the user to confirm an operation on a list of list_type.

  If more than 20 names are affected, the list is not shown at first;
  instead a "view" choice is offered which displays it and asks again.

  @type names: list
  @param names: the list of names that we display when
      we ask for confirmation
  @type list_type: str
  @param list_type: Human readable name for elements in the list (e.g. nodes)
  @type text: str
  @param text: the operation that the user should confirm
  @type extra: str
  @param extra: text inserted into the question right before
      "Do you want to continue?"
  @return: the answer returned by L{AskUser}; the choices offered map
      "y" to True and "n" to False

  """
  count = len(names)
  name_lines = [" %s" % name for name in names]
  affected = ("\nAffected %s:\n" % list_type) + "\n".join(name_lines)
  msg = ("The %s will operate on %d %s.\n%s"
         "Do you want to continue?" % (text, count, list_type, extra))

  choices = [
    ("y", True, "Yes, execute the %s" % text),
    ("n", False, "No, abort the %s" % text),
    ]

  if count > 20:
    choices.insert(1, ("v", "v", "View the list of affected %s" % list_type))
    question = msg
  else:
    question = msg + affected

  choice = AskUser(question, choices)
  if choice != "v":
    return choice

  # Show the full list and ask again, without the "view" choice
  choices.pop(1)
  return AskUser(msg + affected, choices)
2709
2710
2711 def _MaybeParseUnit(elements):
2712 """Parses and returns an array of potential values with units.
2713
2714 """
2715 parsed = {}
2716 for k, v in elements.items():
2717 if v == constants.VALUE_DEFAULT:
2718 parsed[k] = v
2719 else:
2720 parsed[k] = utils.ParseUnit(v)
2721 return parsed
2722
2723
def _InitISpecsFromSplitOpts(ipolicy, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all):
  """Fill the specs of an instance policy from per-parameter options.

  Every C{ispecs_*} argument is a dictionary keyed by min/max/std; these
  are transposed into one dictionary per min/max/std keyed by parameter
  and stored in C{ipolicy}.

  @type ipolicy: dict
  @param ipolicy: the policy to update in place
  @type group_ipolicy: bool
  @param group_ipolicy: selects the group instead of the cluster type
      checks for the given values
  @type fill_all: bool
  @param fill_all: whether to fill missing values from the defaults
  @raise errors.OpPrereqError: if a memory or disk size cannot be parsed

  """
  try:
    if ispecs_mem_size:
      ispecs_mem_size = _MaybeParseUnit(ispecs_mem_size)
    if ispecs_disk_size:
      ispecs_disk_size = _MaybeParseUnit(ispecs_disk_size)
  except (TypeError, ValueError, errors.UnitParseError) as err:
    raise errors.OpPrereqError("Invalid disk (%s) or memory (%s) size"
                               " in policy: %s" %
                               (ispecs_disk_size, ispecs_mem_size, err),
                               errors.ECODE_INVAL)

  # prepare ipolicy dict
  ispecs_transposed = {
    constants.ISPEC_MEM_SIZE: ispecs_mem_size,
    constants.ISPEC_CPU_COUNT: ispecs_cpu_count,
    constants.ISPEC_DISK_COUNT: ispecs_disk_count,
    constants.ISPEC_DISK_SIZE: ispecs_disk_size,
    constants.ISPEC_NIC_COUNT: ispecs_nic_count,
    }

  # first, check that the values given are correct
  forced_type = TISPECS_GROUP_TYPES if group_ipolicy else TISPECS_CLUSTER_TYPES
  for specs in ispecs_transposed.values():
    assert type(specs) is dict
    utils.ForceDictType(specs, forced_type)

  # then transpose
  ispecs = {
    constants.ISPECS_MIN: {},
    constants.ISPECS_MAX: {},
    constants.ISPECS_STD: {},
    }
  for (name, specs) in ispecs_transposed.items():
    assert name in constants.ISPECS_PARAMETERS
    for (key, val) in specs.items(): # {min: .. ,max: .., std: ..}
      assert key in ispecs
      ispecs[key][name] = val

  minmax_out = {}
  for key in constants.ISPECS_MINMAX_KEYS:
    bounds = ispecs[key]
    if fill_all:
      bounds = objects.FillDict(constants.ISPECS_MINMAX_DEFAULTS[key], bounds)
    minmax_out[key] = bounds
  ipolicy[constants.ISPECS_MINMAX] = [minmax_out]

  std_out = ispecs[constants.ISPECS_STD]
  if fill_all:
    std_out = \
      objects.FillDict(constants.IPOLICY_DEFAULTS[constants.ISPECS_STD],
                       std_out)
  ipolicy[constants.ISPECS_STD] = std_out
2781
2782
def _ParseSpecUnit(spec, keyname):
  """Parse the disk and memory sizes of an instance spec.

  @type spec: dict
  @param spec: the instance spec; not modified
  @param keyname: name of the spec, used in the error message
  @rtype: dict
  @return: copy of C{spec} with the disk and memory size, if present,
      passed through L{utils.ParseUnit}
  @raise errors.OpPrereqError: if a size cannot be parsed

  """
  ret = spec.copy()
  for k in [constants.ISPEC_DISK_SIZE, constants.ISPEC_MEM_SIZE]:
    if k not in ret:
      continue
    try:
      ret[k] = utils.ParseUnit(ret[k])
    except (TypeError, ValueError, errors.UnitParseError) as err:
      # ret[k] still holds the unparsed value at this point
      raise errors.OpPrereqError(("Invalid parameter %s (%s) in %s instance"
                                  " specs: %s" % (k, ret[k], keyname, err)),
                                 errors.ECODE_INVAL)
  return ret
2794
2795
def _ParseISpec(spec, keyname, required):
  """Parse and type-check one instance spec.

  @type spec: dict
  @param spec: the instance spec; not modified
  @param keyname: name of the spec, used in error messages
  @type required: bool
  @param required: whether all of L{constants.ISPECS_PARAMETERS} have to
      be present
  @rtype: dict
  @return: the parsed spec
  @raise errors.OpPrereqError: if parameters are missing although required

  """
  ret = _ParseSpecUnit(spec, keyname)
  utils.ForceDictType(ret, constants.ISPECS_PARAMETER_TYPES)

  if required:
    missing = constants.ISPECS_PARAMETERS - frozenset(ret.keys())
    if missing:
      raise errors.OpPrereqError("Missing parameters in ipolicy spec %s: %s" %
                                 (keyname, utils.CommaJoin(missing)),
                                 errors.ECODE_INVAL)

  return ret
2805
2806
2807 def _GetISpecsInAllowedValues(minmax_ispecs, allowed_values):
2808 ret = None
2809 if (minmax_ispecs and allowed_values and len(minmax_ispecs) == 1 and
2810 len(minmax_ispecs[0]) == 1):
2811 for (key, spec) in minmax_ispecs[0].items():
2812 # This loop is executed exactly once
2813 if key in allowed_values and not spec:
2814 ret = key
2815 return ret
2816
2817
def _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values):
  """Fill the specs of an instance policy from complete spec options.

  @type ipolicy_out: dict
  @param ipolicy_out: the policy to update in place
  @param minmax_ispecs: list of dictionaries of min/max specs, a special
      allowed value encoded as such (see L{_GetISpecsInAllowedValues}), or
      None
  @param std_ispecs: the standard specs, or None
  @type group_ipolicy: bool
  @param group_ipolicy: whether the policy is at group level; standard
      specs must not be given in that case
  @param allowed_values: the special values accepted for the min/max specs
  @raise errors.OpPrereqError: if a min/max dictionary has an invalid key

  """
  def _ParseMinMaxPair(mmpair):
    """Validate the keys of one min/max pair and parse its specs."""
    parsed = {}
    for (key, spec) in mmpair.items():
      if key not in constants.ISPECS_MINMAX_KEYS:
        msg = "Invalid key in bounds instance specifications: %s" % key
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
      parsed[key] = _ParseISpec(spec, key, True)
    return parsed

  found_allowed = _GetISpecsInAllowedValues(minmax_ispecs, allowed_values)
  if found_allowed is not None:
    ipolicy_out[constants.ISPECS_MINMAX] = found_allowed
  elif minmax_ispecs is not None:
    minmax_out = []
    for mmpair in minmax_ispecs:
      minmax_out.append(_ParseMinMaxPair(mmpair))
    ipolicy_out[constants.ISPECS_MINMAX] = minmax_out

  if std_ispecs is not None:
    assert not group_ipolicy # This is not an option for gnt-group
    ipolicy_out[constants.ISPECS_STD] = _ParseISpec(std_ispecs, "std", False)
2837
2838
def CreateIPolicyFromOpts(ispecs_mem_size=None,
                          ispecs_cpu_count=None,
                          ispecs_disk_count=None,
                          ispecs_disk_size=None,
                          ispecs_nic_count=None,
                          minmax_ispecs=None,
                          std_ispecs=None,
                          ipolicy_disk_templates=None,
                          ipolicy_vcpu_ratio=None,
                          ipolicy_spindle_ratio=None,
                          group_ipolicy=False,
                          allowed_values=None,
                          fill_all=False):
  """Creation of instance policy based on command line options.

  The specs can be given either split by parameter (the C{ispecs_*}
  arguments) or as complete specs (C{minmax_ispecs} and C{std_ispecs}),
  but not both.

  @param group_ipolicy: whether the policy is at group level
  @param allowed_values: special values accepted in place of the min/max
      specs and of the disk templates
  @param fill_all: whether for cluster policies we should ensure that
    all values are filled
  @return: the instance policy
  @raise errors.OpPrereqError: if both kinds of specs options are given

  """
  assert not (fill_all and allowed_values)

  split_specs = any([ispecs_mem_size, ispecs_cpu_count, ispecs_disk_count,
                     ispecs_disk_size, ispecs_nic_count])
  full_specs = (minmax_ispecs is not None or std_ispecs is not None)
  if split_specs and full_specs:
    raise errors.OpPrereqError("A --specs-xxx option cannot be specified"
                               " together with any --ipolicy-xxx-specs option",
                               errors.ECODE_INVAL)

  ipolicy_out = objects.MakeEmptyIPolicy()
  if split_specs:
    assert fill_all
    _InitISpecsFromSplitOpts(ipolicy_out, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all)
  elif full_specs:
    _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values)

  if ipolicy_disk_templates is not None:
    if not (allowed_values and ipolicy_disk_templates in allowed_values):
      # Not one of the special values, hence a collection of templates
      ipolicy_disk_templates = list(ipolicy_disk_templates)
    ipolicy_out[constants.IPOLICY_DTS] = ipolicy_disk_templates

  ratios = [
    (constants.IPOLICY_VCPU_RATIO, ipolicy_vcpu_ratio),
    (constants.IPOLICY_SPINDLE_RATIO, ipolicy_spindle_ratio),
    ]
  for (key, ratio) in ratios:
    if ratio is not None:
      ipolicy_out[key] = ratio

  assert not (frozenset(ipolicy_out.keys()) - constants.IPOLICY_ALL_KEYS)

  if fill_all and not group_ipolicy:
    ipolicy_out = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_out)

  return ipolicy_out
2893
2894
2895 def _NotAContainer(data):
2896 """ Checks whether the input is not a container data type.
2897
2898 @rtype: bool
2899
2900 """
2901 return not (isinstance(data, (list, dict, tuple)))
2902
2903
2904 def _GetAlignmentMapping(data):
2905 """ Returns info about alignment if present in an encoded ordered dictionary.
2906
2907 @type data: list of tuple
2908 @param data: The encoded ordered dictionary, as defined in
2909 L{_SerializeGenericInfo}.
2910 @rtype: dict of any to int
2911 @return: The dictionary mapping alignment groups to the maximum length of the
2912 dictionary key found in the group.
2913
2914 """
2915 alignment_map = {}
2916 for entry in data:
2917 if len(entry) > 2:
2918 group_key = entry[2]
2919 key_length = len(entry[0])
2920 if group_key in alignment_map:
2921 alignment_map[group_key] = max(alignment_map[group_key], key_length)
2922 else:
2923 alignment_map[group_key] = key_length
2924
2925 return alignment_map
2926
2927
2928 def _SerializeGenericInfo(buf, data, level, afterkey=False):
2929 """Formatting core of L{PrintGenericInfo}.
2930
2931 @param buf: (string) stream to accumulate the result into
2932 @param data: data to format
2933 @type level: int
2934 @param level: depth in the data hierarchy, used for indenting
2935 @type afterkey: bool
2936 @param afterkey: True when we are in the middle of a line after a key (used
2937 to properly add newlines or indentation)
2938
2939 """
2940 baseind = " "
2941 if isinstance(data, dict):
2942 if not data:
2943 buf.write("\n")
2944 else:
2945 if afterkey:
2946 buf.write("\n")
2947 doindent = True
2948 else:
2949 doindent = False
2950 for key in sorted(data):
2951 if doindent:
2952 buf.write(baseind * level)
2953 else:
2954 doindent = True
2955 buf.write(key)
2956 buf.write(": ")
2957 _SerializeGenericInfo(buf, data[key], level + 1, afterkey=True)
2958 elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], tuple):
2959 # list of tuples (an ordered dictionary)
2960 # the tuples may have two or three members - key, value, and alignment group
2961 # if the alignment group is present, align all values sharing the same group
2962 if afterkey:
2963 buf.write("\n")
2964 doindent = True
2965 else:
2966 doindent = False
2967
2968 alignment_mapping = _GetAlignmentMapping(data)
2969 for entry in data:
2970 key, val = entry[0:2]
2971 if doindent:
2972 buf.write(baseind * level)
2973 else:
2974 doindent = True
2975 buf.write(key)
2976 buf.write(": ")
2977 if len(entry) > 2:
2978 max_key_length = alignment_mapping[entry[2]]
2979 buf.write(" " * (max_key_length - len(key)))
2980 _SerializeGenericInfo(buf, val, level + 1, afterkey=True)
2981 elif isinstance(data, tuple) and all(map(_NotAContainer, data)):
2982 # tuples with simple content are serialized as inline lists
2983 buf.write("[%s]\n" % utils.CommaJoin(data))
2984 elif isinstance(data, list) or isinstance(data, tuple):
2985 # lists and tuples
2986 if not data:
2987 buf.write("\n")
2988 else:
2989 if afterkey:
2990 buf.write("\n")
2991 doindent = True
2992 else:
2993 doindent = False
2994 for item in data:
2995 if doindent:
2996 buf.write(baseind * level)
2997 else:
2998 doindent = True
2999 buf.write("-")
3000 buf.write(baseind[1:])
3001 _SerializeGenericInfo(buf, item, level + 1)
3002 else:
3003 # This branch should be only taken for strings, but it's practically
3004 # impossible to guarantee that no other types are produced somewhere
3005 buf.write(str(data))
3006 buf.write("\n")
3007
3008
def PrintGenericInfo(data):
  """Print information formatted according to the hierarchy.

  The output is meant to be a valid YAML string; it is built by
  L{_SerializeGenericInfo} and written to stdout without its trailing
  newlines.

  @param data: the data to print. It's a hierarchical structure whose elements
      can be:
        - dictionaries, where keys are strings and values are of any of the
          types listed here
        - lists of tuples (key, value) or (key, value, alignment_group), where
          key is a string, value is of any of the types listed here, and
          alignment_group can be any hashable value; it's a way to encode
          ordered dictionaries; any entries sharing the same alignment group are
          aligned by appending whitespace before the value as needed
        - lists of any of the types listed here
        - strings

  """
  buf = StringIO()
  _SerializeGenericInfo(buf, data, 0)
  serialized = buf.getvalue()
  ToStdout(serialized.rstrip("\n"))