Merge branch 'stable-2.13' into stable-2.14
[ganeti-github.git] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module implementing the logic behind the cluster operations
32
33 This module implements the logic for doing operations in the cluster. There
34 are two kinds of classes defined:
35 - logical units, which know how to deal with their specific opcode only
36 - the processor, which dispatches the opcodes to their logical units
37
38 """
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import compat
56 from ganeti import wconfd
57
58
# Single-element list used as a mutable module-level flag. It is read and
# reset by Processor._RequestAndWait while waiting for lock requests. Nothing
# in this module sets it to True; presumably a SIGHUP handler defined
# elsewhere does -- TODO confirm against the code installing that handler.
sighupReceived = [False]
# Single-element list holding a counter; Processor._ExecLU increments it
# before running an LU's Exec() and decrements it again afterwards.
lusExecuting = [0]

# Class name prefixes used by _LUNameForOpName to translate the name of an
# opcode class into the name of its logical unit class.
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"
64
65
class LockAcquireTimeout(Exception):
  """Signals that locks could not be obtained within the allowed time.

  When this is raised the lock request may still be pending, or the locks
  may even have been granted in the meantime.

  """
70
71
def _CalculateLockAttemptTimeouts():
  """Builds the list of timeouts used for successive lock attempts.

  The first entry is C{constants.LOCK_ATTEMPTS_MINWAIT}; each further entry
  grows from the previous one and is clamped to the range
  [C{LOCK_ATTEMPTS_MINWAIT}, C{LOCK_ATTEMPTS_MAXWAIT}]. Entries are added
  until their sum reaches C{constants.LOCK_ATTEMPTS_TIMEOUT}.

  @rtype: list
  @return: timeouts, one per non-blocking attempt

  """
  min_wait = constants.LOCK_ATTEMPTS_MINWAIT
  max_wait = constants.LOCK_ATTEMPTS_MAXWAIT
  total_wait = constants.LOCK_ATTEMPTS_TIMEOUT

  timeouts = [min_wait]
  elapsed = timeouts[0]

  # Keep adding attempts until they cover at least the total timeout; only
  # after that is a blocking acquire used
  while elapsed < total_wait:
    grown = (timeouts[-1] * 1.05) ** 1.25

    # The upper cap gives other jobs a chance to run while we are still
    # trying to get our locks; the lower cap is there for safety
    step = max(min(grown, max_wait), min_wait)

    timeouts.append(step)
    elapsed += step

  return timeouts
95
96
class LockAttemptTimeoutStrategy(object):
  """Hands out the timeouts to use for successive lock acquire attempts.

  Every instance walks once through the precomputed table of per-attempt
  timeouts. Once the table is exhausted, L{NextAttempt} returns C{None},
  which stands for a blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Computed once when the class is created and shared by all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests; it is called
        without arguments and its result scales the timeout variation

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: the next timeout from the table with a small random variation
        applied, or C{None} once all timeouts have been used

    """
    try:
      # The next() builtin works on Python 2.6+ as well as on Python 3,
      # unlike the Python-2-only iterator method of the same name
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      timeout = None

    if timeout is not None:
      # Add a small variation (-/+ 5%) to timeout. This helps in situations
      # where two or more jobs are fighting for the same lock(s).
      variation_range = timeout * 0.1
      timeout += ((self._random_fn() * variation_range) -
                  (variation_range * 0.5))

    return timeout
140
141
class OpExecCbBase(object): # pylint: disable=W0232
  """Interface for the callbacks used while an OpCode is executed.

  All hooks except L{SubmitManyJobs} do nothing by default, so subclasses
  only need to override what they are interested in.

  """
  def NotifyStart(self):
    """Hook invoked right before the LU is executed.

    It is called once all locks have been acquired, just before the LU's
    Exec() method is started.

    """

  def NotifyRetry(self):
    """Hook invoked before an LU is reset in order to be retried.

    It is called after PrepareRetry completed successfully.

    """

  def Feedback(self, *args):
    """Passes feedback from the LU code on to the end-user.

    """

  def CurrentPriority(self): # pylint: disable=R0201
    """Returns current priority or C{None}.

    This base implementation always returns C{None}.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    @raise NotImplementedError: always, in this base implementation

    """
    raise NotImplementedError()
179
180
def _LUNameForOpName(opname):
  """Translates the name of an OpCode class into the name of its LU class.

  The leading L{_OP_PREFIX} is replaced by L{_LU_PREFIX}.

  @type opname: string
  @param opname: OpCode class name; must start with L{_OP_PREFIX}
  @rtype: string
  @return: name of the corresponding logical unit class

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  suffix = opname[len(_OP_PREFIX):]
  return _LU_PREFIX + suffix
