Catch IOError of SSH files when removing node
[ganeti-github.git] / lib / cli.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module dealing with command line parsing"""
32
33
34 import sys
35 import textwrap
36 import os.path
37 import time
38 import logging
39 import errno
40 import itertools
41 import shlex
42 from cStringIO import StringIO
43
44 from ganeti import utils
45 from ganeti import errors
46 from ganeti import constants
47 from ganeti import opcodes
48 import ganeti.rpc.errors as rpcerr
49 import ganeti.rpc.node as rpc
50 from ganeti import ssh
51 from ganeti import compat
52 from ganeti import netutils
53 from ganeti import qlang
54 from ganeti import objects
55 from ganeti import pathutils
56 from ganeti import serializer
57 import ganeti.cli_opts
58 # Import constants
59 from ganeti.cli_opts import * # pylint: disable=W0401
60
61 from ganeti.runtime import (GetClient)
62
63 from optparse import (OptionParser, TitledHelpFormatter)
64
65
# Public API of this module; the command line options defined in
# ganeti.cli_opts are re-exported as well (see the list concatenation below).
__all__ = [
  # Generic functions for CLI programs
  "ConfirmOperation",
  "CreateIPolicyFromOpts",
  "GenericMain",
  "GenericInstanceCreate",
  "GenericList",
  "GenericListFields",
  "GetClient",
  "GetOnlineNodes",
  "GetNodesSshPorts",
  "GetNodeUUIDs",
  "JobExecutor",
  "JobSubmittedException",
  "ParseTimespec",
  "RunWhileClusterStopped",
  "RunWhileDaemonsStopped",
  "SubmitOpCode",
  "SubmitOpCodeToDrainedQueue",
  "SubmitOrSend",
  "UsesRPC",
  # Formatting functions
  "ToStderr", "ToStdout",
  "FormatError",
  "FormatQueryResult",
  "FormatParamsDictInfo",
  "FormatPolicyInfo",
  "PrintIPolicyCommand",
  "PrintGenericInfo",
  "GenerateTable",
  "AskUser",
  "FormatTimestamp",
  "FormatLogMessage",
  # Tags functions
  "ListTags",
  "AddTags",
  "RemoveTags",
  # command line options support infrastructure
  "ARGS_MANY_INSTANCES",
  "ARGS_MANY_NODES",
  "ARGS_MANY_GROUPS",
  "ARGS_MANY_NETWORKS",
  "ARGS_MANY_FILTERS",
  "ARGS_NONE",
  "ARGS_ONE_INSTANCE",
  "ARGS_ONE_NODE",
  "ARGS_ONE_GROUP",
  "ARGS_ONE_OS",
  "ARGS_ONE_NETWORK",
  "ARGS_ONE_FILTER",
  "ArgChoice",
  "ArgCommand",
  "ArgFile",
  "ArgGroup",
  "ArgHost",
  "ArgInstance",
  "ArgJobId",
  "ArgNetwork",
  "ArgNode",
  "ArgOs",
  "ArgExtStorage",
  "ArgFilter",
  "ArgSuggest",
  "ArgUnknown",
  "FixHvParams",
  "SplitNodeOption",
  "CalculateOSNames",
  "ParseFields",
  ] + ganeti.cli_opts.__all__ # Command line options

# Query result status for clients
(QR_NORMAL,
 QR_UNKNOWN,
 QR_INCOMPLETE) = range(3)

#: Maximum batch size for ChooseJob
_CHOOSE_BATCH = 25


# constants used to create InstancePolicy dictionary
TISPECS_GROUP_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  }

TISPECS_CLUSTER_TYPES = {
  constants.ISPECS_MIN: constants.VTYPE_INT,
  constants.ISPECS_MAX: constants.VTYPE_INT,
  # the cluster-level policy additionally carries a "std" (default) value
  constants.ISPECS_STD: constants.VTYPE_INT,
  }

#: User-friendly names for query2 field types
_QFT_NAMES = {
  constants.QFT_UNKNOWN: "Unknown",
  constants.QFT_TEXT: "Text",
  constants.QFT_BOOL: "Boolean",
  constants.QFT_NUMBER: "Number",
  constants.QFT_NUMBER_FLOAT: "Floating-point number",
  constants.QFT_UNIT: "Storage size",
  constants.QFT_TIMESTAMP: "Timestamp",
  constants.QFT_OTHER: "Custom",
  }
168
169
170 class _Argument(object):
171 def __init__(self, min=0, max=None): # pylint: disable=W0622
172 self.min = min
173 self.max = max
174
175 def __repr__(self):
176 return ("<%s min=%s max=%s>" %
177 (self.__class__.__name__, self.min, self.max))
178
179
class ArgSuggest(_Argument):
  """Suggesting argument.

  Value can be any of the ones passed to the constructor.

  """
  # pylint: disable=W0622
  def __init__(self, min=0, max=None, choices=None):
    _Argument.__init__(self, min=min, max=max)
    self.choices = choices

  def __repr__(self):
    return "<%s min=%s max=%s choices=%r>" % (type(self).__name__, self.min,
                                              self.max, self.choices)
194
195
class ArgChoice(ArgSuggest):
  """Choice argument.

  Value can be any of the ones passed to the constructor. Like L{ArgSuggest},
  but value must be one of the choices.

  """
203
204
class ArgUnknown(_Argument):
  """Unknown argument to program (e.g. determined at runtime).

  """
209
210
class ArgInstance(_Argument):
  """Instances argument (argument naming one or more instances).

  """
215
216
class ArgNode(_Argument):
  """Node argument (argument naming one or more nodes).

  """
221
222
class ArgNetwork(_Argument):
  """Network argument (argument naming one or more networks).

  """
227
228
class ArgGroup(_Argument):
  """Node group argument (argument naming one or more node groups).

  """
233
234
class ArgJobId(_Argument):
  """Job ID argument.

  """
239
240
class ArgFile(_Argument):
  """File path argument.

  """
245
246
class ArgCommand(_Argument):
  """Command argument.

  """
251
252
class ArgHost(_Argument):
  """Host (name) argument.

  """
257
258
class ArgOs(_Argument):
  """OS (name) argument.

  """
263
264
class ArgExtStorage(_Argument):
  """ExtStorage (provider name) argument.

  """
269
270
class ArgFilter(_Argument):
  """Filter UUID argument.

  """
275
276
# Ready-made argument-specification lists for the common command signatures:
# "no arguments", "any number of X" and "exactly one X"
ARGS_NONE = []
ARGS_MANY_INSTANCES = [ArgInstance()]
ARGS_MANY_NETWORKS = [ArgNetwork()]
ARGS_MANY_NODES = [ArgNode()]
ARGS_MANY_GROUPS = [ArgGroup()]
ARGS_MANY_FILTERS = [ArgFilter()]
ARGS_ONE_INSTANCE = [ArgInstance(min=1, max=1)]
ARGS_ONE_NETWORK = [ArgNetwork(min=1, max=1)]
ARGS_ONE_NODE = [ArgNode(min=1, max=1)]
ARGS_ONE_GROUP = [ArgGroup(min=1, max=1)]
ARGS_ONE_OS = [ArgOs(min=1, max=1)]
ARGS_ONE_FILTER = [ArgFilter(min=1, max=1)]
289
290
def _ExtractTagsObject(opts, args):
  """Extract the tag type object.

  Note that this function will modify its args parameter.

  @return: a (kind, name) tuple; the name is popped from C{args} for
      per-object tag kinds and is the empty string for cluster tags

  """
  if not hasattr(opts, "tag_type"):
    raise errors.ProgrammerError("tag_type not passed to _ExtractTagsObject")

  kind = opts.tag_type

  if kind == constants.TAG_CLUSTER:
    # the cluster is a singleton, no name needed
    return kind, ""

  if kind in (constants.TAG_NODEGROUP,
              constants.TAG_NODE,
              constants.TAG_NETWORK,
              constants.TAG_INSTANCE):
    if not args:
      raise errors.OpPrereqError("no arguments passed to the command",
                                 errors.ECODE_INVAL)
    # consume the object name from the argument list
    return kind, args.pop(0)

  raise errors.ProgrammerError("Unhandled tag type '%s'" % kind)
314
315
316 def _ExtendTags(opts, args):
317 """Extend the args if a source file has been given.
318
319 This function will extend the tags with the contents of the file
320 passed in the 'tags_source' attribute of the opts parameter. A file
321 named '-' will be replaced by stdin.
322
323 """
324 fname = opts.tags_source
325 if fname is None:
326 return
327 if fname == "-":
328 new_fh = sys.stdin
329 else:
330 new_fh = open(fname, "r")
331 new_data = []
332 try:
333 # we don't use the nice 'new_data = [line.strip() for line in fh]'
334 # because of python bug 1633941
335 while True:
336 line = new_fh.readline()
337 if not line:
338 break
339 new_data.append(line.strip())
340 finally:
341 new_fh.close()
342 args.extend(new_data)
343
344
def ListTags(opts, args):
  """List the tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  client = GetClient()
  # print the tags sorted, one per line
  for tag in sorted(client.QueryTags(kind, name)):
    ToStdout(tag)
