Merge branch 'stable-2.16' into stable-2.17
[ganeti-github.git] / lib / mcpu.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Module implementing the logic behind the cluster operations
32
33 This module implements the logic for doing operations in the cluster. There
34 are two kinds of classes defined:
35 - logical units, which know how to deal with their specific opcode only
36 - the processor, which dispatches the opcodes to their logical units
37
38 """
39
40 import sys
41 import logging
42 import random
43 import time
44 import itertools
45 import traceback
46
47 from ganeti import opcodes
48 from ganeti import opcodes_base
49 from ganeti import constants
50 from ganeti import errors
51 from ganeti import hooksmaster
52 from ganeti import cmdlib
53 from ganeti import locking
54 from ganeti import utils
55 from ganeti import wconfd
56
57
# One-element list used as a mutable module-level flag. It is read and reset
# to False by Processor._RequestAndWait while waiting for lock requests.
# NOTE(review): presumably set to True by a SIGHUP handler installed
# elsewhere -- the setter is not visible in this module.
sighupReceived = [False]
# One-element list holding a counter; Processor._ExecLU increments it right
# before running an LU's Exec() and decrements it again afterwards.
lusExecuting = [0]

# Class name prefixes used by _LUNameForOpName to turn an opcode class name
# ("Op...") into the name of the matching logical unit class ("LU...").
_OP_PREFIX = "Op"
_LU_PREFIX = "LU"
63
64
class LockAcquireTimeout(Exception):
  """Signals that requested locks were not granted within the time allowed.

  """
69
70
def _CalculateLockAttemptTimeouts():
  """Builds the list of per-attempt timeouts for lock acquisition.

  The list starts with the minimum wait and grows until the sum of all
  entries reaches at least C{constants.LOCK_ATTEMPTS_TIMEOUT}; only after
  these attempts are exhausted is a blocking acquire meant to be done.

  @rtype: list
  @return: timeouts, one per non-blocking attempt

  """
  min_wait = constants.LOCK_ATTEMPTS_MINWAIT
  max_wait = constants.LOCK_ATTEMPTS_MAXWAIT

  timeouts = [min_wait]
  total = min_wait

  while total < constants.LOCK_ATTEMPTS_TIMEOUT:
    grown = (timeouts[-1] * 1.05) ** 1.25

    # Clamp the value: the upper cap gives other jobs a chance to run while
    # we are still trying to get our locks, the lower bound is for safety.
    step = max(min(grown, max_wait), min_wait)

    timeouts.append(step)
    total += step

  return timeouts
94
95
class LockAttemptTimeoutStrategy(object):
  """Class with lock acquire timeout strategy.

  Each instance walks once through the precomputed list of timeouts in
  L{_TIMEOUT_PER_ATTEMPT}; once the list is exhausted, L{NextAttempt} returns
  C{None}, which callers use to switch to a blocking acquire.

  """
  __slots__ = [
    "_timeouts",
    "_random_fn",
    "_time_fn",
    ]

  # Computed once, at class creation time, and shared by all instances
  _TIMEOUT_PER_ATTEMPT = _CalculateLockAttemptTimeouts()

  def __init__(self, _time_fn=time.time, _random_fn=random.random):
    """Initializes this class.

    @param _time_fn: Time function for unittests
    @param _random_fn: Random number generator for unittests

    """
    object.__init__(self)

    self._timeouts = iter(self._TIMEOUT_PER_ATTEMPT)
    self._time_fn = _time_fn
    self._random_fn = _random_fn

  def NextAttempt(self):
    """Returns the timeout for the next attempt.

    @rtype: float or None
    @return: the next timeout with a random variation applied, or C{None}
      once all precomputed timeouts have been used up

    """
    try:
      # The next() builtin is used instead of the Python-2-only
      # iterator.next() method
      timeout = next(self._timeouts)
    except StopIteration:
      # No more timeouts, do blocking acquire
      return None

    # Add a small variation (-/+ 5%) to timeout. This helps in situations
    # where two or more jobs are fighting for the same lock(s).
    variation_range = timeout * 0.1
    timeout += ((self._random_fn() * variation_range) -
                (variation_range * 0.5))

    return timeout
139
140
class OpExecCbBase(object): # pylint: disable=W0232
  """Interface for the callbacks used while an opcode is executed.

  All notification hooks do nothing by default; subclasses override the ones
  they care about.

  """
  def NotifyStart(self):
    """Hook invoked just before the LU is executed.

    At the time of the call all locks have been acquired and the LU's Exec()
    method is about to be started.

    """
    return None

  def NotifyRetry(self):
    """Hook invoked just before an LU is reset for another attempt.

    The call happens once PrepareRetry has completed successfully.

    """
    return None

  # TODO: Cleanup calling conventions, make them explicit.
  def Feedback(self, *args):
    """Passes feedback from the LU code on to the end-user.

    """
    return None

  def CurrentPriority(self): # pylint: disable=R0201
    """Gives the current priority, or C{None} if there is none.

    """
    return None

  def SubmitManyJobs(self, jobs):
    """Submits jobs for processing; must be provided by subclasses.

    See L{jqueue.JobQueue.SubmitManyJobs}.

    """
    raise NotImplementedError
179
180
def _LUNameForOpName(opname):
  """Derives the logical unit class name from an opcode class name.

  The leading opcode prefix is replaced by the logical unit prefix.

  @type opname: string
  @param opname: opcode class name; must start with the opcode prefix

  """
  assert opname.startswith(_OP_PREFIX), \
    "Invalid OpCode name, doesn't start with %s: %s" % (_OP_PREFIX, opname)

  suffix = opname[len(_OP_PREFIX):]
  return "".join([_LU_PREFIX, suffix])
189
190
def _ComputeDispatchTable():
  """Builds the mapping from opcode classes to logical unit classes.

  Only opcodes with C{WITH_LU} set are included; the logical unit class is
  looked up in L{cmdlib} under the name given by L{_LUNameForOpName}.

  @rtype: dict
  @return: opcode class -> logical unit class

  """
  table = {}

  for op_class in opcodes.OP_MAPPING.values():
    if op_class.WITH_LU:
      table[op_class] = getattr(cmdlib, _LUNameForOpName(op_class.__name__))

  return table
198
199
def _SetBaseOpParams(src, defcomment, dst):
  """Propagates the basic parameters of one opcode to another one.

  The debug level is always taken over from the source, the priority only if
  the destination has none, the comment is set to the default if the
  destination has none, and the source's reason trail is appended to the
  destination's.

  @type src: L{opcodes.OpCode}
  @param src: Source opcode
  @type defcomment: string
  @param defcomment: Comment to specify if not already given
  @type dst: L{opcodes.OpCode}
  @param dst: Destination opcode

  """
  if hasattr(src, "debug_level"):
    dst.debug_level = src.debug_level

  dst_lacks_priority = getattr(dst, "priority", None) is None
  if dst_lacks_priority and hasattr(src, "priority"):
    dst.priority = src.priority

  existing_comment = getattr(dst, opcodes_base.COMMENT_ATTR, None)
  if not existing_comment:
    dst.comment = defcomment

  if hasattr(src, constants.OPCODE_REASON):
    # Work on a copy of the destination's own trail, then add the source's
    own_trail = list(getattr(dst, constants.OPCODE_REASON, []))
    dst.reason = own_trail
    dst.reason.extend(getattr(src, constants.OPCODE_REASON, []))
224
225
def _ProcessResult(submit_fn, op, result):
  """Post-processes the result of an opcode.

  Results which are not a L{cmdlib.ResultWithJobs} are passed through
  untouched. Otherwise the contained jobs are submitted and the outcome of
  the submission is stored in the additional return values.

  @param submit_fn: function called with the list of jobs to submit
  @param op: the opcode which produced the result
  @param result: value returned by the logical unit

  """
  if not isinstance(result, cmdlib.ResultWithJobs):
    return result

  # Copy basic parameters (e.g. priority) to every opcode of every job
  for job_ops in result.jobs:
    for child_op in job_ops:
      _SetBaseOpParams(op, "Submitted by %s" % op.OP_ID, child_op)

  # Submit jobs
  submission = submit_fn(result.jobs)

  # Build dictionary
  extra_values = result.other

  assert constants.JOB_IDS_KEY not in extra_values, \
    "Key '%s' found in additional return values" % constants.JOB_IDS_KEY

  extra_values[constants.JOB_IDS_KEY] = submission

  return extra_values
249
250
def _FailingSubmitManyJobs(_):
  """Stand-in for L{OpExecCbBase.SubmitManyJobs} which always fails.

  Used when an opcode is executed without callbacks.

  @raise errors.ProgrammerError: unconditionally

  """
  message = ("Opcodes processed without callbacks (e.g."
             " queries) can not submit jobs")
  raise errors.ProgrammerError(message)
257
258
def _LockList(names):
  """Normalizes a lock name specification.

  @type names: list or string or NoneType
  @param names: Lock names
  @rtype: a list of strings
  @return: L{locking.ALL_SET} is passed through unchanged, a single string
      is wrapped into a one-element list and any other iterable is copied
      into a new list

  """
  if names == locking.ALL_SET:
    return names

  if isinstance(names, basestring):
    return [names]

  return list(names)
276
277
278 def _CheckSecretParameters(op):
279 """Check if secret parameters are expected, but missing.
280
281 """
282 if hasattr(op, "osparams_secret") and op.osparams_secret:
283 for secret_param in op.osparams_secret:
284 if op.osparams_secret[secret_param].Get() == constants.REDACTED:
285 raise errors.OpPrereqError("Please re-submit secret parameters to job.",
286 errors.ECODE_INVAL)
287
288
class Processor(object):
  """Object which runs OpCodes

  The entry point is L{ExecOpCode}: it looks up the logical unit (LU) class
  for an opcode in L{DISPATCH_TABLE}, acquires the needed locks through
  WConfD level by level and finally runs the LU including its hooks.

  """
  # Mapping of opcode class to LU class, computed once at class creation
  DISPATCH_TABLE = _ComputeDispatchTable()

  def __init__(self, context, ec_id, enable_locks=True):
    """Constructor for Processor

    @type context: GanetiContext
    @param context: global Ganeti context
    @type ec_id: string
    @param ec_id: execution context identifier
    @type enable_locks: bool
    @param enable_locks: whether this processor may acquire locks; if false,
      any attempt to acquire locks raises L{errors.ProgrammerError}

    """
    self._ec_id = ec_id
    # Callbacks are only set while an opcode is being executed
    self._cbs = None
    self.cfg = context.GetConfig(ec_id)
    self.rpc = context.GetRpc(self.cfg)
    self.hmclass = hooksmaster.HooksMaster
    self._enable_locks = enable_locks
    self.wconfd = wconfd # Indirection to allow testing
    self._wconfdcontext = context.GetWConfdContext(ec_id)

  def _CheckLocksEnabled(self):
    """Checks if locking is enabled.

    @raise errors.ProgrammerError: In case locking is not enabled

    """
    if not self._enable_locks:
      raise errors.ProgrammerError("Attempted to use disabled locks")

  def _RequestAndWait(self, request, timeout):
    """Request locks from WConfD and wait for them to be granted.

    @type request: list
    @param request: the lock request to be sent to WConfD
    @type timeout: float
    @param timeout: the time to wait for the request to be granted
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time; in this case, locks still might be acquired or a request
      pending.

    """
    logging.debug("Trying %ss to request %s for %s",
                  timeout, request, self._wconfdcontext)
    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    ## Expect a signal
    # Any signal seen before the request is sent can not be related to it
    if sighupReceived[0]:
      logging.warning("Ignoring unexpected SIGHUP")
    sighupReceived[0] = False

    # Request locks
    self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                            request)
    pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

    if pending:
      def _HasPending():
        # WConfD is only asked again once a signal has been seen; until
        # then the request is assumed to be still pending
        if sighupReceived[0]:
          return self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
        else:
          return True

      pending = utils.SimpleRetry(False, _HasPending, 0.05, timeout)

      signal = sighupReceived[0]

      if pending:
        # Ask WConfD one last time before giving up
        pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)

      if pending and signal:
        logging.warning("Ignoring unexpected SIGHUP")
      sighupReceived[0] = False

    logging.debug("Finished trying. Pending: %s", pending)
    if pending:
      raise LockAcquireTimeout()

  def _AcquireLocks(self, level, names, shared, opportunistic, timeout,
                    opportunistic_count=1, request_only=False):
    """Acquires locks via the Ganeti lock manager.

    @type level: int
    @param level: Lock level
    @type names: list or string
    @param names: Lock names
    @type shared: bool
    @param shared: Whether the locks should be acquired in shared mode
    @type opportunistic: bool
    @param opportunistic: Whether to acquire opportunistically
    @type timeout: None or float
    @param timeout: Timeout for acquiring the locks
    @type opportunistic_count: int
    @param opportunistic_count: count passed on to WConfD for opportunistic
      acquisition (logged as "at least" this many locks)
    @type request_only: bool
    @param request_only: do not acquire the locks, just return the request
    @rtype: list
    @return: an empty list if there is nothing to lock; the lock request if
      C{request_only} is set; the value returned by WConfD for opportunistic
      acquisition; otherwise the list of lock names prefixed by their level
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time; in this case, locks still might be acquired or a request
      pending.

    """
    self._CheckLocksEnabled()

    if self._cbs:
      priority = self._cbs.CurrentPriority() # pylint: disable=W0612
    else:
      priority = None

    if priority is None:
      priority = constants.OP_PRIO_DEFAULT

    if names == locking.ALL_SET:
      if opportunistic:
        # Opportunistic acquisition needs the individual lock names
        expand_fns = {
          locking.LEVEL_CLUSTER: (lambda: [locking.BGL]),
          locking.LEVEL_INSTANCE: self.cfg.GetInstanceList,
          locking.LEVEL_NODEGROUP: self.cfg.GetNodeGroupList,
          locking.LEVEL_NODE: self.cfg.GetNodeList,
          locking.LEVEL_NODE_RES: self.cfg.GetNodeList,
          locking.LEVEL_NETWORK: self.cfg.GetNetworkList,
          }
        names = expand_fns[level]()
      else:
        names = locking.LOCKSET_NAME

    # At this point "names" is no longer ALL_SET, hence a new list is returned
    names = _LockList(names)

    # For locks of the same level, the lock order is lexicographic
    names.sort()

    levelname = locking.LEVEL_NAMES[level]

    locks = ["%s/%s" % (levelname, lock) for lock in names]

    if not names:
      logging.debug("Acquiring no locks for (%s) at level %s",
                    self._wconfdcontext, levelname)
      return []

    if shared:
      request = [[lock, "shared"] for lock in locks]
    else:
      request = [[lock, "exclusive"] for lock in locks]

    if request_only:
      logging.debug("Lock request for level %s is %s", level, request)
      return request

    self.cfg.OutDate()

    if timeout is None:
      ## Note: once we are so desperate for locks to request them
      ## unconditionally, we no longer care about an original plan
      ## to acquire locks opportunistically.
      logging.info("Definitely requesting %s for %s",
                   request, self._wconfdcontext)
      ## The only way to be sure of not getting starved is to sequentially
      ## acquire the locks one by one (in lock order).
      for r in request:
        logging.debug("Definite request %s for %s", r, self._wconfdcontext)
        self.wconfd.Client().UpdateLocksWaiting(self._wconfdcontext, priority,
                                                [r])
        # Poll, with a random delay, until the single request is granted
        while True:
          pending = self.wconfd.Client().HasPendingRequest(self._wconfdcontext)
          if not pending:
            break
          time.sleep(10.0 * random.random())

    elif opportunistic:
      logging.debug("For %ss trying to opportunistically acquire"
                    " at least %d of %s for %s.",
                    timeout, opportunistic_count, locks, self._wconfdcontext)
      # Retry until a non-empty list of locks is returned or time is up
      locks = utils.SimpleRetry(
        lambda l: l != [], self.wconfd.Client().GuardedOpportunisticLockUnion,
        2.0, timeout, args=[opportunistic_count, self._wconfdcontext, request])
      logging.debug("Managed to get the following locks: %s", locks)
      if locks == []:
        raise LockAcquireTimeout()
    else:
      self._RequestAndWait(request, timeout)

    return locks

  def _ExecLU(self, lu):
    """Logical Unit execution sequence.

    Checks the prerequisites, runs the pre-hooks, executes the LU, runs the
    post-hooks and, if the configuration's write count changed in between,
    the configuration update hooks. In dry-run mode only the prerequisite
    check and the pre-hooks are run.

    @param lu: the logical unit to execute
    @return: the result of the LU, after processing by L{_ProcessResult} and
      the LU's post-hooks callback; C{lu.dry_run_result} in dry-run mode

    """
    write_count = self.cfg.write_count
    lu.cfg.OutDate()
    lu.CheckPrereq()

    hm = self.BuildHooksManager(lu)
    try:
      h_results = hm.RunPhase(constants.HOOKS_PHASE_PRE)
    except Exception as err: # pylint: disable=W0703
      # This gives the LU a chance of cleaning up in case of an hooks failure.
      # The type of exception is deliberately broad to be able to react to
      # any kind of failure.
      lu.HooksAbortCallBack(constants.HOOKS_PHASE_PRE, self.Log, err)
      # We re-raise the exception to not alter the behavior of LU handling
      # otherwise. A bare "raise" is used to keep the original traceback.
      raise
    lu.HooksCallBack(constants.HOOKS_PHASE_PRE, h_results,
                     self.Log, None)

    if getattr(lu.op, "dry_run", False):
      # in this mode, no post-hooks are run, and the config is not
      # written (as it might have been modified by another LU, and we
      # shouldn't do writeout on behalf of other threads
      self.LogInfo("dry-run mode requested, not actually executing"
                   " the operation")
      return lu.dry_run_result

    if self._cbs:
      submit_mj_fn = self._cbs.SubmitManyJobs
    else:
      submit_mj_fn = _FailingSubmitManyJobs

    lusExecuting[0] += 1
    try:
      result = _ProcessResult(submit_mj_fn, lu.op, lu.Exec(self.Log))
      h_results = hm.RunPhase(constants.HOOKS_PHASE_POST)
      result = lu.HooksCallBack(constants.HOOKS_PHASE_POST, h_results,
                                self.Log, result)
    finally:
      # FIXME: This needs locks if not lu_class.REQ_BGL
      lusExecuting[0] -= 1
      if write_count != self.cfg.write_count:
        hm.RunConfigUpdate()

    return result

  def BuildHooksManager(self, lu):
    """Builds the hooks manager for a logical unit.

    @param lu: the logical unit the hooks are run for
    @return: the object built by the C{BuildFromLu} method of L{hmclass}

    """
    return self.hmclass.BuildFromLu(lu.rpc.call_hooks_runner, lu)

  def _LockAndExecLU(self, lu, level, calc_timeout, pending=None):
    """Execute a Logical Unit, with the needed locks.

    This is a recursive function that starts locking the given level, and
    proceeds up, till there are no more locks to acquire. Then it executes the
    given LU and its opcodes.

    @param lu: the logical unit to execute
    @type level: int
    @param level: the lock level to handle in this invocation
    @param calc_timeout: function returning the time left for acquiring
      locks, or C{None} for no timeout
    @type pending: list or None
    @param pending: lock requests collected at lower levels which have not
      been sent to WConfD yet
    @raise LockAcquireTimeout: if locks could not be acquired in time, or if
      the LU temporarily ran out of resources and was prepared for a retry

    """
    pending = pending or []
    logging.debug("Looking at locks of level %s, still need to obtain %s",
                  level, pending)
    adding_locks = level in lu.add_locks
    acquiring_locks = level in lu.needed_locks

    if level not in locking.LEVELS:
      # All levels have been handled: request what is still outstanding and
      # run the LU
      if pending:
        self._RequestAndWait(pending, calc_timeout())
        lu.cfg.OutDate()
        lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
        pending = []

      logging.debug("Finished acquiring locks")

      if self._cbs:
        self._cbs.NotifyStart()

      try:
        result = self._ExecLU(lu)
      except errors.OpPrereqError as err:
        if len(err.args) < 2 or err.args[1] != errors.ECODE_TEMP_NORES:
          raise

        logging.debug("Temporarily out of resources; will retry internally")
        try:
          lu.PrepareRetry(self.Log)
          if self._cbs:
            self._cbs.NotifyRetry()
        except errors.OpRetryNotSupportedError:
          logging.debug("LU does not know how to retry.")
          # The name is needed here: a bare "raise" would re-raise the
          # OpRetryNotSupportedError instead of the original error
          raise err
        raise LockAcquireTimeout()
      except AssertionError as err:
        # this is a bit ugly, as we don't know from which phase
        # (prereq, exec) this comes; but it's better than an exception
        # with no information
        (_, _, tb) = sys.exc_info()
        err_info = traceback.format_tb(tb)
        del tb
        logging.exception("Detected AssertionError")
        raise errors.OpExecError("Internal assertion error: please report"
                                 " this as a bug.\nError message: '%s';"
                                 " location:\n%s" % (str(err), err_info[-1]))
      return result

    # Determine if the acquiring is opportunistic up front
    opportunistic = lu.opportunistic_locks[level]

    dont_collate = lu.dont_collate_locks[level]

    if dont_collate and pending:
      # Requests of earlier levels must not be merged with this level's
      self._RequestAndWait(pending, calc_timeout())
      lu.cfg.OutDate()
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      pending = []

    if adding_locks and opportunistic:
      # We could simultaneously acquire locks opportunistically and add new
      # ones, but that would require altering the API, and no use cases are
      # present in the system at the moment.
      raise NotImplementedError("Can't opportunistically acquire locks when"
                                " adding new ones")

    if adding_locks and acquiring_locks and \
       lu.needed_locks[level] == locking.ALL_SET:
      # It would also probably be possible to acquire all locks of a certain
      # type while adding new locks, but there is no use case at the moment.
      raise NotImplementedError("Can't request all locks of a certain level"
                                " and add new locks")

    if adding_locks or acquiring_locks:
      self._CheckLocksEnabled()

      lu.DeclareLocks(level)
      share = lu.share_locks[level]
      opportunistic_count = lu.opportunistic_locks_count[level]

      try:
        if acquiring_locks:
          needed_locks = _LockList(lu.needed_locks[level])
        else:
          needed_locks = []

        if adding_locks:
          needed_locks.extend(_LockList(lu.add_locks[level]))

        timeout = calc_timeout()
        if timeout is not None and not opportunistic:
          # Only collect the request; it is sent together with the requests
          # of the following levels
          pending = pending + self._AcquireLocks(level, needed_locks, share,
                                                 opportunistic, timeout,
                                                 request_only=True)
        else:
          if pending:
            self._RequestAndWait(pending, calc_timeout())
            lu.cfg.OutDate()
            lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
            pending = []
          self._AcquireLocks(level, needed_locks, share, opportunistic,
                             timeout,
                             opportunistic_count=opportunistic_count)
          lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)

        result = self._LockAndExecLU(lu, level + 1, calc_timeout,
                                     pending=pending)
      finally:
        levelname = locking.LEVEL_NAMES[level]
        logging.debug("Freeing locks at level %s for %s",
                      levelname, self._wconfdcontext)
        self.wconfd.Client().FreeLocksLevel(self._wconfdcontext, levelname)
    else:
      result = self._LockAndExecLU(lu, level + 1, calc_timeout, pending=pending)

    return result

  # pylint: disable=R0201
  def _CheckLUResult(self, op, result):
    """Check the LU result against the contract in the opcode.

    @param op: the opcode which was executed
    @param result: the result returned for the opcode
    @raise errors.OpResultError: if the result does not pass the opcode's
      C{OP_RESULT} check and the opcode was not run in dry-run mode

    """
    resultcheck_fn = op.OP_RESULT
    if not (resultcheck_fn is None or resultcheck_fn(result)):
      logging.error("Expected opcode result matching %s, got %s",
                    resultcheck_fn, result)
      if not getattr(op, "dry_run", False):
        # FIXME: LUs should still behave in dry_run mode, or
        # alternately we should have OP_DRYRUN_RESULT; in the
        # meantime, we simply skip the OP_RESULT check in dry-run mode
        raise errors.OpResultError("Opcode result does not match %s: %s" %
                                   (resultcheck_fn, utils.Truncate(result, 80)))

  def ExecOpCode(self, op, cbs, timeout=None):
    """Execute an opcode.

    @type op: an OpCode instance
    @param op: the opcode to be executed
    @type cbs: L{OpExecCbBase}
    @param cbs: Runtime callbacks
    @type timeout: float or None
    @param timeout: Maximum time to acquire all locks, None for no timeout
    @return: the result of the opcode's logical unit
    @raise LockAcquireTimeout: In case locks couldn't be acquired in specified
      amount of time
    @raise errors.ProgrammerError: if C{op} is not an opcode, or if the LU
      requires the BGL while locks are disabled
    @raise errors.OpCodeUnknown: if there is no LU for the opcode

    """
    if not isinstance(op, opcodes.OpCode):
      raise errors.ProgrammerError("Non-opcode instance passed"
                                   " to ExecOpcode (%s)" % type(op))

    lu_class = self.DISPATCH_TABLE.get(op.__class__, None)
    if lu_class is None:
      raise errors.OpCodeUnknown("Unknown opcode")

    if timeout is None:
      calc_timeout = lambda: None
    else:
      calc_timeout = utils.RunningTimeout(timeout, False).Remaining

    self._cbs = cbs
    try:
      if self._enable_locks:
        # Acquire the Big Ganeti Lock exclusively if this LU requires it,
        # and in a shared fashion otherwise (to prevent concurrent run with
        # an exclusive LU.
        self._AcquireLocks(locking.LEVEL_CLUSTER, locking.BGL,
                           not lu_class.REQ_BGL, False, calc_timeout())
      elif lu_class.REQ_BGL:
        raise errors.ProgrammerError("Opcode '%s' requires BGL, but locks are"
                                     " disabled" % op.OP_ID)

      lu = lu_class(self, op, self.cfg, self.rpc,
                    self._wconfdcontext, self.wconfd)
      lu.wconfdlocks = self.wconfd.Client().ListLocks(self._wconfdcontext)
      _CheckSecretParameters(op)
      lu.ExpandNames()
      assert lu.needed_locks is not None, "needed_locks not set by LU"

      try:
        result = self._LockAndExecLU(lu, locking.LEVEL_CLUSTER + 1,
                                     calc_timeout)
      finally:
        if self._ec_id:
          self.cfg.DropECReservations(self._ec_id)
    finally:
      try:
        self.wconfd.Client().FreeLocksLevel(
          self._wconfdcontext, locking.LEVEL_NAMES[locking.LEVEL_CLUSTER])
      finally:
        # Forget the callbacks even if freeing the locks failed
        self._cbs = None

    self._CheckLUResult(op, result)

    return result

  def Log(self, *args):
    """Forward call to feedback callback function.

    Nothing is done if no callbacks are set.

    """
    if self._cbs:
      self._cbs.Feedback(*args)

  def LogStep(self, current, total, message):
    """Log a change in LU execution progress.

    @param current: number of the current step
    @param total: total number of steps
    @param message: description of the step

    """
    logging.debug("Step %d/%d %s", current, total, message)
    self.Log("STEP %d/%d %s" % (current, total, message))

  def LogWarning(self, message, *args, **kwargs):
    """Log a warning to the logs and the user.

    The optional keyword argument is 'hint' and can be used to show a
    hint to the user (presumably related to the warning). If the
    message is empty, it will not be printed at all, allowing one to
    show only a hint.

    @param message: the warning; used as a format string if further
      positional arguments are given

    """
    assert not kwargs or (len(kwargs) == 1 and "hint" in kwargs), \
           "Invalid keyword arguments for LogWarning (%s)" % str(kwargs)
    if args:
      message = message % tuple(args)
    if message:
      logging.warning(message)
      self.Log(" - WARNING: %s" % message)
    if "hint" in kwargs:
      self.Log(" Hint: %s" % kwargs["hint"])

  def LogInfo(self, message, *args):
    """Log an informational message to the logs and the user.

    @param message: the message; used as a format string if further
      positional arguments are given

    """
    if args:
      message = message % tuple(args)
    logging.info(message)
    self.Log(" - INFO: %s" % message)

  def GetECId(self):
    """Returns the current execution context ID.

    @raise errors.ProgrammerError: if no execution context ID is set

    """
    if not self._ec_id:
      raise errors.ProgrammerError("Tried to use execution context id when"
                                   " not set")
    return self._ec_id