189
190
def _ComputeDispatchTable():
  """Builds the table mapping opcode classes to logical unit classes.

  Only opcodes whose C{WITH_LU} attribute is true get an entry; the LU class
  is looked up in L{cmdlib} under the name given by L{_LUNameForOpName}.

  @rtype: dict
  @return: opcode class to LU class mapping

  """
  table = {}
  for op in opcodes.OP_MAPPING.values():
    if op.WITH_LU:
      table[op] = getattr(cmdlib, _LUNameForOpName(op.__name__))
  return table
198
199
def _SetBaseOpParams(src, defcomment, dst):
  """Propagates basic parameters from one opcode to another.

  The debug level is always taken over from the source if it has one. The
  priority is only copied when the destination has none set, the comment is
  only set when the destination's is empty, and the reason trail of the
  source is appended to the one of the destination.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  dst_lacks_priority = getattr(dst, "priority", None) is None
  if dst_lacks_priority and hasattr(src, "priority"):
    dst.priority = src.priority

  if not getattr(dst, opcodes_base.COMMENT_ATTR, None):
    dst.comment = defcomment

  reason_attr = constants.OPCODE_REASON
  if hasattr(src, reason_attr):
    # Work on a copy so that a list shared with another opcode isn't
    # modified
    dst.reason = list(getattr(dst, reason_attr, []))
    dst.reason.extend(getattr(src, reason_attr, []))
224
225
def _ProcessResult(submit_fn, op, result):
  """Examines opcode result.

  If the result is a L{cmdlib.ResultWithJobs}, the jobs it contains are
  submitted and the result is replaced by its additional return values, to
  which the outcome of the submission is added under
  C{constants.JOB_IDS_KEY}. Any other result is returned unchanged.

  @type submit_fn: callable
  @param submit_fn: called with the list of jobs to submit
  @type op: L{opcodes.OpCode}
  @param op: opcode which produced the result; its basic parameters are
      copied to the opcodes of the submitted jobs
  @param result: value returned by the LU
  @return: the processed result

  """
  if isinstance(result, cmdlib.ResultWithJobs):
    # Copy basic parameters (e.g. priority). A plain loop is used as this is
    # done purely for its side effects on the opcodes.
    defcomment = "Submitted by %s" % op.OP_ID
    for job_op in itertools.chain(*result.jobs):
      _SetBaseOpParams(op, defcomment, job_op)

    # Submit jobs
    job_submission = submit_fn(result.jobs)

    # Build dictionary
    result = result.other

    assert constants.JOB_IDS_KEY not in result, \
      "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

    result[constants.JOB_IDS_KEY] = job_submission

  return result
250
251
def _FailingSubmitManyJobs(_):
  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.

  Used when an opcode is processed without callbacks, in which case jobs
  cannot be submitted.

  @raise errors.ProgrammerError: always

  """
  message = ("Opcodes processed without callbacks (e.g."
             " queries) can not submit jobs")
  raise errors.ProgrammerError(message)
258
259
def _LockList(names):
  """Normalizes a lock name specification into a list.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: L{locking.ALL_SET} if that is what was passed in;
      a one-element list if 'names' is a single string;
      otherwise a new list built from the iterable 'names'

  """
  if names == locking.ALL_SET:
    return names

  if isinstance(names, basestring):
    return [names]

  return list(names)