361
362
def AddTags(opts, args):
  """Add tags on a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be added", errors.ECODE_INVAL)
  SubmitOrSend(opcodes.OpTagsSet(kind=kind, name=name, tags=args), opts)
378
379
def RemoveTags(opts, args):
  """Remove tags from a given object.

  This is a generic implementation that knows how to deal with all
  three cases of tag objects (cluster, node, instance). The opts
  argument is expected to contain a tag_type field denoting what
  object type we work on.

  """
  kind, name = _ExtractTagsObject(opts, args)
  _ExtendTags(opts, args)
  if not args:
    raise errors.OpPrereqError("No tags to be removed", errors.ECODE_INVAL)
  SubmitOrSend(opcodes.OpTagsDel(kind=kind, name=name, tags=args), opts)
395
396
397 class _ShowUsage(Exception):
398 """Exception class for L{_ParseArgs}.
399
400 """
401 def __init__(self, exit_error):
402 """Initializes instances of this class.
403
404 @type exit_error: bool
405 @param exit_error: Whether to report failure on exit
406
407 """
408 Exception.__init__(self)
409 self.exit_error = exit_error
410
411
class _ShowVersion(Exception):
  """Exception class for L{_ParseArgs}.

  Raised when the version should be printed instead of a command being run.

  """
416
417
def _ParseArgs(binary, argv, commands, aliases, env_override):
  """Parser for the command line arguments.

  This function parses the arguments and returns the function which
  must be executed together with its (modified) arguments.

  @param binary: Script name
  @param argv: Command line arguments
  @param commands: Dictionary containing command definitions
  @param aliases: dictionary with command aliases {"alias": "target", ...}
  @param env_override: list of env variables allowed for default args
  @raise _ShowUsage: If usage description should be shown
  @raise _ShowVersion: If version should be shown

  """
  assert not (env_override - set(commands))
  assert not (set(aliases.keys()) & set(commands.keys()))

  try:
    cmd = argv[1]
  except IndexError:
    # No option or command given
    raise _ShowUsage(exit_error=True)

  if cmd == "--version":
    raise _ShowVersion()
  if cmd == "--help":
    raise _ShowUsage(exit_error=False)
  if not (cmd in commands or cmd in aliases):
    raise _ShowUsage(exit_error=True)

  # get command, unalias it, and look it up in commands
  if cmd in aliases:
    target = aliases[cmd]
    if target not in commands:
      raise errors.ProgrammerError("Alias '%s' maps to non-existing"
                                   " command '%s'" % (cmd, target))
    cmd = target

  # default arguments may be injected through an environment variable
  if cmd in env_override:
    args_env_name = ("%s_%s" % (binary.replace("-", "_"), cmd)).upper()
    env_args = os.environ.get(args_env_name)
    if env_args:
      argv = utils.InsertAtPos(argv, 2, shlex.split(env_args))

  func, args_def, parser_opts, usage, description = commands[cmd]
  parser = OptionParser(option_list=parser_opts + COMMON_OPTS,
                        description=description,
                        formatter=TitledHelpFormatter(),
                        usage="%%prog %s %s" % (cmd, usage))
  parser.disable_interspersed_args()
  options, args = parser.parse_args(args=argv[2:])

  if not _CheckArguments(cmd, args_def, args):
    return None, None, None

  return func, options, args
475
476
477 def _FormatUsage(binary, commands):
478 """Generates a nice description of all commands.
479
480 @param binary: Script name
481 @param commands: Dictionary containing command definitions
482
483 """
484 # compute the max line length for cmd + usage
485 mlen = min(60, max(map(len, commands)))
486
487 yield "Usage: %s {command} [options...] [argument...]" % binary
488 yield "%s <command> --help to see details, or man %s" % (binary, binary)
489 yield ""
490 yield "Commands:"
491
492 # and format a nice command list
493 for (cmd, (_, _, _, _, help_text)) in sorted(commands.items()):
494 help_lines = textwrap.wrap(help_text, 79 - 3 - mlen)
495 yield " %-*s - %s" % (mlen, cmd, help_lines.pop(0))
496 for line in help_lines:
497 yield " %-*s %s" % (mlen, "", line)
498
499 yield ""
500
501
502 def _CheckArguments(cmd, args_def, args):
503 """Verifies the arguments using the argument definition.
504
505 Algorithm:
506
507 1. Abort with error if values specified by user but none expected.
508
509 1. For each argument in definition
510
511 1. Keep running count of minimum number of values (min_count)
512 1. Keep running count of maximum number of values (max_count)
513 1. If it has an unlimited number of values
514
515 1. Abort with error if it's not the last argument in the definition
516
517 1. If last argument has limited number of values
518
519 1. Abort with error if number of values doesn't match or is too large
520
521 1. Abort with error if user didn't pass enough values (min_count)
522
523 """
524 if args and not args_def:
525 ToStderr("Error: Command %s expects no arguments", cmd)
526 return False
527
528 min_count = None
529 max_count = None
530 check_max = None
531
532 last_idx = len(args_def) - 1
533
534 for idx, arg in enumerate(args_def):
535 if min_count is None:
536 min_count = arg.min
537 elif arg.min is not None:
538 min_count += arg.min
539
540 if max_count is None:
541 max_count = arg.max
542 elif arg.max is not None:
543 max_count += arg.max
544
545 if idx == last_idx:
546 check_max = (arg.max is not None)
547
548 elif arg.max is None:
549 raise errors.ProgrammerError("Only the last argument can have max=None")
550
551 if check_max:
552 # Command with exact number of arguments
553 if (min_count is not None and max_count is not None and
554 min_count == max_count and len(args) != min_count):
555 ToStderr("Error: Command %s expects %d argument(s)", cmd, min_count)
556 return False
557
558 # Command with limited number of arguments
559 if max_count is not None and len(args) > max_count:
560 ToStderr("Error: Command %s expects only %d argument(s)",
561 cmd, max_count)
562 return False
563
564 # Command with some required arguments
565 if min_count is not None and len(args) < min_count:
566 ToStderr("Error: Command %s expects at least %d argument(s)",
567 cmd, min_count)
568 return False
569
570 return True
571
572
def SplitNodeOption(value):
  """Splits the value of a --node option.

  Returns a pair; when the value contains a colon, the text after the
  first colon becomes the second element, otherwise the second element
  is None.

  """
  if not value or ":" not in value:
    return (value, None)
  return value.split(":", 1)
581
582
def CalculateOSNames(os_name, os_variants):
  """Calculates all the names an OS can be called, according to its variants.

  @type os_name: string
  @param os_name: base name of the os
  @type os_variants: list or None
  @param os_variants: list of supported variants
  @rtype: list
  @return: list of valid names

  """
  if not os_variants:
    # no variants: the plain name is the only valid one
    return [os_name]
  return ["%s+%s" % (os_name, variant) for variant in os_variants]
598
599
def ParseFields(selected, default):
  """Parses the values of "--field"-like options.

  @type selected: string or None
  @param selected: User-selected options
  @type default: list
  @param default: Default fields

  """
  if selected is None:
    return default

  extend = selected.startswith("+")
  fields = (selected[1:] if extend else selected).split(",")

  # a leading "+" means "append to the defaults"
  if extend:
    return default + fields
  return fields
616
617
# Backward-compatible alias (exported in __all__); the implementation lives
# in ganeti.rpc.node.RunWithRPC
UsesRPC = rpc.RunWithRPC
619
620
def AskUser(text, choices=None):
  """Ask the user a question.

  @param text: the question to ask

  @param choices: list with elements tuples (input_char, return_value,
      description); if not given, it will default to: [('y', True,
      'Perform the operation'), ('n', False, 'Do no do the operation')];
      note that the '?' char is reserved for help

  @return: one of the return values from the choices list; if input is
      not possible (i.e. not running with a tty, we return the last
      entry from the list

  """
  if choices is None:
    choices = [("y", True, "Perform the operation"),
               ("n", False, "Do not perform the operation")]
  if not choices or not isinstance(choices, list):
    raise errors.ProgrammerError("Invalid choices argument to AskUser")
  for entry in choices:
    if not isinstance(entry, tuple) or len(entry) < 3 or entry[0] == "?":
      raise errors.ProgrammerError("Invalid choices element to AskUser")

  answer = choices[-1][1]
  new_text = []
  for line in text.splitlines():
    new_text.append(textwrap.fill(line, 70, replace_whitespace=False))
  text = "\n".join(new_text)
  try:
    # use open() instead of the deprecated file() builtin (removed in
    # Python 3); talk to the controlling terminal directly so redirected
    # stdin/stdout don't interfere
    f = open("/dev/tty", "a+")
  except IOError:
    # no controlling terminal available, fall back to the default answer
    return answer
  try:
    chars = [entry[0] for entry in choices]
    chars[-1] = "[%s]" % chars[-1]
    chars.append("?")
    maps = dict([(entry[0], entry[1]) for entry in choices])
    while True:
      f.write(text)
      f.write("\n")
      f.write("/".join(chars))
      f.write(": ")
      # we only read at most two characters (answer char + newline)
      line = f.readline(2).strip().lower()
      if line in maps:
        answer = maps[line]
        break
      elif line == "?":
        # print the per-choice help and ask again
        for entry in choices:
          f.write(" %s - %s\n" % (entry[0], entry[2]))
        f.write("\n")
        continue
  finally:
    f.close()
  return answer
676
677
class JobSubmittedException(Exception):
  """Job was submitted, client should exit.

  This exception has one argument, the ID of the job that was
  submitted. The handler should print this ID.

  This is not an error, just a structured way to exit from clients.

  """
687
688
def SendJob(ops, cl=None):
  """Function to submit an opcode without waiting for the results.

  @type ops: list
  @param ops: list of opcodes
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created
  @return: the job ID assigned by the master

  """
  if cl is None:
    cl = GetClient()
  return cl.SubmitJob(ops)
705
706
def GenericPollJob(job_id, cbs, report_cbs):
  """Generic job-polling function.

  @type job_id: number
  @param job_id: Job ID
  @type cbs: Instance of L{JobPollCbBase}
  @param cbs: Data callbacks
  @type report_cbs: Instance of L{JobPollReportCbBase}
  @param report_cbs: Reporting callbacks

  @return: the opresult of the job
  @raise errors.JobLost: If job can't be found
  @raise errors.OpExecError: If job didn't succeed

  """
  prev_job_info = None
  prev_logmsg_serial = None

  # last job status seen; passed to ReportNotChanged on timeouts
  status = None

  while True:
    result = cbs.WaitForJobChangeOnce(job_id, ["status"], prev_job_info,
                                      prev_logmsg_serial)
    if not result:
      # job not found, go away!
      raise errors.JobLost("Job with id %s lost" % job_id)

    if result == constants.JOB_NOTCHANGED:
      report_cbs.ReportNotChanged(job_id, status)

      # Wait again
      continue

    # Split result, a tuple of (field values, log entries)
    (job_info, log_entries) = result
    (status, ) = job_info

    if log_entries:
      for log_entry in log_entries:
        (serial, timestamp, log_type, message) = log_entry
        report_cbs.ReportLogMessage(job_id, serial, timestamp,
                                    log_type, message)
        # NOTE: relies on Python 2 ordering where None compares less than
        # any integer, so the first serial always wins over the initial None
        prev_logmsg_serial = max(prev_logmsg_serial, serial)

    # TODO: Handle canceled and archived jobs
    # NOTE: termination is only checked when no new log messages arrived in
    # this iteration (elif), so pending messages are always delivered first
    elif status in (constants.JOB_STATUS_SUCCESS,
                    constants.JOB_STATUS_ERROR,
                    constants.JOB_STATUS_CANCELING,
                    constants.JOB_STATUS_CANCELED):
      break

    prev_job_info = job_info

  # the job has finished; fetch the final status and per-opcode results
  jobs = cbs.QueryJobs([job_id], ["status", "opstatus", "opresult"])
  if not jobs:
    raise errors.JobLost("Job with id %s lost" % job_id)

  status, opstatus, result = jobs[0]

  if status == constants.JOB_STATUS_SUCCESS:
    return result

  if status in (constants.JOB_STATUS_CANCELING, constants.JOB_STATUS_CANCELED):
    raise errors.OpExecError("Job was canceled")

  # find the first failed opcode and report it, noting whether some earlier
  # opcodes succeeded (partial failure)
  has_ok = False
  for idx, (status, msg) in enumerate(zip(opstatus, result)):
    if status == constants.OP_STATUS_SUCCESS:
      has_ok = True
    elif status == constants.OP_STATUS_ERROR:
      errors.MaybeRaise(msg)

      if has_ok:
        raise errors.OpExecError("partial failure (opcode %d): %s" %
                                 (idx, msg))

      raise errors.OpExecError(str(msg))

  # default failure mode
  raise errors.OpExecError(result)
787
788
class JobPollCbBase(object):
  """Base class for L{GenericPollJob} callbacks.

  Subclasses must implement the data-retrieval methods below.

  """
  def __init__(self):
    """Initializes this class.

    """

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    raise NotImplementedError()

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    @type job_ids: list of numbers
    @param job_ids: Job IDs
    @type fields: list of strings
    @param fields: Fields

    """
    raise NotImplementedError()
815
816
class JobPollReportCbBase(object):
  """Base class for L{GenericPollJob} reporting callbacks.

  Subclasses must implement the reporting methods below.

  """
  def __init__(self):
    """Initializes this class.

    """

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    raise NotImplementedError()

  def ReportNotChanged(self, job_id, status):
    """Called for if a job hasn't changed in a while.

    @type job_id: number
    @param job_id: Job ID
    @type status: string or None
    @param status: Job status if available

    """
    raise NotImplementedError()
842
843
class _LuxiJobPollCb(JobPollCbBase):
  """L{JobPollCbBase} implementation delegating to a LUXI client.

  """
  def __init__(self, cl):
    """Initializes this class.

    @param cl: LUXI client all queries are forwarded to

    """
    JobPollCbBase.__init__(self)
    self.cl = cl

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial):
    """Waits for changes on a job.

    """
    return self.cl.WaitForJobChangeOnce(job_id, fields,
                                        prev_job_info, prev_log_serial)

  def QueryJobs(self, job_ids, fields):
    """Returns the selected fields for the selected job IDs.

    """
    return self.cl.QueryJobs(job_ids, fields)
865
866
class FeedbackFnJobPollReportCb(JobPollReportCbBase):
  """Reporting callback forwarding log messages to a feedback function.

  """
  def __init__(self, feedback_fn):
    """Initializes this class.

    @param feedback_fn: callable receiving (timestamp, log_type, log_msg)
        tuples

    """
    JobPollReportCbBase.__init__(self)

    self.feedback_fn = feedback_fn

    assert callable(feedback_fn)

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Hands a log message over to the feedback function.

    """
    self.feedback_fn((timestamp, log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    # Deliberately ignored; only log messages are forwarded
889
890
class StdioJobPollReportCb(JobPollReportCbBase):
  """Reporting callback printing job progress to stdout/stderr.

  """
  def __init__(self):
    """Initializes this class.

    """
    JobPollReportCbBase.__init__(self)

    # one-shot flags so each notice is printed at most once per job
    self.notified_queued = False
    self.notified_waitlock = False

  def ReportLogMessage(self, job_id, serial, timestamp, log_type, log_msg):
    """Handles a log message.

    """
    ToStdout("%s %s", time.ctime(utils.MergeTime(timestamp)),
             FormatLogMessage(log_type, log_msg))

  def ReportNotChanged(self, job_id, status):
    """Called if a job hasn't changed in a while.

    """
    if status is None:
      return

    if status == constants.JOB_STATUS_QUEUED and not self.notified_queued:
      self.notified_queued = True
      ToStderr("Job %s is waiting in queue", job_id)
    elif status == constants.JOB_STATUS_WAITING and not self.notified_waitlock:
      self.notified_waitlock = True
      ToStderr("Job %s is trying to acquire all necessary locks", job_id)
922
923
def FormatLogMessage(log_type, log_msg):
  """Formats a job message according to its type.

  Non-message entries are stringified before being made safe for display.

  """
  if log_type == constants.ELOG_MESSAGE:
    return utils.SafeEncode(log_msg)
  return utils.SafeEncode(str(log_msg))
932
933
def PollJob(job_id, cl=None, feedback_fn=None, reporter=None):
  """Function to poll for the result of a job.

  @type job_id: job identified
  @param job_id: the job to poll for results
  @type cl: luxi.Client
  @param cl: the luxi client to use for communicating with the master;
      if None, a new client will be created

  """
  if cl is None:
    cl = GetClient()

  if reporter is None:
    # choose a default reporter based on whether a feedback function exists
    reporter = (FeedbackFnJobPollReportCb(feedback_fn) if feedback_fn
                else StdioJobPollReportCb())
  elif feedback_fn:
    raise errors.ProgrammerError("Can't specify reporter and feedback function")

  return GenericPollJob(job_id, _LuxiJobPollCb(cl), reporter)
956
957
def SubmitOpCode(op, cl=None, feedback_fn=None, opts=None, reporter=None):
  """Legacy function to submit an opcode.

  This is just a simple wrapper over the construction of the processor
  instance. It should be extended to better handle feedback and
  interaction functions.

  """
  if cl is None:
    cl = GetClient()

  SetGenericOpcodeOpts([op], opts)

  job_id = SendJob([op], cl=cl)
  if getattr(opts, "print_jobid", False):
    ToStdout("%d" % job_id)

  results = PollJob(job_id, cl=cl, feedback_fn=feedback_fn, reporter=reporter)
  return results[0]
979
980
def SubmitOpCodeToDrainedQueue(op):
  """Forcefully insert a job in the queue, even if it is drained.

  """
  cl = GetClient()
  results = PollJob(cl.SubmitJobToDrainedQueue([op]), cl=cl)
  return results[0]
989
990
def SubmitOrSend(op, opts, cl=None, feedback_fn=None):
  """Wrapper around SubmitOpCode or SendJob.

  This function will decide, based on the 'opts' parameter, whether to
  submit and wait for the result of the opcode (and return it), or
  whether to just send the job and print its identifier. It is used in
  order to simplify the implementation of the '--submit' option.

  It will also process the opcodes if we're sending the via SendJob
  (otherwise SubmitOpCode does it).

  """
  if not (opts and opts.submit_only):
    return SubmitOpCode(op, cl=cl, feedback_fn=feedback_fn, opts=opts)

  # fire-and-forget mode: submit the job and report its ID via an exception
  jobs = [op]
  SetGenericOpcodeOpts(jobs, opts)
  job_id = SendJob(jobs, cl=cl)
  if opts.print_jobid:
    ToStdout("%d" % job_id)
  raise JobSubmittedException(job_id)
1012
1013
def _InitReasonTrail(op, opts):
  """Builds the first part of the reason trail

  Builds the initial part of the reason trail, adding the user provided reason
  (if it exists) and the name of the command starting the operation.

  @param op: the opcode the reason trail will be added to
  @param opts: the command line options selected by the user

  """
  assert len(sys.argv) >= 2

  trail = []
  if opts.reason:
    trail.append((constants.OPCODE_REASON_SRC_USER, opts.reason,
                  utils.EpochNano()))

  # record which client binary and sub-command triggered the operation
  binary = os.path.basename(sys.argv[0])
  trail.append(("%s:%s" % (constants.OPCODE_REASON_SRC_CLIENT, binary),
                sys.argv[1], utils.EpochNano()))

  op.reason = trail
1037
1038
def SetGenericOpcodeOpts(opcode_list, options):
  """Processor for generic options.

  This function updates the given opcodes based on generic command
  line options (like debug, dry-run, etc.).

  @param opcode_list: list of opcodes
  @param options: command line options or None
  @return: None (in-place modification)

  """
  if not options:
    return
  for opcode in opcode_list:
    opcode.debug_level = options.debug
    if hasattr(options, "dry_run"):
      opcode.dry_run = options.dry_run
    priority = getattr(options, "priority", None)
    if priority is not None:
      opcode.priority = priority
    _InitReasonTrail(opcode, options)
1059
1060
def FormatError(err):
  """Return a formatted error message for a given error.

  This function takes an exception instance and returns a tuple
  consisting of two values: first, the recommended exit code, and
  second, a string describing the error message (not
  newline-terminated).

  """
  retcode = 1
  obuf = StringIO()
  msg = str(err)
  # NOTE: more specific errors.* subclasses must be checked before the
  # generic errors.GenericError fallback near the end of the chain
  if isinstance(err, errors.ConfigurationError):
    txt = "Corrupt configuration file: %s" % msg
    logging.error(txt)
    obuf.write(txt + "\n")
    obuf.write("Aborting.")
    retcode = 2
  elif isinstance(err, errors.HooksAbort):
    obuf.write("Failure: hooks execution failed:\n")
    for node, script, out in err.args[0]:
      if out:
        obuf.write("  node: %s, script: %s, output: %s\n" %
                   (node, script, out))
      else:
        obuf.write("  node: %s, script: %s (no output)\n" %
                   (node, script))
  elif isinstance(err, errors.HooksFailure):
    obuf.write("Failure: hooks general failure: %s" % msg)
  elif isinstance(err, errors.ResolverError):
    this_host = netutils.Hostname.GetSysName()
    # msg is reused here as a format template
    if err.args[0] == this_host:
      msg = "Failure: can't resolve my own hostname ('%s')"
    else:
      msg = "Failure: can't resolve hostname '%s'"
    obuf.write(msg % err.args[0])
  elif isinstance(err, errors.OpPrereqError):
    if len(err.args) == 2:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\nerror type: %s, error details:\n%s" %
                 (err.args[1], err.args[0]))
    else:
      obuf.write("Failure: prerequisites not met for this"
                 " operation:\n%s" % msg)
  elif isinstance(err, errors.OpExecError):
    obuf.write("Failure: command execution error:\n%s" % msg)
  elif isinstance(err, errors.TagError):
    obuf.write("Failure: invalid tag(s) given:\n%s" % msg)
  elif isinstance(err, errors.JobQueueDrainError):
    obuf.write("Failure: the job queue is marked for drain and doesn't"
               " accept new requests\n")
  elif isinstance(err, errors.JobQueueFull):
    obuf.write("Failure: the job queue is full and doesn't accept new"
               " job submissions until old jobs are archived\n")
  elif isinstance(err, errors.TypeEnforcementError):
    obuf.write("Parameter Error: %s" % msg)
  elif isinstance(err, errors.ParameterError):
    obuf.write("Failure: unknown/wrong parameter name '%s'" % msg)
  elif isinstance(err, rpcerr.NoMasterError):
    # identify which daemon socket was unreachable
    if err.args[0] == pathutils.MASTER_SOCKET:
      daemon = "the master daemon"
    elif err.args[0] == pathutils.QUERY_SOCKET:
      daemon = "the config daemon"
    else:
      daemon = "socket '%s'" % str(err.args[0])
    obuf.write("Cannot communicate with %s.\nIs the process running"
               " and listening for connections?" % daemon)
  elif isinstance(err, rpcerr.TimeoutError):
    obuf.write("Timeout while talking to the master daemon. Jobs might have"
               " been submitted and will continue to run even if the call"
               " timed out. Useful commands in this situation are \"gnt-job"
               " list\", \"gnt-job cancel\" and \"gnt-job watch\". Error:\n")
    obuf.write(msg)
  elif isinstance(err, rpcerr.PermissionError):
    obuf.write("It seems you don't have permissions to connect to the"
               " master daemon.\nPlease retry as a different user.")
  elif isinstance(err, rpcerr.ProtocolError):
    obuf.write("Unhandled protocol error while talking to the master daemon:\n"
               "%s" % msg)
  elif isinstance(err, errors.JobLost):
    obuf.write("Error checking job status: %s" % msg)
  elif isinstance(err, errors.QueryFilterParseError):
    obuf.write("Error while parsing query filter: %s\n" % err.args[0])
    obuf.write("\n".join(err.GetDetails()))
  elif isinstance(err, errors.GenericError):
    obuf.write("Unhandled Ganeti error: %s" % msg)
  elif isinstance(err, JobSubmittedException):
    # not an error: the job ID is reported and we exit successfully
    obuf.write("JobID: %s\n" % err.args[0])
    retcode = 0
  else:
    obuf.write("Unhandled exception: %s" % msg)
  return retcode, obuf.getvalue().rstrip("\n")
1153
1154
def GenericMain(commands, override=None, aliases=None,
                env_override=frozenset()):
  """Generic main function for all the gnt-* commands.

  @param commands: a dictionary with a special structure, see the design doc
                   for command line handling.
  @param override: if not None, we expect a dictionary with keys that will
                   override command line options; this can be used to pass
                   options from the scripts to generic functions
  @param aliases: dictionary with command aliases {'alias': 'target, ...}
  @param env_override: list of environment names which are allowed to submit
                       default args for commands
  @rtype: int
  @return: the exit code of the selected command function, or one of the
    C{constants.EXIT_*} codes for version/usage output and parse errors

  """
  # save the program name and the entire command line for later logging
  if sys.argv:
    binary = os.path.basename(sys.argv[0])
    if not binary:
      # basename() returned an empty string (e.g. argv[0] ends in a slash);
      # fall back to the raw value
      binary = sys.argv[0]

    if len(sys.argv) >= 2:
      # include the sub-command in the log name, e.g. "gnt-node list"
      logname = utils.ShellQuoteArgs([binary, sys.argv[1]])
    else:
      logname = binary

    cmdline = utils.ShellQuoteArgs([binary] + sys.argv[1:])
  else:
    # NOTE(review): in this branch 'logname' is never assigned, so the later
    # utils.SetupLogging call would raise NameError; presumably an empty
    # sys.argv never happens in practice — confirm
    binary = "<unknown program>"
    cmdline = "<unknown>"

  if aliases is None:
    aliases = {}

  try:
    (func, options, args) = _ParseArgs(binary, sys.argv, commands, aliases,
                                       env_override)
  except _ShowVersion:
    # "--version" was given: print version info and exit successfully
    ToStdout("%s (ganeti %s) %s", binary, constants.VCS_VERSION,
             constants.RELEASE_VERSION)
    return constants.EXIT_SUCCESS
  except _ShowUsage, err:
    # "--help" or a usage error: print the usage screen; the exception
    # carries whether this counts as an error or a requested help text
    for line in _FormatUsage(binary, commands):
      ToStdout(line)

    if err.exit_error:
      return constants.EXIT_FAILURE
    else:
      return constants.EXIT_SUCCESS
  except errors.ParameterError, err:
    # NOTE(review): the retcode computed by FormatError is discarded and a
    # literal 1 is returned instead — presumably intentional, confirm
    result, err_msg = FormatError(err)
    ToStderr(err_msg)
    return 1

  if func is None: # parse error
    return 1

  # Apply caller-supplied option overrides on top of the parsed options
  if override is not None:
    for key, val in override.iteritems():
      setattr(options, key, val)

  utils.SetupLogging(pathutils.LOG_COMMANDS, logname, debug=options.debug,
                     stderr_logging=True)

  logging.debug("Command line: %s", cmdline)

  try:
    result = func(options, args)
  except (errors.GenericError, rpcerr.ProtocolError,
          JobSubmittedException), err:
    # Known error types are translated into a user-readable message plus an
    # exit code instead of a traceback
    result, err_msg = FormatError(err)
    logging.exception("Error during command processing")
    ToStderr(err_msg)
  except KeyboardInterrupt:
    result = constants.EXIT_FAILURE
    ToStderr("Aborted. Note that if the operation created any jobs, they"
             " might have been submitted and"
             " will continue to run in the background.")
  except IOError, err:
    if err.errno == errno.EPIPE:
      # our terminal went away, we'll exit
      sys.exit(constants.EXIT_FAILURE)
    else:
      raise

  return result
1240
1241
def ParseNicOption(optvalue):
  """Parses the value of the --net option(s).

  @type optvalue: list of tuples
  @param optvalue: list of (nic_index, settings_dict) pairs as produced
    by the option parser
  @rtype: list of dicts
  @return: one parameter dictionary per NIC index; indices not mentioned
    in C{optvalue} are left as empty dictionaries
  @raise errors.OpPrereqError: if a NIC index is not an integer or a
    settings value is not a dictionary

  """
  try:
    nic_max = max(int(nidx[0]) + 1 for nidx in optvalue)
  except (TypeError, ValueError) as err:
    raise errors.OpPrereqError("Invalid NIC index passed: %s" % str(err),
                               errors.ECODE_INVAL)

  # Use a fresh dict per slot; the previous "[{}] * nic_max" made every
  # unassigned slot an alias of one shared dictionary, so a later mutation
  # of one slot would silently change all the others.
  nics = [{} for _ in range(nic_max)]
  for nidx, ndict in optvalue:
    nidx = int(nidx)

    if not isinstance(ndict, dict):
      raise errors.OpPrereqError("Invalid nic/%d value: expected dict,"
                                 " got %s" % (nidx, ndict), errors.ECODE_INVAL)

    # Coerce/validate the per-NIC parameter types in place
    utils.ForceDictType(ndict, constants.INIC_PARAMS_TYPES)

    nics[nidx] = ndict

  return nics
1265
1266
def FixHvParams(hvparams):
  """Normalizes the separator of the usb_devices hvparam, in place.

  In Ganeti 2.8.4 the separator for the usb_devices hvparam was changed from
  comma to space, because commas cannot be given on the command line (they
  already separate different hvparams).  RAPI still accepts commas for
  backwards compatibility, so spaces are converted back to commas here while
  the old parsing logic is kept everywhere else.

  @type hvparams: dict
  @param hvparams: hypervisor parameters, possibly modified in place

  """
  if constants.HV_USB_DEVICES in hvparams:
    hvparams[constants.HV_USB_DEVICES] = \
      hvparams[constants.HV_USB_DEVICES].replace(" ", ",")
1280
1281
def GenericInstanceCreate(mode, opts, args):
  """Add an instance to the cluster via either creation or import.

  @param mode: constants.INSTANCE_CREATE or constants.INSTANCE_IMPORT
  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new instance name
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: on conflicting or incomplete options
  @raise errors.ProgrammerError: if C{mode} is not one of the two valid modes

  """
  instance = args[0]
  # "forthcoming"/"commit" may be absent for some gnt-* commands, hence
  # ensure_value() instead of a plain attribute access
  forthcoming = opts.ensure_value("forthcoming", False)
  commit = opts.ensure_value("commit", False)

  if forthcoming and commit:
    raise errors.OpPrereqError("Creating an instance only forthcoming and"
                               " committing it are mutually exclusive",
                               errors.ECODE_INVAL)

  (pnode, snode) = SplitNodeOption(opts.node)

  hypervisor = None
  hvparams = {}
  if opts.hypervisor:
    hypervisor, hvparams = opts.hypervisor

  if opts.nics:
    nics = ParseNicOption(opts.nics)
  elif opts.no_nics:
    # no nics
    nics = []
  elif mode == constants.INSTANCE_CREATE:
    # default of one nic, all auto
    nics = [{}]
  else:
    # mode == import
    nics = []

  if opts.disk_template == constants.DT_DISKLESS:
    if opts.disks or opts.sd_size is not None:
      raise errors.OpPrereqError("Diskless instance but disk"
                                 " information passed", errors.ECODE_INVAL)
    disks = []
  else:
    if (not opts.disks and not opts.sd_size
        and mode == constants.INSTANCE_CREATE):
      raise errors.OpPrereqError("No disk information specified",
                                 errors.ECODE_INVAL)
    if opts.disks and opts.sd_size is not None:
      raise errors.OpPrereqError("Please use either the '--disk' or"
                                 " '-s' option", errors.ECODE_INVAL)
    if opts.sd_size is not None:
      # legacy "-s <size>" option: translate to a single-disk --disk spec
      opts.disks = [(0, {constants.IDISK_SIZE: opts.sd_size})]

    if opts.disks:
      try:
        disk_max = max(int(didx[0]) + 1 for didx in opts.disks)
      except ValueError as err:
        raise errors.OpPrereqError("Invalid disk index passed: %s" % str(err),
                                   errors.ECODE_INVAL)
      # One fresh dict per disk slot; the previous "[{}] * disk_max" made
      # every unassigned slot an alias of the same dictionary object, so
      # mutating one of them would silently change all of them.
      disks = [{} for _ in range(disk_max)]
    else:
      disks = []
    for didx, ddict in opts.disks:
      didx = int(didx)
      if not isinstance(ddict, dict):
        msg = "Invalid disk/%d value: expected dict, got %s" % (didx, ddict)
        raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
      elif constants.IDISK_SIZE in ddict:
        if constants.IDISK_ADOPT in ddict:
          raise errors.OpPrereqError("Only one of 'size' and 'adopt' allowed"
                                     " (disk %d)" % didx, errors.ECODE_INVAL)
        try:
          ddict[constants.IDISK_SIZE] = \
            utils.ParseUnit(ddict[constants.IDISK_SIZE])
        except ValueError as err:
          raise errors.OpPrereqError("Invalid disk size for disk %d: %s" %
                                     (didx, err), errors.ECODE_INVAL)
      elif constants.IDISK_ADOPT in ddict:
        if constants.IDISK_SPINDLES in ddict:
          raise errors.OpPrereqError("spindles is not a valid option when"
                                     " adopting a disk", errors.ECODE_INVAL)
        if mode == constants.INSTANCE_IMPORT:
          raise errors.OpPrereqError("Disk adoption not allowed for instance"
                                     " import", errors.ECODE_INVAL)
        # adopted disks keep their existing size; 0 is a placeholder
        ddict[constants.IDISK_SIZE] = 0
      else:
        raise errors.OpPrereqError("Missing size or adoption source for"
                                   " disk %d" % didx, errors.ECODE_INVAL)
      if constants.IDISK_SPINDLES in ddict:
        ddict[constants.IDISK_SPINDLES] = int(ddict[constants.IDISK_SPINDLES])

      disks[didx] = ddict

  if opts.tags is not None:
    tags = opts.tags.split(",")
  else:
    tags = []

  utils.ForceDictType(opts.beparams, constants.BES_PARAMETER_COMPAT)
  utils.ForceDictType(hvparams, constants.HVS_PARAMETER_TYPES)
  FixHvParams(hvparams)

  osparams_private = opts.osparams_private or serializer.PrivateDict()
  osparams_secret = opts.osparams_secret or serializer.PrivateDict()

  helper_startup_timeout = opts.helper_startup_timeout
  helper_shutdown_timeout = opts.helper_shutdown_timeout

  if mode == constants.INSTANCE_CREATE:
    start = opts.start
    os_type = opts.os
    force_variant = opts.force_variant
    src_node = None
    src_path = None
    no_install = opts.no_install
    identify_defaults = False
    compress = constants.IEC_NONE
    if opts.instance_communication is None:
      instance_communication = False
    else:
      instance_communication = opts.instance_communication
  elif mode == constants.INSTANCE_IMPORT:
    if forthcoming:
      raise errors.OpPrereqError("forthcoming instances can only be created,"
                                 " not imported", errors.ECODE_INVAL)
    start = False
    os_type = None
    force_variant = False
    src_node = opts.src_node
    src_path = opts.src_dir
    no_install = None
    identify_defaults = opts.identify_defaults
    compress = opts.compress
    instance_communication = False
  else:
    raise errors.ProgrammerError("Invalid creation mode %s" % mode)

  op = opcodes.OpInstanceCreate(
    forthcoming=forthcoming,
    commit=commit,
    instance_name=instance,
    disks=disks,
    disk_template=opts.disk_template,
    group_name=opts.nodegroup,
    nics=nics,
    conflicts_check=opts.conflicts_check,
    pnode=pnode, snode=snode,
    ip_check=opts.ip_check,
    name_check=opts.name_check,
    wait_for_sync=opts.wait_for_sync,
    file_storage_dir=opts.file_storage_dir,
    file_driver=opts.file_driver,
    iallocator=opts.iallocator,
    hypervisor=hypervisor,
    hvparams=hvparams,
    beparams=opts.beparams,
    osparams=opts.osparams,
    osparams_private=osparams_private,
    osparams_secret=osparams_secret,
    mode=mode,
    opportunistic_locking=opts.opportunistic_locking,
    start=start,
    os_type=os_type,
    force_variant=force_variant,
    src_node=src_node,
    src_path=src_path,
    compress=compress,
    tags=tags,
    no_install=no_install,
    identify_defaults=identify_defaults,
    ignore_ipolicy=opts.ignore_ipolicy,
    instance_communication=instance_communication,
    helper_startup_timeout=helper_startup_timeout,
    helper_shutdown_timeout=helper_shutdown_timeout)

  SubmitOrSend(op, opts)
  return 0
1461
1462
class _RunWhileDaemonsStoppedHelper(object):
  """Helper class for L{RunWhileDaemonsStopped} to simplify state management

  """
  def __init__(self, feedback_fn, cluster_name, master_node,
               online_nodes, ssh_ports, exclude_daemons, debug,
               verbose):
    """Initializes this class.

    @type feedback_fn: callable
    @param feedback_fn: Feedback function
    @type cluster_name: string
    @param cluster_name: Cluster name
    @type master_node: string
    @param master_node: Master node name
    @type online_nodes: list
    @param online_nodes: List of names of online nodes
    @type ssh_ports: list
    @param ssh_ports: List of SSH ports of online nodes, in the same order
      as C{online_nodes}
    @type exclude_daemons: list of string
    @param exclude_daemons: list of daemons that will be restarted on master
                            after all others are shutdown
    @type debug: boolean
    @param debug: show debug output
    @type verbose: boolean
    @param verbose: show verbose output

    """
    self.feedback_fn = feedback_fn
    self.cluster_name = cluster_name
    self.master_node = master_node
    self.online_nodes = online_nodes
    # Per-node SSH port lookup, built by pairing the two parallel lists
    self.ssh_ports = dict(zip(online_nodes, ssh_ports))

    self.ssh = ssh.SshRunner(self.cluster_name)

    self.nonmaster_nodes = [name for name in online_nodes
                            if name != master_node]

    # NOTE(review): the caller's list is stored by reference and later
    # reversed in place by Call() — confirm callers don't reuse the list
    self.exclude_daemons = exclude_daemons
    self.debug = debug
    self.verbose = verbose

    assert self.master_node not in self.nonmaster_nodes

  def _RunCmd(self, node_name, cmd):
    """Runs a command on the local or a remote machine.

    @type node_name: string
    @param node_name: Machine name
    @type cmd: list
    @param cmd: Command

    @raise errors.OpExecError: if the command exits with a non-zero status

    """
    if node_name is None or node_name == self.master_node:
      # No need to use SSH
      result = utils.RunCmd(cmd)
    else:
      result = self.ssh.Run(node_name, constants.SSH_LOGIN_USER,
                            utils.ShellQuoteArgs(cmd),
                            port=self.ssh_ports[node_name])

    if result.failed:
      errmsg = ["Failed to run command %s" % result.cmd]
      if node_name:
        errmsg.append("on node %s" % node_name)
      errmsg.append(": exitcode %s and error %s" %
                    (result.exit_code, result.output))
      raise errors.OpExecError(" ".join(errmsg))

  def Call(self, fn, *args):
    """Call function while all daemons are stopped.

    @type fn: callable
    @param fn: Function to be called; receives this helper instance as its
      first argument, followed by C{args}

    """
    # Pause watcher by acquiring an exclusive lock on watcher state file
    self.feedback_fn("Blocking watcher")
    watcher_block = utils.FileLock.Open(pathutils.WATCHER_LOCK_FILE)
    try:
      # TODO: Currently, this just blocks. There's no timeout.
      # TODO: Should it be a shared lock?
      watcher_block.Exclusive(blocking=True)

      # Stop master daemons, so that no new jobs can come in and all running
      # ones are finished
      self.feedback_fn("Stopping master daemons")
      self._RunCmd(None, [pathutils.DAEMON_UTIL, "stop-master"])
      try:
        # Stop daemons on all nodes
        online_nodes = [self.master_node] + [n for n in self.online_nodes
                                             if n != self.master_node]
        for node_name in online_nodes:
          self.feedback_fn("Stopping daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop-all"])
          # Starting any daemons listed as exception
          if node_name == self.master_node:
            for daemon in self.exclude_daemons:
              self.feedback_fn("Starting daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start", daemon])

        # All daemons are shut down now
        try:
          return fn(self, *args)
        except Exception, err:
          _, errmsg = FormatError(err)
          logging.exception("Caught exception")
          self.feedback_fn(errmsg)
          raise
      finally:
        # Start cluster again, master node last
        for node_name in self.nonmaster_nodes + [self.master_node]:
          # Stopping any daemons listed as exception.
          # This might look unnecessary, but it makes sure that daemon-util
          # starts all daemons in the right order.
          if node_name == self.master_node:
            # NOTE(review): reverse() mutates self.exclude_daemons in place
            # and is never undone — a second Call() would use reversed order
            self.exclude_daemons.reverse()
            for daemon in self.exclude_daemons:
              self.feedback_fn("Stopping daemon '%s' on %s" % (daemon,
                                                               node_name))
              self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "stop", daemon])
          self.feedback_fn("Starting daemons on %s" % node_name)
          self._RunCmd(node_name, [pathutils.DAEMON_UTIL, "start-all"])

    finally:
      # Resume watcher
      watcher_block.Close()
1592
1593
def RunWhileDaemonsStopped(feedback_fn, exclude_daemons, fn, *args, **kwargs):
  """Calls a function while all cluster daemons are stopped.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type exclude_daemons: list of string
  @param exclude_daemons: daemons that are stopped but immediately restarted
    on the master so they are available while 'fn' runs; if None, every
    daemon stays down until 'fn' has finished
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  feedback_fn("Gathering cluster information")

  # This ensures we're running on the master daemon
  cl = GetClient()

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  online_nodes = GetOnlineNodes([], cl=cl)
  ssh_ports = GetNodesSshPorts(online_nodes, cl)

  # Don't keep a reference to the client. The master daemon will go away.
  del cl

  assert master_node in online_nodes

  daemons_to_restart = [] if exclude_daemons is None else exclude_daemons
  debug = kwargs.get("debug", False)
  verbose = kwargs.get("verbose", False)

  helper = _RunWhileDaemonsStoppedHelper(
    feedback_fn, cluster_name, master_node, online_nodes, ssh_ports,
    daemons_to_restart, debug, verbose)
  return helper.Call(fn, *args)
1632
1633
def RunWhileClusterStopped(feedback_fn, fn, *args):
  """Convenience wrapper: runs a function with every cluster daemon stopped.

  Equivalent to L{RunWhileDaemonsStopped} with no daemons excluded from the
  shutdown.

  @type feedback_fn: callable
  @param feedback_fn: Feedback function
  @type fn: callable
  @param fn: Function to be called when daemons are stopped

  """
  RunWhileDaemonsStopped(feedback_fn, None, fn, *args)
1644
1645
def GenerateTable(headers, fields, separator, data,
                  numfields=None, unitfields=None,
                  units=None):
  """Prints a table with headers and different fields.

  @type headers: dict
  @param headers: dictionary mapping field names to headers for
      the table
  @type fields: list
  @param fields: the field names corresponding to each row in
      the data field
  @param separator: the separator to be used; if this is None,
      the default 'smart' algorithm is used which computes optimal
      field width, otherwise just the separator is used between
      each field
  @type data: list
  @param data: a list of lists, each sublist being one row to be output;
      note that the rows are modified in place (cells are converted to
      strings, and unit fields are formatted)
  @type numfields: list
  @param numfields: a list with the fields that hold numeric
      values and thus should be right-aligned
  @type unitfields: list
  @param unitfields: a list with the fields that hold numeric
      values that should be formatted with the units field
  @type units: string or None
  @param units: the units we should use for formatting, or None for
      automatic choice (human-readable for non-separator usage, otherwise
      megabytes); this is a one-letter string
  @rtype: list of strings
  @return: the formatted table lines (header line first, if requested)

  """
  if units is None:
    if separator:
      units = "m"
    else:
      units = "h"

  if numfields is None:
    numfields = []
  if unitfields is None:
    unitfields = []

  # FieldSet supports pattern matching against the field names
  numfields = utils.FieldSet(*numfields) # pylint: disable=W0142
  unitfields = utils.FieldSet(*unitfields) # pylint: disable=W0142

  # One printf-style conversion per column: plain "%s" in separator mode,
  # width-parameterized "%*s"/"%-*s" in smart-width mode
  format_fields = []
  for field in fields:
    if headers and field not in headers:
      # TODO: handle better unknown fields (either revert to old
      # style of raising exception, or deal more intelligently with
      # variable fields)
      headers[field] = field
    if separator is not None:
      format_fields.append("%s")
    elif numfields.Matches(field):
      format_fields.append("%*s")
    else:
      format_fields.append("%-*s")

  if separator is None:
    # mlens tracks the maximum cell width seen per column
    mlens = [0 for name in fields]
    format_str = " ".join(format_fields)
  else:
    # escape "%" in the separator so it survives the "%"-formatting below
    format_str = separator.replace("%", "%%").join(format_fields)

  # First pass: stringify all cells (in place!), format unit fields, and
  # collect column widths when in smart-width mode
  for row in data:
    if row is None:
      continue
    for idx, val in enumerate(row):
      if unitfields.Matches(fields[idx]):
        try:
          val = int(val)
        except (TypeError, ValueError):
          # non-numeric unit cell: leave it as-is
          pass
        else:
          val = row[idx] = utils.FormatUnit(val, units)
      val = row[idx] = str(val)
      if separator is None:
        mlens[idx] = max(mlens[idx], len(val))

  result = []
  if headers:
    args = []
    for idx, name in enumerate(fields):
      hdr = headers[name]
      if separator is None:
        # headers may widen a column beyond its widest data cell
        mlens[idx] = max(mlens[idx], len(hdr))
        args.append(mlens[idx])
      args.append(hdr)
    result.append(format_str % tuple(args))

  if separator is None:
    assert len(mlens) == len(fields)

    # zero-width last column avoids trailing padding spaces, unless it is
    # right-aligned (numeric)
    if fields and not numfields.Matches(fields[-1]):
      mlens[-1] = 0

  # Second pass: render each row; a None row becomes a row of dashes
  for line in data:
    args = []
    if line is None:
      line = ["-" for _ in fields]
    for idx in range(len(fields)):
      if separator is None:
        args.append(mlens[idx])
      args.append(line[idx])
    result.append(format_str % tuple(args))

  return result
1752
1753
1754 def _FormatBool(value):
1755 """Formats a boolean value as a string.
1756
1757 """
1758 if value:
1759 return "Y"
1760 return "N"
1761
1762
#: Default formatting for query results; maps a query field type
#: (C{constants.QFT_*}) to a tuple of (formatting callback, align right).
#: C{QFT_UNIT} is deliberately absent: it depends on the runtime-selected
#: unit and is handled separately in L{_GetColumnFormatter}.
_DEFAULT_FORMAT_QUERY = {
  constants.QFT_TEXT: (str, False),
  constants.QFT_BOOL: (_FormatBool, False),
  constants.QFT_NUMBER: (str, True),
  constants.QFT_NUMBER_FLOAT: (str, True),
  constants.QFT_TIMESTAMP: (utils.FormatTime, False),
  constants.QFT_OTHER: (str, False),
  constants.QFT_UNKNOWN: (str, False),
  }
1773
1774
def _GetColumnFormatter(fdef, override, unit):
  """Selects the formatting function for a query field.

  @type fdef: L{objects.QueryFieldDefinition}
  @type override: dict
  @param override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT}
  @rtype: tuple; (callable, bool)
  @return: the function to format a value (takes one parameter) and a
    boolean for aligning the value on the right-hand side

  """
  # Per-field overrides win over everything else
  custom = override.get(fdef.name, None)
  if custom is not None:
    return custom

  assert constants.QFT_UNIT not in _DEFAULT_FORMAT_QUERY

  if fdef.kind == constants.QFT_UNIT:
    # Unit fields need the runtime-chosen unit, so they cannot live in the
    # static default table
    return (lambda value: utils.FormatUnit(value, unit), True)

  default = _DEFAULT_FORMAT_QUERY.get(fdef.kind, None)
  if default is not None:
    return default

  raise NotImplementedError("Can't format column type '%s'" % fdef.kind)
1804
1805
class _QueryColumnFormatter(object):
  """Callable wrapper that formats one query field and records its status.

  """
  def __init__(self, fn, status_fn, verbose):
    """Initializes this class.

    @type fn: callable
    @param fn: Formatting function
    @type status_fn: callable
    @param status_fn: Function to report fields' status
    @type verbose: boolean
    @param verbose: whether to use verbose field descriptions or not

    """
    self._fn = fn
    self._status_fn = status_fn
    self._verbose = verbose

  def __call__(self, data):
    """Returns a field's string representation.

    @param data: a (status, value) pair as found in query results

    """
    (status, value) = data

    # Let the caller keep per-status statistics
    self._status_fn(status)

    if status != constants.RS_NORMAL:
      # Abnormal statuses carry no payload; render the status itself
      assert value is None, \
             "Found value %r for abnormal status %s" % (value, status)
      return FormatResultError(status, self._verbose)

    return self._fn(value)
1841
1842
def FormatResultError(status, verbose):
  """Formats result status other than L{constants.RS_NORMAL}.

  @param status: The result status
  @type verbose: boolean
  @param verbose: Whether to return the verbose text
  @return: Text of result status

  """
  assert status != constants.RS_NORMAL, \
         "FormatResultError called with status equal to constants.RS_NORMAL"
  if status not in constants.RSS_DESCRIPTION:
    raise NotImplementedError("Unknown status %s" % status)
  (verbose_text, normal_text) = constants.RSS_DESCRIPTION[status]
  return verbose_text if verbose else normal_text
1862
1863
def FormatQueryResult(result, unit=None, format_override=None, separator=None,
                      header=False, verbose=False):
  """Formats data in L{objects.QueryResponse}.

  @type result: L{objects.QueryResponse}
  @param result: result of query operation
  @type unit: string
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT},
    see L{utils.text.FormatUnit}
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to output header row
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @rtype: tuple; (one of L{QR_NORMAL}, L{QR_UNKNOWN} or L{QR_INCOMPLETE},
    list of formatted table lines)

  """
  if unit is None:
    unit = "m" if separator else "h"

  if format_override is None:
    format_override = {}

  # Per-status counters, updated by the column formatters while rendering
  stats = dict.fromkeys(constants.RS_ALL, 0)

  def _RecordStatus(status):
    if status in stats:
      stats[status] += 1

  columns = []
  for fdef in result.fields:
    assert fdef.title and fdef.name
    (fn, align_right) = _GetColumnFormatter(fdef, format_override, unit)
    formatter = _QueryColumnFormatter(fn, _RecordStatus, verbose)
    columns.append(TableColumn(fdef.title, formatter, align_right))

  table = FormatTable(result.data, columns, header, separator)

  # Collect statistics
  assert len(stats) == len(constants.RS_ALL)
  assert compat.all(count >= 0 for count in stats.values())

  # Determine overall status. If there was no data, unknown fields must be
  # detected via the field definitions.
  if (stats[constants.RS_UNKNOWN] or
      (not result.data and _GetUnknownFields(result.fields))):
    status = QR_UNKNOWN
  elif compat.any(count > 0 for key, count in stats.items()
                  if key != constants.RS_NORMAL):
    status = QR_INCOMPLETE
  else:
    status = QR_NORMAL

  return (status, table)
1926
1927
def _GetUnknownFields(fdefs):
  """Collects the unknown fields contained in C{fdefs}.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: list
  @return: the definitions whose kind is L{constants.QFT_UNKNOWN}

  """
  unknown = []
  for fdef in fdefs:
    if fdef.kind == constants.QFT_UNKNOWN:
      unknown.append(fdef)
  return unknown
1936
1937
def _WarnUnknownFields(fdefs):
  """Prints a warning to stderr if a query included unknown fields.

  @type fdefs: list of L{objects.QueryFieldDefinition}
  @rtype: bool
  @return: whether any unknown fields were present

  """
  unknown = _GetUnknownFields(fdefs)
  if not unknown:
    return False

  ToStderr("Warning: Queried for unknown fields %s",
           utils.CommaJoin(fdef.name for fdef in unknown))
  return True
1951
1952
def GenericList(resource, fields, names, unit, separator, header, cl=None,
                format_override=None, verbose=False, force_filter=False,
                namefield=None, qfilter=None, isnumeric=False):
  """Generic implementation for listing all items of a resource.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type names: list of strings
  @param names: Names of items to query for
  @type unit: string or None
  @param unit: Unit used for formatting fields of type L{constants.QFT_UNIT} or
    None for automatic choice (human-readable for non-separator usage,
    otherwise megabytes); this is a one-letter string
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @type force_filter: bool
  @param force_filter: Whether to always treat names as filter
  @type format_override: dict
  @param format_override: Dictionary for overriding field formatting functions,
    indexed by field name, contents like L{_DEFAULT_FORMAT_QUERY}
  @type verbose: boolean
  @param verbose: whether to use verbose field descriptions or not
  @type namefield: string
  @param namefield: Name of field to use for simple filters (see
    L{qlang.MakeFilter} for details)
  @type qfilter: list or None
  @param qfilter: Query filter (in addition to names)
  @param isnumeric: bool
  @param isnumeric: Whether the namefield's type is numeric, and therefore
    any simple filters built by namefield should use integer values to
    reflect that
  @rtype: int
  @return: L{constants.EXIT_SUCCESS} or L{constants.EXIT_UNKNOWN_FIELD}

  """
  if not names:
    names = None

  namefilter = qlang.MakeFilter(names, force_filter, namefield=namefield,
                                isnumeric=isnumeric)

  # Combine the name-based filter with the explicit one, if both exist
  if namefilter is not None and qfilter is not None:
    qfilter = [qlang.OP_AND, namefilter, qfilter]
  elif qfilter is None:
    qfilter = namefilter

  if cl is None:
    cl = GetClient()

  response = cl.Query(resource, fields, qfilter)

  found_unknown = _WarnUnknownFields(response.fields)

  (status, data) = FormatQueryResult(response, unit=unit, separator=separator,
                                     header=header,
                                     format_override=format_override,
                                     verbose=verbose)

  for line in data:
    ToStdout(line)

  # The warning and the overall status must agree about unknown fields
  assert found_unknown == (status == QR_UNKNOWN)

  if status == QR_UNKNOWN:
    return constants.EXIT_UNKNOWN_FIELD

  # TODO: Should the list command fail if not all data could be collected?
  return constants.EXIT_SUCCESS
2023
2024
def _FieldDescValues(fdef):
  """Helper function for L{GenericListFields} to get query field description.

  @type fdef: L{objects.QueryFieldDefinition}
  @rtype: list
  @return: name, human-readable kind, title and documentation of the field

  """
  # Fall back to the raw kind value if no human-readable name is known
  kind_name = _QFT_NAMES.get(fdef.kind, fdef.kind)
  return [fdef.name, kind_name, fdef.title, fdef.doc]
2038
2039
def GenericListFields(resource, fields, separator, header, cl=None):
  """Generic implementation for listing fields for a resource.

  @param resource: One of L{constants.QR_VIA_LUXI}
  @type fields: list of strings
  @param fields: List of fields to query for
  @type separator: string or None
  @param separator: String used to separate fields
  @type header: bool
  @param header: Whether to show header row
  @rtype: int
  @return: L{constants.EXIT_SUCCESS} or L{constants.EXIT_UNKNOWN_FIELD}

  """
  if cl is None:
    cl = GetClient()

  # An empty field list means "all fields"
  if not fields:
    fields = None

  response = cl.QueryFields(resource, fields)

  found_unknown = _WarnUnknownFields(response.fields)

  columns = [TableColumn(title, str, False)
             for title in ("Name", "Type", "Title", "Description")]

  rows = [_FieldDescValues(fdef) for fdef in response.fields]

  for line in FormatTable(rows, columns, header, separator):
    ToStdout(line)

  if found_unknown:
    return constants.EXIT_UNKNOWN_FIELD

  return constants.EXIT_SUCCESS
2078
2079
class TableColumn(object):
  """Describes one column for L{FormatTable}.

  """
  def __init__(self, title, fn, align_right):
    """Initializes this class.

    @type title: string
    @param title: Column title
    @type fn: callable
    @param fn: Formatting function
    @type align_right: bool
    @param align_right: Whether to align values on the right-hand side

    """
    # Header text shown when the table header is enabled
    self.title = title
    # Callable turning a raw cell value into its string representation
    self.format = fn
    # True to right-align the column's values
    self.align_right = align_right
2098
2099
def _GetColFormatString(width, align_right):
  """Returns the %-format string for a single field.

  @type width: int
  @param width: minimum field width
  @type align_right: bool
  @param align_right: if False, a "-" flag is added so the value is
    left-aligned and padded on the right

  """
  sign = "" if align_right else "-"
  return "%%%s%ss" % (sign, width)
2110
2111
def FormatTable(rows, columns, header, separator):
  """Formats data as a table.

  @type rows: list of lists
  @param rows: Row data, one list per row
  @type columns: list of L{TableColumn}
  @param columns: Column descriptions
  @type header: bool
  @param header: Whether to show header row
  @type separator: string or None
  @param separator: String used to separate columns; when None the
    columns are padded with spaces instead
  @rtype: list of strings
  @return: the formatted output lines

  """
  if header:
    lines = [[col.title for col in columns]]
    widths = [len(col.title) for col in columns]
  else:
    lines = []
    widths = [0] * len(columns)

  # Render every cell through its column's formatting function
  for row in rows:
    assert len(row) == len(columns)

    rendered = [col.format(value) for (value, col) in zip(row, columns)]

    if separator is None:
      # Column widths only matter for space-padded output
      widths = [max(oldwidth, len(value))
                for (oldwidth, value) in zip(widths, rendered)]

    lines.append(rendered)

  if separator is not None:
    # Separator-joined output needs no padding
    return [separator.join(line) for line in lines]

  if columns and not columns[-1].align_right:
    # Avoid unnecessary spaces at end of line
    widths[-1] = 0

  fmt = " ".join([_GetColFormatString(width, col.align_right)
                  for (col, width) in zip(columns, widths)])

  return [fmt % tuple(line) for line in lines]
2159
2160
def FormatTimestamp(ts):
  """Formats a given timestamp.

  @type ts: timestamp
  @param ts: a timeval-type timestamp, a tuple of seconds and microseconds

  @rtype: string
  @return: a string with the formatted timestamp, or "?" when the input
    is not a two-element tuple/list

  """
  if isinstance(ts, (tuple, list)) and len(ts) == 2:
    (sec, usecs) = ts
    return utils.FormatTime(sec, usecs=usecs)

  return "?"
2176
2177
def ParseTimespec(value):
  """Parse a time specification.

  The following suffixed will be recognized:

    - s: seconds
    - m: minutes
    - h: hours
    - d: day
    - w: weeks

  Without any suffix, the value will be taken to be in seconds.

  @rtype: int
  @return: the number of seconds described by the specification
  @raise errors.OpPrereqError: if the value is empty or not a valid
    integer (with optional suffix)

  """
  value = str(value)
  if not value:
    raise errors.OpPrereqError("Empty time specification passed",
                               errors.ECODE_INVAL)
  suffix_map = {
    "s": 1,
    "m": 60,
    "h": 3600,
    "d": 86400,
    "w": 604800,
    }
  # Keep the text as passed by the user so error messages show the full
  # specification; previously the suffix was stripped before reporting
  # (e.g. "1.5h" was reported as just "1.5")
  orig_value = value
  if value[-1] not in suffix_map:
    try:
      value = int(value)
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % orig_value,
                                 errors.ECODE_INVAL)
  else:
    multiplier = suffix_map[value[-1]]
    value = value[:-1]
    if not value: # no data left after stripping the suffix
      raise errors.OpPrereqError("Invalid time specification (only"
                                 " suffix passed)", errors.ECODE_INVAL)
    try:
      value = int(value) * multiplier
    except (TypeError, ValueError):
      raise errors.OpPrereqError("Invalid time specification '%s'" % orig_value,
                                 errors.ECODE_INVAL)
  return value