277
278
class Processor(object):
  """Object which runs OpCodes"""
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor is allowed to use locks

    """
    self.context = context
    self._ec_id = ec_id
    self._cbs = None
    self.cfg = context.GetConfig(ec_id)
    self.rpc = context.GetRpc(self.cfg)
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing
    self._wconfdcontext = context.GetWConfdContext(ec_id)

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _CurrentPriority(self):
    """Returns the priority to use for lock requests.

    @return: the priority reported by the callbacks; if there are no
        callbacks or they report C{None}, C{constants.OP_PRIO_DEFAULT}

    """
    priority = None
    if self._cbs:
      priority = self._cbs.CurrentPriority()

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    return priority

  def _RequestAndWait(self, request, timeout):
    """Request locks from WConfD and wait for them to be granted.

    @type request: list
    @param request: the lock request to be sent to WConfD
    @type timeout: float
    @param timeout: the time to wait for the request to be granted
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time; in this case, locks still might be acquired or a
        request pending.

    """
    logging.debug("Trying %ss to request %s for %s",
                  timeout, request, self._wconfdcontext)
    priority = self._CurrentPriority()

    ## Expect a signal
    if sighupReceived[0]:
      logging.warning("Ignoring unexpected SIGHUP")
    sighupReceived[0] = False

    # Request locks
    self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                            request)
    pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending:
      def _HasPending():
        # Only ask WConfD once a signal was seen; until then the request is
        # considered to be still pending
        if sighupReceived[0]:
          return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        else:
          return True

      pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

      signal = sighupReceived[0]

      if pending:
        # The wait ended with the request still considered pending; ask
        # WConfD one last time before giving up
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending and signal:
        logging.warning("Ignoring unexpected SIGHUP")
      sighupReceived[0] = False

    logging.debug("Finished trying. Pending: %s", pending)
    if pending:
      raise LockAcquireTimeout()

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1, request_only=False):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type opportunistic_count: int
    @param opportunistic_count: number of locks passed on to WConfD when
        acquiring opportunistically
    @type request_only: bool
    @param request_only: do not acquire the locks, just return the request
    @rtype: list
    @return: an empty list if there is nothing to lock; the lock request if
        C{request_only} is set; the result of the opportunistic acquire if
        that was used; otherwise the names of the locks, prefixed with the
        name of the level
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time; in this case, locks still might be acquired or a
        request pending.

    """
    self._CheckLocksEnabled()

    priority = self._CurrentPriority()

    if names == locking.ALL_SET:
      if opportunistic:
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    names = _LockList(names)

    # For locks of the same level, the lock order is lexicographic
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    locks = ["%s/%s" % (levelname, lock) for lock in names]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if request_only:
      logging.debug("Lock request for level %s is %s", level, request)
      return request

    self.cfg.OutDate()

    if timeout is None:
      ## Note: once we are so desperate for locks to request them
      ## unconditionally, we no longer care about an original plan
      ## to acquire locks opportunistically.
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)
      ## The only way to be sure of not getting starved is to sequentially
      ## acquire the locks one by one (in lock order).
      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          time.sleep(10.0 * random.random())

    elif opportunistic:
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      self._RequestAndWait(request, timeout)

    return locks

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the prerequisites, runs the pre-hooks, executes the LU and then
    runs the post-hooks. In dry-run mode the LU's C{dry_run_result} is
    returned right after the pre-hooks.

    @param lu: the logical unit to execute
    @return: the result of the LU after post-processing

    """
    write_count = self.cfg.write_count
    lu.cfg.OutDate()
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    lusExecuting[0] += 1
    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      lusExecuting[0] -= 1
      if write_count != self.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Builds the hooks manager for a logical unit.

    @param lu: the logical unit to build the hooks manager for
    @return: the object built by the C{BuildFromLu} method of the
        configured hooks master class

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _WaitForPendingLocks(self, lu, pending, timeout):
    """Waits for collated lock requests and refreshes the LU's view.

    @param lu: the logical unit the locks are requested for
    @type pending: list
    @param pending: the collated lock request
    @type timeout: None or float
    @param timeout: the time to wait for the request to be granted
    @raise LockAcquireTimeout: In case the locks couldn't be acquired in the
        specified amount of time

    """
    self._RequestAndWait(pending, timeout)
    lu.cfg.OutDate()
    lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

  def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @type level: int
    @param level: the lock level to start at
    @type calc_timeout: callable
    @param calc_timeout: returns the remaining time for acquiring locks, or
        C{None} for no timeout
    @type pending: list or None
    @param pending: lock requests collated at earlier levels which have not
        been sent to WConfD yet
    @return: the result of the LU
    @raise LockAcquireTimeout: In case locks couldn't be acquired in time, or
        the LU ran temporarily out of resources and was prepared for a retry

    """
    pending = pending or []
    logging.debug("Looking at locks of level %s, still need to obtain %s",
                  level, pending)
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      if pending:
        self._WaitForPendingLocks(lu, pending, calc_timeout())
        pending = []

      logging.debug("Finished acquiring locks")

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except errors.OpPrereqError as err:
        # Not every OpPrereqError necessarily carries an error code as its
        # second argument. Only retry if one is present and signals a
        # temporary lack of resources; otherwise pass on the original error
        # instead of failing while unpacking the arguments.
        if len(err.args) < 2 or err.args[1] != errors.ECODE_TEMP_NORES:
          raise
        logging.debug("Temporarily out of resources; will retry internally")
        try:
          lu.PrepareRetry(self.Log)
          if self._cbs:
            self._cbs.NotifyRetry()
        except errors.OpRetryNotSupportedError:
          logging.debug("LU does not know how to retry.")
          raise err
        # The LU is ready to be retried; this is reported the same way as a
        # timeout while acquiring locks
        raise LockAcquireTimeout()
      except AssertionError as err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))
      return result

    # Determine if the acquiring is opportunistic up front
    opportunistic = lu.opportunistic_locks[level]

    dont_collate = lu.dont_collate_locks[level]

    if dont_collate and pending:
      self._WaitForPendingLocks(lu, pending, calc_timeout())
      pending = []

    if adding_locks and opportunistic:
      # We could simultaneously acquire locks opportunistically and add new
      # ones, but that would require altering the API, and no use cases are
      # present in the system at the moment.
      raise NotImplementedError("Can't opportunistically acquire locks when"
                                " adding new ones")

    if adding_locks and acquiring_locks and \
       lu.needed_locks[level] == locking.ALL_SET:
      # It would also probably be possible to acquire all locks of a certain
      # type while adding new locks, but there is no use case at the moment.
      raise NotImplementedError("Can't request all locks of a certain level"
                                " and add new locks")

    if adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic_count = lu.opportunistic_locks_count[level]

      try:
        if acquiring_locks:
          needed_locks = _LockList(lu.needed_locks[level])
        else:
          needed_locks = []

        if adding_locks:
          needed_locks.extend(_LockList(lu.add_locks[level]))

        timeout = calc_timeout()
        if timeout is not None and not opportunistic:
          # Only collate the request; it is sent to WConfD later, together
          # with the requests of other levels
          pending = pending + self._AcquireLocks(level, needed_locks, share,
                                                 opportunistic, timeout,
                                                 request_only=True)
        else:
          if pending:
            self._WaitForPendingLocks(lu, pending, calc_timeout())
            pending = []
          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             timeout,
                             opportunistic_count=opportunistic_count)
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

        result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                     pending=pending)
      finally:
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %s",
                      levelname, self._wconfdcontext)
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                   pending=pending)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    @param op: the opcode which was executed
    @param result: the result returned by the LU
    @raise errors.OpResultError: if the result doesn't pass the opcode's
        C{OP_RESULT} check and the opcode wasn't run in dry-run mode

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @return: the result of the logical unit run for the opcode
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
        amount of time

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      lu = lu_class(self, op, self.context, self.cfg, self.rpc,
                    self._wconfdcontext, self.wconfd)
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.cfg.DropECReservations(self._ec_id)
    finally:
      self.wconfd.Client().FreeLocksLevel(
        self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
      self._cbs = None

    self._CheckLUResult(op, result)

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    Nothing happens if no callbacks are set.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: number of the current step
    @param total: total number of steps
    @param message: description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: the warning; formatted with C{args} if any are given

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
      "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message; formatted with C{args} if any are given

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID is set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id