2221
2222
def GetOnlineNodes(nodes, cl=None, nowarn=False, secondary_ips=False,
                   filter_master=False, nodegroup=None):
  """Returns the names of online nodes.

  This function will also log a warning on stderr with the names of
  the online nodes.

  @param nodes: if not empty, use only this subset of nodes (minus the
    offline ones)
  @param cl: if not None, luxi client to use
  @type nowarn: boolean
  @param nowarn: by default, this function will output a note with the
    offline nodes that are skipped; if this parameter is True the
    note is not displayed
  @type secondary_ips: boolean
  @param secondary_ips: if True, return the secondary IPs instead of the
    names, useful for doing network traffic over the replication interface
    (if any)
  @type filter_master: boolean
  @param filter_master: if True, do not return the master node in the list
    (useful in coordination with secondary_ips where we cannot check our
    node name against the list)
  @type nodegroup: string
  @param nodegroup: If set, only return nodes in this node group

  """
  if cl is None:
    cl = GetClient()

  # Build the query filter from the individual conditions
  conditions = []

  if nodes:
    conditions.append(qlang.MakeSimpleFilter("name", nodes))

  if nodegroup is not None:
    # the group can be specified by name or by UUID
    conditions.append([qlang.OP_OR, [qlang.OP_EQUAL, "group", nodegroup],
                       [qlang.OP_EQUAL, "group.uuid", nodegroup]])

  if filter_master:
    conditions.append([qlang.OP_NOT, [qlang.OP_TRUE, "master"]])

  if not conditions:
    final_filter = None
  elif len(conditions) == 1:
    final_filter = conditions[0]
  else:
    final_filter = [qlang.OP_AND] + conditions

  result = cl.Query(constants.QR_NODE, ["name", "offline", "sip"], final_filter)

  # Each row is a list of (status, value) pairs, one per requested field
  def _NodeOffline(row):
    return row[1][1]

  def _NodeName(row):
    return row[0][1]

  def _NodeSip(row):
    return row[2][1]

  (offline, online) = compat.partition(result.data, _NodeOffline)

  if offline and not nowarn:
    ToStderr("Note: skipping offline node(s): %s" %
             utils.CommaJoin([_NodeName(row) for row in offline]))

  fn = _NodeSip if secondary_ips else _NodeName
  return [fn(row) for row in online]
2299
2300
def GetNodesSshPorts(nodes, cl):
  """Retrieves SSH ports of given nodes.

  @param nodes: the names of nodes
  @type nodes: a list of strings
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the list of SSH ports corresponding to the nodes
  @rtype: a list of ports (one per node)

  """
  rows = cl.QueryNodes(names=nodes,
                       fields=["ndp/ssh_port"],
                       use_locking=False)
  # Each row holds exactly the one requested field
  return [row[0] for row in rows]
2316
2317
def GetNodeUUIDs(nodes, cl):
  """Retrieves the UUIDs of given nodes.

  @param nodes: the names of nodes
  @type nodes: a list of string
  @param cl: a client to use for the query
  @type cl: L{ganeti.luxi.Client}
  @return: the list of UUIDs corresponding to the nodes
  @rtype: a list of UUIDs (one per node)

  """
  rows = cl.QueryNodes(names=nodes,
                       fields=["uuid"],
                       use_locking=False)
  # Each row holds exactly the one requested field
  return [row[0] for row in rows]
2333
2334
2335 def _ToStream(stream, txt, *args):
2336 """Write a message to a stream, bypassing the logging system
2337
2338 @type stream: file object
2339 @param stream: the file to which we should write
2340 @type txt: str
2341 @param txt: the message
2342
2343 """
2344 try:
2345 if args:
2346 args = tuple(args)
2347 stream.write(txt % args)
2348 else:
2349 stream.write(txt)
2350 stream.write("\n")
2351 stream.flush()
2352 except IOError, err:
2353 if err.errno == errno.EPIPE:
2354 # our terminal went away, we'll exit
2355 sys.exit(constants.EXIT_FAILURE)
2356 else:
2357 raise
2358
2359
def ToStdout(txt, *args):
  """Write a message to stdout only, bypassing the logging system

  Thin convenience wrapper around L{_ToStream} bound to C{sys.stdout}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stdout, txt, *args)
2370
2371
def ToStderr(txt, *args):
  """Write a message to stderr only, bypassing the logging system

  Thin convenience wrapper around L{_ToStream} bound to C{sys.stderr}.

  @type txt: str
  @param txt: the message

  """
  _ToStream(sys.stderr, txt, *args)
2382
2383
class JobExecutor(object):
  """Class which manages the submission and execution of multiple jobs.

  Note that instances of this class should not be reused between
  GetResults() calls.

  """
  def __init__(self, cl=None, verbose=True, opts=None, feedback_fn=None):
    """Initializes this class.

    @param cl: luxi client to use; a new one is created when None
    @type verbose: bool
    @param verbose: whether to print the IDs of successfully submitted jobs
    @param opts: parsed command-line options, applied to every queued job
      via L{SetGenericOpcodeOpts}
    @param feedback_fn: function passed to L{PollJob} to report progress

    """
    # Jobs not yet submitted: list of (index, name, ops) tuples
    self.queue = []
    if cl is None:
      cl = GetClient()
    self.cl = cl
    self.verbose = verbose
    # Submitted jobs: list of (index, submit status, job id or error, name)
    self.jobs = []
    self.opts = opts
    self.feedback_fn = feedback_fn
    # Monotonic counter; the index is used to restore the original
    # submission order when results are returned by GetResults()
    self._counter = itertools.count()

  @staticmethod
  def _IfName(name, fmt):
    """Helper function for formatting name.

    @return: C{fmt} interpolated with C{name} if the latter is non-empty,
      otherwise the empty string

    """
    if name:
      return fmt % name

    return ""

  def QueueJob(self, name, *ops):
    """Record a job for later submit.

    @type name: string
    @param name: a description of the job, will be used in WaitJobSet

    """
    SetGenericOpcodeOpts(ops, self.opts)
    self.queue.append((self._counter.next(), name, ops))

  def AddJobId(self, name, status, job_id):
    """Adds a job ID to the internal queue.

    Used for jobs that were submitted elsewhere but should still be
    tracked by this executor.

    """
    self.jobs.append((self._counter.next(), status, job_id, name))

  def SubmitPending(self, each=False):
    """Submit all pending jobs.

    @type each: bool
    @param each: if True, submit the queued jobs one by one; otherwise
      a single SubmitManyJobs call is used

    """
    if each:
      results = []
      for (_, _, ops) in self.queue:
        # SubmitJob will remove the success status, but raise an exception if
        # the submission fails, so we'll notice that anyway.
        results.append([True, self.cl.SubmitJob(ops)[0]])
    else:
      results = self.cl.SubmitManyJobs([ops for (_, _, ops) in self.queue])
    # Pair each submission result with its queue entry; the order of
    # "results" matches the order of "self.queue"
    for ((status, data), (idx, name, _)) in zip(results, self.queue):
      self.jobs.append((idx, status, data, name))

  def _ChooseJob(self):
    """Choose a non-waiting/queued job to poll next.

    Queries the status of (a batch of) the tracked jobs and removes and
    returns the first entry that is no longer pending; when all jobs are
    still pending, the oldest entry is returned.

    """
    assert self.jobs, "_ChooseJob called with empty job list"

    result = self.cl.QueryJobs([i[2] for i in self.jobs[:_CHOOSE_BATCH]],
                               ["status"])
    assert result

    for job_data, status in zip(self.jobs, result):
      if (isinstance(status, list) and status and
          status[0] in (constants.JOB_STATUS_QUEUED,
                        constants.JOB_STATUS_WAITING,
                        constants.JOB_STATUS_CANCELING)):
        # job is still present and waiting
        continue
      # good candidate found (either running job or lost job)
      self.jobs.remove(job_data)
      return job_data

    # no job found
    return self.jobs.pop(0)

  def GetResults(self):
    """Wait for and return the results of all jobs.

    @rtype: list
    @return: list of tuples (success, job results), in the same order
        as the submitted jobs; if a job has failed, instead of the result
        there will be the error message

    """
    if not self.jobs:
      self.SubmitPending()
    results = []
    if self.verbose:
      ok_jobs = [row[2] for row in self.jobs if row[1]]
      if ok_jobs:
        ToStdout("Submitted jobs %s", utils.CommaJoin(ok_jobs))

    # first, remove any non-submitted jobs
    self.jobs, failures = compat.partition(self.jobs, lambda x: x[1])
    for idx, _, jid, name in failures:
      ToStderr("Failed to submit job%s: %s", self._IfName(name, " for %s"), jid)
      results.append((idx, False, jid))

    while self.jobs:
      (idx, _, jid, name) = self._ChooseJob()
      ToStdout("Waiting for job %s%s ...", jid, self._IfName(name, " for %s"))
      try:
        job_result = PollJob(jid, cl=self.cl, feedback_fn=self.feedback_fn)
        success = True
      except errors.JobLost, err:
        _, job_result = FormatError(err)
        ToStderr("Job %s%s has been archived, cannot check its result",
                 jid, self._IfName(name, " for %s"))
        success = False
      except (errors.GenericError, rpcerr.ProtocolError), err:
        _, job_result = FormatError(err)
        success = False
        # the error message will always be shown, verbose or not
        ToStderr("Job %s%s has failed: %s",
                 jid, self._IfName(name, " for %s"), job_result)

      results.append((idx, success, job_result))

    # sort based on the index, then drop it
    results.sort()
    results = [i[1:] for i in results]

    return results

  def WaitOrShow(self, wait):
    """Wait for job results or only print the job IDs.

    @type wait: boolean
    @param wait: whether to wait or not

    """
    if wait:
      return self.GetResults()
    else:
      if not self.jobs:
        self.SubmitPending()
      for _, status, result, name in self.jobs:
        if status:
          ToStdout("%s: %s", result, name)
        else:
          ToStderr("Failure for %s: %s", name, result)
      # Return (status, job id/error) pairs for all tracked jobs
      return [row[1:3] for row in self.jobs]
2534
2535
def FormatParamsDictInfo(param_dict, actual, roman=False):
  """Formats a parameter dictionary.

  @type param_dict: dict
  @param param_dict: the own parameters
  @type actual: dict
  @param actual: the current parameter set (including defaults)
  @rtype: dict
  @return: dictionary where the value of each parameter is either a fully
      formatted string or a dictionary containing formatted strings

  """
  formatted = {}
  for (key, data) in actual.items():
    if isinstance(data, dict) and data:
      # recurse into non-empty sub-dictionaries
      formatted[key] = FormatParamsDictInfo(param_dict.get(key, {}),
                                            data, roman)
    else:
      # parameters missing from param_dict are shown as defaults
      fallback = "default (%s)" % compat.TryToRoman(data, roman)
      formatted[key] = str(compat.TryToRoman(param_dict.get(key, fallback),
                                             roman))
  return formatted
2556
2557
def _FormatListInfoDefault(data, def_data):
  """Formats a list value, falling back to the defaults when unset.

  @param data: the list to format, or None to use the defaults
  @param def_data: default values used when C{data} is None
  @rtype: string

  """
  if data is None:
    return "default (%s)" % utils.CommaJoin(def_data)
  return utils.CommaJoin(data)
2564
2565
def FormatPolicyInfo(custom_ipolicy, eff_ipolicy, iscluster, roman=False):
  """Formats an instance policy.

  @type custom_ipolicy: dict
  @param custom_ipolicy: own policy
  @type eff_ipolicy: dict
  @param eff_ipolicy: effective policy (including defaults); ignored for
      cluster
  @type iscluster: bool
  @param iscluster: the policy is at cluster level
  @type roman: bool
  @param roman: whether to print the values in roman numerals
  @rtype: list of pairs
  @return: formatted data, suitable for L{PrintGenericInfo}

  """
  if iscluster:
    # at cluster level the custom policy is already fully specified
    eff_ipolicy = custom_ipolicy

  minmax_out = []
  custom_minmax = custom_ipolicy.get(constants.ISPECS_MINMAX)
  if custom_minmax:
    for (idx, minmax) in enumerate(custom_minmax):
      minmax_out.append([
        ("%s/%s" % (key, idx),
         FormatParamsDictInfo(minmax[key], minmax[key], roman))
        for key in constants.ISPECS_MINMAX_KEYS
        ])
  else:
    # no custom bounds: show the effective ones as all-default
    for (idx, minmax) in enumerate(eff_ipolicy[constants.ISPECS_MINMAX]):
      minmax_out.append([
        ("%s/%s" % (key, idx),
         FormatParamsDictInfo({}, minmax[key], roman))
        for key in constants.ISPECS_MINMAX_KEYS
        ])
  ret = [("bounds specs", minmax_out)]

  if iscluster:
    stdspecs = custom_ipolicy[constants.ISPECS_STD]
    ret.append(
      (constants.ISPECS_STD,
       FormatParamsDictInfo(stdspecs, stdspecs, roman))
      )

  ret.append(
    ("allowed disk templates",
     _FormatListInfoDefault(custom_ipolicy.get(constants.IPOLICY_DTS),
                            eff_ipolicy[constants.IPOLICY_DTS]))
    )

  for key in constants.IPOLICY_PARAMETERS:
    # fall back to the effective value, marked as default
    default_str = "default (%s)" % eff_ipolicy[key]
    value = compat.TryToRoman(custom_ipolicy.get(key, default_str), roman)
    ret.append((key, str(value)))

  return ret
2623
2624
def _PrintSpecsParameters(buf, specs):
  """Writes the spec parameters as comma-joined "key=value" pairs, sorted."""
  pairs = ["%s=%s" % (name, value) for (name, value) in sorted(specs.items())]
  buf.write(",".join(pairs))
2628
2629
def PrintIPolicyCommand(buf, ipolicy, isgroup):
  """Print the command option used to generate the given instance policy.

  Currently only the parts dealing with specs are supported.

  @type buf: StringIO
  @param buf: stream to write into
  @type ipolicy: dict
  @param ipolicy: instance policy
  @type isgroup: bool
  @param isgroup: whether the policy is at group level

  """
  if not isgroup:
    # "std" specs only exist at cluster level
    stdspecs = ipolicy.get("std")
    if stdspecs:
      buf.write(" %s " % IPOLICY_STD_SPECS_STR)
      _PrintSpecsParameters(buf, stdspecs)

  first = True
  for minmax in ipolicy.get("minmax", []):
    minspecs = minmax.get("min")
    maxspecs = minmax.get("max")
    if not (minspecs and maxspecs):
      continue
    if first:
      buf.write(" %s " % IPOLICY_BOUNDS_SPECS_STR)
      first = False
    else:
      # subsequent min/max pairs are separated by "//"
      buf.write("//")
    buf.write("min:")
    _PrintSpecsParameters(buf, minspecs)
    buf.write("/max:")
    _PrintSpecsParameters(buf, maxspecs)
2663
2664
def ConfirmOperation(names, list_type, text, extra=""):
  """Ask the user to confirm an operation on a list of list_type.

  This function is used to request confirmation for doing an operation
  on a given list of list_type.

  @type names: list
  @param names: the list of names that we display when
      we ask for confirmation
  @type list_type: str
  @param list_type: Human readable name for elements in the list (e.g. nodes)
  @type text: str
  @param text: the operation that the user should confirm
  @rtype: boolean
  @return: True or False depending on user's confirmation.

  """
  count = len(names)
  msg = ("The %s will operate on %d %s.\n%s"
         "Do you want to continue?" % (text, count, list_type, extra))
  affected = (("\nAffected %s:\n" % list_type) +
              "\n".join([" %s" % name for name in names]))

  choices = [("y", True, "Yes, execute the %s" % text),
             ("n", False, "No, abort the %s" % text)]

  # For long lists the affected items are hidden behind a "view" choice
  if count > 20:
    choices.insert(1, ("v", "v", "View the list of affected %s" % list_type))
    prompt = msg
  else:
    prompt = msg + affected

  answer = AskUser(prompt, choices)
  if answer == "v":
    # show the full list and ask again, without the "view" choice
    choices.pop(1)
    answer = AskUser(msg + affected, choices)
  return answer
2702
2703
def _MaybeParseUnit(elements):
  """Parses and returns an array of potential values with units.

  @type elements: dict
  @param elements: mapping of names to values; values equal to
    L{constants.VALUE_DEFAULT} are passed through unchanged, all others
    are run through L{utils.ParseUnit}
  @rtype: dict

  """
  parsed = {}
  for (key, val) in elements.items():
    parsed[key] = val if val == constants.VALUE_DEFAULT \
        else utils.ParseUnit(val)
  return parsed
2715
2716
def _InitISpecsFromSplitOpts(ipolicy, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all):
  """Fills C{ipolicy} from the legacy per-parameter ("split") spec options.

  Each C{ispecs_*} argument maps "min"/"max"/"std" keys to a value for that
  parameter; this function transposes them into the min/max/std-keyed spec
  dictionaries stored in the instance policy.

  @param ipolicy: the policy dictionary, updated in place
  @param group_ipolicy: whether this is a node group policy (selects which
    type checks are enforced on the values)
  @param fill_all: whether missing values should be filled from the
    built-in defaults

  """
  try:
    # memory and disk sizes may carry unit suffixes
    if ispecs_mem_size:
      ispecs_mem_size = _MaybeParseUnit(ispecs_mem_size)
    if ispecs_disk_size:
      ispecs_disk_size = _MaybeParseUnit(ispecs_disk_size)
  except (TypeError, ValueError, errors.UnitParseError), err:
    raise errors.OpPrereqError("Invalid disk (%s) or memory (%s) size"
                               " in policy: %s" %
                               (ispecs_disk_size, ispecs_mem_size, err),
                               errors.ECODE_INVAL)

  # prepare ipolicy dict
  ispecs_transposed = {
    constants.ISPEC_MEM_SIZE: ispecs_mem_size,
    constants.ISPEC_CPU_COUNT: ispecs_cpu_count,
    constants.ISPEC_DISK_COUNT: ispecs_disk_count,
    constants.ISPEC_DISK_SIZE: ispecs_disk_size,
    constants.ISPEC_NIC_COUNT: ispecs_nic_count,
    }

  # first, check that the values given are correct
  if group_ipolicy:
    forced_type = TISPECS_GROUP_TYPES
  else:
    forced_type = TISPECS_CLUSTER_TYPES
  for specs in ispecs_transposed.values():
    assert type(specs) is dict
    utils.ForceDictType(specs, forced_type)

  # then transpose: per-parameter {min/max/std: value} dicts become
  # per-min/max/std {parameter: value} dicts
  ispecs = {
    constants.ISPECS_MIN: {},
    constants.ISPECS_MAX: {},
    constants.ISPECS_STD: {},
    }
  for (name, specs) in ispecs_transposed.iteritems():
    assert name in constants.ISPECS_PARAMETERS
    for key, val in specs.items(): # {min: .. ,max: .., std: ..}
      assert key in ispecs
      ispecs[key][name] = val
  minmax_out = {}
  for key in constants.ISPECS_MINMAX_KEYS:
    if fill_all:
      minmax_out[key] = \
        objects.FillDict(constants.ISPECS_MINMAX_DEFAULTS[key], ispecs[key])
    else:
      minmax_out[key] = ispecs[key]
  # the "split" options can only describe a single min/max pair
  ipolicy[constants.ISPECS_MINMAX] = [minmax_out]
  if fill_all:
    ipolicy[constants.ISPECS_STD] = \
      objects.FillDict(constants.IPOLICY_DEFAULTS[constants.ISPECS_STD],
                       ispecs[constants.ISPECS_STD])
  else:
    ipolicy[constants.ISPECS_STD] = ispecs[constants.ISPECS_STD]
2774
2775
2776 def _ParseSpecUnit(spec, keyname):
2777 ret = spec.copy()
2778 for k in [constants.ISPEC_DISK_SIZE, constants.ISPEC_MEM_SIZE]:
2779 if k in ret:
2780 try:
2781 ret[k] = utils.ParseUnit(ret[k])
2782 except (TypeError, ValueError, errors.UnitParseError), err:
2783 raise errors.OpPrereqError(("Invalid parameter %s (%s) in %s instance"
2784 " specs: %s" % (k, ret[k], keyname, err)),
2785 errors.ECODE_INVAL)
2786 return ret
2787
2788
def _ParseISpec(spec, keyname, required):
  """Parses and type-checks a single instance spec dictionary.

  @type spec: dict
  @param spec: the raw spec values
  @type keyname: string
  @param keyname: name of the spec, used in error messages
  @type required: bool
  @param required: whether all spec parameters must be present
  @rtype: dict

  """
  parsed = _ParseSpecUnit(spec, keyname)
  utils.ForceDictType(parsed, constants.ISPECS_PARAMETER_TYPES)
  missing = constants.ISPECS_PARAMETERS - frozenset(parsed)
  if required and missing:
    raise errors.OpPrereqError("Missing parameters in ipolicy spec %s: %s" %
                               (keyname, utils.CommaJoin(missing)),
                               errors.ECODE_INVAL)
  return parsed
2798
2799
def _GetISpecsInAllowedValues(minmax_ispecs, allowed_values):
  """Detects whether the min/max specs consist of one special allowed value.

  @return: the allowed value when C{minmax_ispecs} holds exactly one pair
    whose single key is in C{allowed_values} with an empty spec, else None

  """
  if not (minmax_ispecs and allowed_values):
    return None
  if len(minmax_ispecs) != 1 or len(minmax_ispecs[0]) != 1:
    return None
  ((key, spec),) = minmax_ispecs[0].items()
  if key in allowed_values and not spec:
    return key
  return None
2809
2810
def _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values):
  """Fills C{ipolicy_out} from the --ipolicy-xxx-specs style options.

  @param ipolicy_out: the policy dictionary, updated in place
  @param minmax_ispecs: list of {min/max: spec} pairs, or None
  @param std_ispecs: the "std" spec dictionary, or None
  @param group_ipolicy: whether this is a node group policy
  @param allowed_values: special non-dict values accepted in place of
    the min/max specs

  """
  found_allowed = _GetISpecsInAllowedValues(minmax_ispecs, allowed_values)
  if found_allowed is not None:
    # a special value replaces the whole min/max list
    ipolicy_out[constants.ISPECS_MINMAX] = found_allowed
  elif minmax_ispecs is not None:
    parsed_pairs = []
    for mmpair in minmax_ispecs:
      pair_out = {}
      for (key, spec) in mmpair.items():
        if key not in constants.ISPECS_MINMAX_KEYS:
          msg = "Invalid key in bounds instance specifications: %s" % key
          raise errors.OpPrereqError(msg, errors.ECODE_INVAL)
        pair_out[key] = _ParseISpec(spec, key, True)
      parsed_pairs.append(pair_out)
    ipolicy_out[constants.ISPECS_MINMAX] = parsed_pairs
  if std_ispecs is not None:
    assert not group_ipolicy # This is not an option for gnt-group
    ipolicy_out[constants.ISPECS_STD] = _ParseISpec(std_ispecs, "std", False)
2830
2831
def CreateIPolicyFromOpts(ispecs_mem_size=None,
                          ispecs_cpu_count=None,
                          ispecs_disk_count=None,
                          ispecs_disk_size=None,
                          ispecs_nic_count=None,
                          minmax_ispecs=None,
                          std_ispecs=None,
                          ipolicy_disk_templates=None,
                          ipolicy_vcpu_ratio=None,
                          ipolicy_spindle_ratio=None,
                          group_ipolicy=False,
                          allowed_values=None,
                          fill_all=False):
  """Creation of instance policy based on command line options.

  @param fill_all: whether for cluster policies we should ensure that
    all values are filled

  """
  assert not (fill_all and allowed_values)

  # The legacy --specs-xxx options and the newer --ipolicy-xxx-specs
  # options are mutually exclusive
  split_specs = bool(ispecs_mem_size or ispecs_cpu_count or
                     ispecs_disk_count or ispecs_disk_size or
                     ispecs_nic_count)
  full_specs = minmax_ispecs is not None or std_ispecs is not None
  if split_specs and full_specs:
    raise errors.OpPrereqError("A --specs-xxx option cannot be specified"
                               " together with any --ipolicy-xxx-specs option",
                               errors.ECODE_INVAL)

  ipolicy_out = objects.MakeEmptyIPolicy()
  if split_specs:
    assert fill_all
    _InitISpecsFromSplitOpts(ipolicy_out, ispecs_mem_size, ispecs_cpu_count,
                             ispecs_disk_count, ispecs_disk_size,
                             ispecs_nic_count, group_ipolicy, fill_all)
  elif full_specs:
    _InitISpecsFromFullOpts(ipolicy_out, minmax_ispecs, std_ispecs,
                            group_ipolicy, allowed_values)

  if ipolicy_disk_templates is not None:
    if allowed_values and ipolicy_disk_templates in allowed_values:
      # special values are stored verbatim
      ipolicy_out[constants.IPOLICY_DTS] = ipolicy_disk_templates
    else:
      ipolicy_out[constants.IPOLICY_DTS] = list(ipolicy_disk_templates)
  if ipolicy_vcpu_ratio is not None:
    ipolicy_out[constants.IPOLICY_VCPU_RATIO] = ipolicy_vcpu_ratio
  if ipolicy_spindle_ratio is not None:
    ipolicy_out[constants.IPOLICY_SPINDLE_RATIO] = ipolicy_spindle_ratio

  assert not (frozenset(ipolicy_out.keys()) - constants.IPOLICY_ALL_KEYS)

  if not group_ipolicy and fill_all:
    ipolicy_out = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_out)

  return ipolicy_out
2886
2887
def _NotAContainer(data):
  """ Tells whether the argument is a scalar, i.e. not a list/dict/tuple.

  @rtype: bool

  """
  return not isinstance(data, (list, dict, tuple))
2895
2896
def _GetAlignmentMapping(data):
  """ Returns info about alignment if present in an encoded ordered dictionary.

  @type data: list of tuple
  @param data: The encoded ordered dictionary, as defined in
    L{_SerializeGenericInfo}.
  @rtype: dict of any to int
  @return: The dictionary mapping alignment groups to the maximum length of the
    dictionary key found in the group.

  """
  alignment_map = {}
  for entry in data:
    # only three-element entries carry an alignment group
    if len(entry) <= 2:
      continue
    group_key = entry[2]
    # key lengths are non-negative, so 0 is a safe initial value
    alignment_map[group_key] = max(alignment_map.get(group_key, 0),
                                   len(entry[0]))
  return alignment_map
2919
2920
def _SerializeGenericInfo(buf, data, level, afterkey=False):
  """Formatting core of L{PrintGenericInfo}.

  Recursively renders C{data} into C{buf} in a YAML-like layout, indenting
  by one C{baseind} unit per nesting level.

  @param buf: (string) stream to accumulate the result into
  @param data: data to format; a dict, a list of (key, value[, group])
    tuples (an ordered dictionary), a list/tuple, or a scalar
  @type level: int
  @param level: depth in the data hierarchy, used for indenting
  @type afterkey: bool
  @param afterkey: True when we are in the middle of a line after a key (used
    to properly add newlines or indentation)

  """
  baseind = " "
  if isinstance(data, dict):
    if not data:
      buf.write("\n")
    else:
      if afterkey:
        buf.write("\n")
        doindent = True
      else:
        doindent = False
      # keys of plain dictionaries are emitted in sorted order
      for key in sorted(data):
        if doindent:
          buf.write(baseind * level)
        else:
          doindent = True
        buf.write(key)
        buf.write(": ")
        _SerializeGenericInfo(buf, data[key], level + 1, afterkey=True)
  elif isinstance(data, list) and len(data) > 0 and isinstance(data[0], tuple):
    # list of tuples (an ordered dictionary)
    # the tuples may have two or three members - key, value, and alignment group
    # if the alignment group is present, align all values sharing the same group
    if afterkey:
      buf.write("\n")
      doindent = True
    else:
      doindent = False

    alignment_mapping = _GetAlignmentMapping(data)
    for entry in data:
      key, val = entry[0:2]
      if doindent:
        buf.write(baseind * level)
      else:
        doindent = True
      buf.write(key)
      buf.write(": ")
      if len(entry) > 2:
        # pad up to the longest key in this alignment group
        max_key_length = alignment_mapping[entry[2]]
        buf.write(" " * (max_key_length - len(key)))
      _SerializeGenericInfo(buf, val, level + 1, afterkey=True)
  elif isinstance(data, tuple) and all(map(_NotAContainer, data)):
    # tuples with simple content are serialized as inline lists
    buf.write("[%s]\n" % utils.CommaJoin(data))
  elif isinstance(data, list) or isinstance(data, tuple):
    # lists and tuples
    if not data:
      buf.write("\n")
    else:
      if afterkey:
        buf.write("\n")
        doindent = True
      else:
        doindent = False
      for item in data:
        if doindent:
          buf.write(baseind * level)
        else:
          doindent = True
        buf.write("-")
        buf.write(baseind[1:])
        _SerializeGenericInfo(buf, item, level + 1)
  else:
    # This branch should be only taken for strings, but it's practically
    # impossible to guarantee that no other types are produced somewhere
    buf.write(str(data))
    buf.write("\n")
3000
3001
def PrintGenericInfo(data):
  """Print information formatted according to the hierarchy.

  The output is a valid YAML string.

  @param data: the data to print. It's a hierarchical structure whose elements
      can be:
        - dictionaries, where keys are strings and values are of any of the
          types listed here
        - lists of tuples (key, value) or (key, value, alignment_group), where
          key is a string, value is of any of the types listed here, and
          alignment_group can be any hashable value; it's a way to encode
          ordered dictionaries; any entries sharing the same alignment group are
          aligned by appending whitespace before the value as needed
        - lists of any of the types listed here
        - strings

  """
  buf = StringIO()
  _SerializeGenericInfo(buf, data, 0)
  # trailing newlines are dropped since ToStdout adds its own
  output = buf.getvalue()
  ToStdout(output.rstrip("\n"))