Merge branch 'stable-2.16' into stable-2.17
[ganeti-github.git] / lib / locking.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30 """Module implementing the Ganeti locking code."""
31
32 # pylint: disable=W0212
33
34 # W0212 since e.g. LockSet methods use (a lot) the internals of
35 # SharedLock
36
37 import os
38 import select
39 import threading
40 import errno
41 import logging
42 import heapq
43 import time
44
45 from ganeti import errors
46 from ganeti import utils
47 from ganeti import compat
48 from ganeti import query
49
50
#: Textual names of the lock modes, as reported by L{SharedLock.GetLockInfo}
_EXCLUSIVE_TEXT = "exclusive"
_SHARED_TEXT = "shared"
_DELETED_TEXT = "deleted"

#: Priority used when an acquisition doesn't specify one
_DEFAULT_PRIORITY = 0

#: Shortest timeout (in seconds) for which a pending acquisition is still
#: scheduled; with anything shorter the acquire gives up right away
_LOCK_ACQUIRE_MIN_TIMEOUT = 1.0 / 1000
60
61
def ssynchronized(mylock, shared=0):
  """Shared Synchronization decorator.

  Calls the function holding the given lock, either in exclusive or shared
  mode. It requires the passed lock to be a SharedLock (or support its
  semantics). The lock is released again when the function returns or raises.
  The name and docstring of the decorated function are carried over to the
  wrapper.

  @type mylock: lockable object or string
  @param mylock: lock to acquire or class member name of the lock to acquire;
      a name is looked up on the first positional argument ("self") each time
      the decorated function is called
  @type shared: integer (0/1) used as a boolean
  @param shared: passed as C{shared} to the lock's C{acquire} method
  @return: decorator which wraps a function so that it runs with the lock held

  """
  import functools

  # "basestring" only exists on Python 2; fall back to "str" elsewhere so the
  # member-name form keeps working
  try:
    string_types = basestring
  except NameError:
    string_types = str

  def wrap(fn):
    @functools.wraps(fn)
    def sync_function(*args, **kwargs):
      if isinstance(mylock, string_types):
        assert args, "cannot ssynchronize on non-class method: self not found"
        # args[0] is "self"
        lock = getattr(args[0], mylock)
      else:
        lock = mylock
      lock.acquire(shared=shared)
      try:
        return fn(*args, **kwargs)
      finally:
        lock.release()
    return sync_function
  return wrap
88
89
class _SingleNotifyPipeConditionWaiter(object):
  """Helper class for SingleNotifyPipeCondition

  Instances are callables which block until poll() reports an event on the
  watched file descriptor or until the timeout has run out.

  """
  __slots__ = [
    "_fd",
    ]

  def __init__(self, fd):
    """Constructor for _SingleNotifyPipeConditionWaiter

    @type fd: int
    @param fd: File descriptor to wait for

    """
    object.__init__(self)
    self._fd = fd

  def __call__(self, timeout):
    """Wait for something to happen on the pipe.

    A poll() call interrupted by a signal (EINTR) is retried with whatever
    time is left; any other error is re-raised.

    @type timeout: float or None
    @param timeout: Timeout for waiting (can be None)

    """
    running_timeout = utils.RunningTimeout(timeout, True)
    poller = select.poll()
    poller.register(self._fd, select.POLLHUP)

    while True:
      remaining_time = running_timeout.Remaining()

      if remaining_time is not None:
        if remaining_time < 0.0:
          break

        # Our calculation uses seconds, poll() wants milliseconds
        remaining_time *= 1000

      try:
        result = poller.poll(remaining_time)
      except (EnvironmentError, select.error) as err:
        # On Python 2 "select.error" is not an EnvironmentError and provides
        # the error code only as its first argument, hence both exception
        # types are caught and the code is looked up in both places
        err_code = getattr(err, "errno", None)
        if err_code is None and err.args:
          err_code = err.args[0]

        if err_code != errno.EINTR:
          raise
        result = None

      # Check whether we were notified
      if result and result[0][0] == self._fd:
        break
139
140
class _BaseCondition(object):
  """Common functionality shared by the condition classes.

  Parts of this code are borrowed from python's threading module.

  """
  __slots__ = [
    "_lock",
    "acquire",
    "release",
    "_is_owned",
    "_acquire_restore",
    "_release_save",
    ]

  def __init__(self, lock):
    """Constructor for _BaseCondition.

    Helpers provided by the lock itself (C{_release_save},
    C{_acquire_restore} and C{is_owned}) are used if available, otherwise the
    generic implementations of this class take their place.

    @type lock: threading.Lock
    @param lock: condition base lock

    """
    object.__init__(self)

    self._release_save = getattr(lock, "_release_save",
                                 self._base_release_save)
    self._acquire_restore = getattr(lock, "_acquire_restore",
                                    self._base_acquire_restore)
    self._is_owned = getattr(lock, "is_owned", self._base_is_owned)

    self._lock = lock

    # The lock's acquire() and release() methods are exported as-is
    self.acquire = lock.acquire
    self.release = lock.release

  def _base_is_owned(self):
    """Check whether lock is owned by current thread.

    A non-blocking acquire is attempted; if it fails the lock is considered
    owned, if it succeeds the lock is released again right away.

    """
    if not self._lock.acquire(0):
      return True

    self._lock.release()
    return False

  def _base_release_save(self):
    """Releases the lock; there is no state to be saved.

    """
    self._lock.release()

  def _base_acquire_restore(self, _):
    """Acquires the lock again, ignoring the (unused) saved state.

    """
    self._lock.acquire()

  def _check_owned(self):
    """Raise an exception if the current thread doesn't own the lock.

    """
    if self._is_owned():
      return

    raise RuntimeError("cannot work with un-aquired lock")
205
206
class SingleNotifyPipeCondition(_BaseCondition):
  """Condition which can only be notified once.

  Pipes and poll are used internally, making it possible to wait for a
  notification with a timeout without resorting to polling. The class is
  almost compatible with Python's threading.Condition, with the following
  differences:
    - notifyAll can only be called once, and no wait can happen after that
    - notify is not supported, only notifyAll

  """

  __slots__ = [
    "_read_fd",
    "_write_fd",
    "_nwaiters",
    "_notified",
    ]

  _waiter_class = _SingleNotifyPipeConditionWaiter

  def __init__(self, lock):
    """Constructor for SingleNotifyPipeCondition

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    # The pipe is only created once the first waiter shows up
    self._read_fd = None
    self._write_fd = None
    self._nwaiters = 0
    self._notified = False

  def _check_unnotified(self):
    """Throws an exception if already notified.

    """
    if not self._notified:
      return

    raise RuntimeError("cannot use already notified condition")

  def _Cleanup(self):
    """Cleanup open file descriptors, if any.

    """
    for attr in ("_read_fd", "_write_fd"):
      fd = getattr(self, attr)
      if fd is not None:
        os.close(fd)
        setattr(self, attr, None)

  def wait(self, timeout):
    """Wait for a notification.

    The lock must be held by the caller. It is released while waiting and
    acquired again before returning.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()
    self._check_unnotified()

    self._nwaiters += 1
    try:
      if self._read_fd is None:
        (self._read_fd, self._write_fd) = os.pipe()

      waiter = self._waiter_class(self._read_fd)
      saved_state = self._release_save()
      try:
        # Block until notified or until the timeout expires
        waiter(timeout)
      finally:
        # The lock must be held again in any case
        self._acquire_restore(saved_state)
    finally:
      self._nwaiters -= 1
      # The last waiter to leave closes the pipe
      if not self._nwaiters:
        self._Cleanup()

  def notifyAll(self): # pylint: disable=C0103
    """Close the writing side of the pipe to notify all waiters.

    """
    self._check_owned()
    self._check_unnotified()
    self._notified = True

    write_fd = self._write_fd
    if write_fd is None:
      # No pipe was created (or it is closed already)
      return

    os.close(write_fd)
    self._write_fd = None
294
295
class PipeCondition(_BaseCondition):
  """Group-only non-polling condition with counters.

  Pipes and poll are used internally, making it possible to wait for a
  notification with a timeout without resorting to polling. The class is
  almost compatible with Python's threading.Condition, but only supports
  notifyAll and non-recursive locks. As an additional feature it's able to
  report whether there are any waiting threads.

  """
  __slots__ = [
    "_waiters",
    "_single_condition",
    ]

  _single_condition_class = SingleNotifyPipeCondition

  def __init__(self, lock):
    """Initializes this class.

    @param lock: condition base lock

    """
    _BaseCondition.__init__(self, lock)
    self._waiters = set()
    self._single_condition = self._single_condition_class(self._lock)

  def wait(self, timeout):
    """Wait for a notification.

    @type timeout: float or None
    @param timeout: Waiting timeout (can be None)

    """
    self._check_owned()

    # A notifying thread may replace the single-use condition while we're
    # waiting, hence a local reference is kept
    single_cond = self._single_condition
    me = threading.currentThread()

    self._waiters.add(me)
    try:
      single_cond.wait(timeout)
    finally:
      self._check_owned()
      self._waiters.remove(me)

  def notifyAll(self): # pylint: disable=C0103
    """Notify all currently waiting threads.

    The single-use condition is notified and then replaced by a fresh one
    for subsequent waits.

    """
    self._check_owned()
    self._single_condition.notifyAll()
    self._single_condition = self._single_condition_class(self._lock)

  def get_waiting(self):
    """Returns the waiting threads.

    @return: the internal set of waiting threads (not a copy)

    """
    self._check_owned()

    return self._waiters

  def has_waiting(self):
    """Returns whether there are active waiters.

    @rtype: bool

    """
    self._check_owned()

    return bool(self._waiters)

  def __repr__(self):
    cls = self.__class__
    return ("<%s.%s waiters=%s at %#x>" %
            (cls.__module__, cls.__name__, self._waiters, id(self)))
369
370
class _PipeConditionWithMode(PipeCondition):
  """PipeCondition which also records the mode of the waiting acquires.

  """
  __slots__ = ["shared"]

  def __init__(self, lock, shared):
    """Initializes this class.

    @param lock: condition base lock
    @param shared: stored as-is in the C{shared} attribute

    """
    self.shared = shared
    PipeCondition.__init__(self, lock)
382
383
class SharedLock(object):
  """Implements a shared lock.

  Multiple threads can acquire the lock in a shared way by calling
  C{acquire(shared=1)}. In order to acquire the lock in an exclusive way
  threads can call C{acquire(shared=0)}.

  Notes on data structures: C{__pending} contains a priority queue (heapq) of
  all pending acquires: C{[(priority1: prioqueue1), (priority2: prioqueue2),
  ...]}. Each per-priority queue contains a normal in-order list of conditions
  to be notified when the lock can be acquired. Shared locks are grouped
  together by priority and the condition for them is stored in
  C{__pending_shared} if it already exists. C{__pending_by_prio} keeps
  references for the per-priority queues indexed by priority for faster access.

  @type name: string
  @ivar name: the name of the lock

  """
  __slots__ = [
    "__weakref__",
    "__deleted",
    "__exc",
    "__lock",
    "__pending",
    "__pending_by_prio",
    "__pending_shared",
    "__shr",
    "__time_fn",
    "name",
    ]

  __condition_class = _PipeConditionWithMode

  def __init__(self, name, monitor=None, _time_fn=time.time):
    """Construct a new SharedLock.

    @param name: the name of the lock
    @param monitor: Lock monitor with which to register
    @param _time_fn: Function returning the current time (used for
        unittesting)

    """
    object.__init__(self)

    self.name = name

    # Used for unittesting
    self.__time_fn = _time_fn

    # Internal lock
    self.__lock = threading.Lock()

    # Queue containing waiting acquires
    self.__pending = []
    self.__pending_by_prio = {}
    self.__pending_shared = {}

    # Current lock holders
    self.__shr = set()
    self.__exc = None

    # is this lock in the deleted state?
    self.__deleted = False

    # Register with lock monitor
    if monitor:
      logging.debug("Adding lock %s to monitor", name)
      monitor.RegisterLock(self)

  def __repr__(self):
    return ("<%s.%s name=%s at %#x>" %
            (self.__class__.__module__, self.__class__.__name__,
             self.name, id(self)))

  def GetLockInfo(self, requested):
    """Retrieves information for querying locks.

    @type requested: set
    @param requested: Requested information, see C{query.LQ_*}
    @rtype: list
    @return: list with a single C{(name, mode, owner_names, pending)} tuple;
        fields which weren't requested are C{None}

    """
    self.__lock.acquire()
    try:
      # Note: to avoid unintentional race conditions, no references to
      # modifiable objects should be returned unless they were created in this
      # function.
      mode = None
      owner_names = None

      if query.LQ_MODE in requested:
        if self.__deleted:
          mode = _DELETED_TEXT
          assert not (self.__exc or self.__shr)
        elif self.__exc:
          mode = _EXCLUSIVE_TEXT
        elif self.__shr:
          mode = _SHARED_TEXT

      # Current owner(s) are wanted
      if query.LQ_OWNER in requested:
        if self.__exc:
          owner = [self.__exc]
        else:
          owner = self.__shr

        if owner:
          assert not self.__deleted
          owner_names = [i.getName() for i in owner]

      # Pending acquires are wanted
      if query.LQ_PENDING in requested:
        pending = []

        # Sorting instead of copying and using heapq functions for simplicity
        for (_, prioqueue) in sorted(self.__pending):
          for cond in prioqueue:
            if cond.shared:
              pendmode = _SHARED_TEXT
            else:
              pendmode = _EXCLUSIVE_TEXT

            # List of names will be sorted in L{query._GetLockPending}
            pending.append((pendmode, [i.getName()
                                       for i in cond.get_waiting()]))
      else:
        pending = None

      return [(self.name, mode, owner_names, pending)]
    finally:
      self.__lock.release()

  def __check_deleted(self):
    """Raises an exception if the lock has been deleted.

    @raise errors.LockError: if the lock is in the deleted state

    """
    if self.__deleted:
      raise errors.LockError("Deleted lock %s" % self.name)

  def __is_sharer(self):
    """Is the current thread sharing the lock at this time?

    """
    return threading.currentThread() in self.__shr

  def __is_exclusive(self):
    """Is the current thread holding the lock exclusively at this time?

    """
    return threading.currentThread() == self.__exc

  def __is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    This is a private version of the function, which presumes you're holding
    the internal lock.

    """
    if shared < 0:
      return self.__is_sharer() or self.__is_exclusive()
    elif shared:
      return self.__is_sharer()
    else:
      return self.__is_exclusive()

  def is_owned(self, shared=-1):
    """Is the current thread somehow owning the lock at this time?

    @param shared:
        - < 0: check for any type of ownership (default)
        - 0: check for exclusive ownership
        - > 0: check for shared ownership

    """
    self.__lock.acquire()
    try:
      return self.__is_owned(shared=shared)
    finally:
      self.__lock.release()

  #: Necessary to remain compatible with threading.Condition, which tries to
  #: retrieve a locks' "_is_owned" attribute
  _is_owned = is_owned

  def _count_pending(self):
    """Returns the number of pending acquires.

    @rtype: int

    """
    self.__lock.acquire()
    try:
      return sum(len(prioqueue) for (_, prioqueue) in self.__pending)
    finally:
      self.__lock.release()

  def _check_empty(self):
    """Checks whether there are any pending acquires.

    @rtype: bool
    @return: True if none of the pending-acquire structures has entries

    """
    self.__lock.acquire()
    try:
      # Order is important: __find_first_pending_queue modifies __pending
      (_, prioqueue) = self.__find_first_pending_queue()

      return not (prioqueue or
                  self.__pending or
                  self.__pending_by_prio or
                  self.__pending_shared)
    finally:
      self.__lock.release()

  def __do_acquire(self, shared):
    """Actually acquire the lock.

    Only records the current thread as owner; whether the lock is available
    must have been checked by the caller.

    """
    if shared:
      self.__shr.add(threading.currentThread())
    else:
      self.__exc = threading.currentThread()

  def __can_acquire(self, shared):
    """Determine whether lock can be acquired.

    A shared acquire only requires the absence of an exclusive owner, an
    exclusive acquire requires the absence of any owner.

    """
    if shared:
      return self.__exc is None
    else:
      return len(self.__shr) == 0 and self.__exc is None

  def __find_first_pending_queue(self):
    """Tries to find the topmost queued entry with pending acquires.

    Removes empty entries while going through the list.

    @return: C{(priority, prioqueue)} of the first non-empty queue or
        C{(None, None)} if there is none

    """
    while self.__pending:
      (priority, prioqueue) = self.__pending[0]

      if prioqueue:
        return (priority, prioqueue)

      # Remove empty queue
      heapq.heappop(self.__pending)
      del self.__pending_by_prio[priority]
      assert priority not in self.__pending_shared

    return (None, None)

  def __is_on_top(self, cond):
    """Checks whether the passed condition is on top of the queue.

    The caller must make sure the queue isn't empty.

    """
    (_, prioqueue) = self.__find_first_pending_queue()

    return cond == prioqueue[0]

  def __acquire_unlocked(self, shared, timeout, priority):
    """Acquire a shared lock.

    The internal lock must be held by the caller.

    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: whether the lock was acquired; False is only possible if a
        timeout was given
    @raise errors.LockError: if the lock is (or gets) deleted

    """
    self.__check_deleted()

    # We cannot acquire the lock if we already have it
    assert not self.__is_owned(), ("double acquire() on a non-recursive lock"
                                   " %s" % self.name)

    # Remove empty entries from queue
    self.__find_first_pending_queue()

    # Check whether someone else holds the lock or there are pending acquires.
    if not self.__pending and self.__can_acquire(shared):
      # Apparently not, can acquire lock directly.
      self.__do_acquire(shared)
      return True

    # The lock couldn't be acquired right away, so if a timeout is given and is
    # considered too short, return right away as scheduling a pending
    # acquisition is quite expensive
    if timeout is not None and timeout < _LOCK_ACQUIRE_MIN_TIMEOUT:
      return False

    prioqueue = self.__pending_by_prio.get(priority, None)

    if shared:
      # Try to re-use condition for shared acquire
      wait_condition = self.__pending_shared.get(priority, None)
      assert (wait_condition is None or
              (wait_condition.shared and wait_condition in prioqueue))
    else:
      wait_condition = None

    if wait_condition is None:
      if prioqueue is None:
        assert priority not in self.__pending_by_prio

        prioqueue = []
        heapq.heappush(self.__pending, (priority, prioqueue))
        self.__pending_by_prio[priority] = prioqueue

      wait_condition = self.__condition_class(self.__lock, shared)
      prioqueue.append(wait_condition)

      if shared:
        # Keep reference for further shared acquires on same priority. This is
        # better than trying to find it in the list of pending acquires.
        assert priority not in self.__pending_shared
        self.__pending_shared[priority] = wait_condition

    wait_start = self.__time_fn()
    acquired = False

    try:
      # Wait until we become the topmost acquire in the queue or the timeout
      # expires.
      while True:
        if self.__is_on_top(wait_condition) and self.__can_acquire(shared):
          self.__do_acquire(shared)
          acquired = True
          break

        # A lot of code assumes blocking acquires always succeed, therefore we
        # can never return False for a blocking acquire
        if timeout is None:
          remaining = None
        else:
          # Only wait for what is left of the timeout. Waiting for the full
          # timeout again after each wake-up which didn't lead to an
          # acquisition would make the overall waiting time exceed the
          # requested timeout.
          remaining = (wait_start + timeout) - self.__time_fn()
          if remaining < 0:
            break

        # Wait for notification
        wait_condition.wait(remaining)
        self.__check_deleted()
    finally:
      # Remove condition from queue if there are no more waiters
      if not wait_condition.has_waiting():
        prioqueue.remove(wait_condition)
        if wait_condition.shared:
          # Remove from list of shared acquires if it wasn't while releasing
          # (e.g. on lock deletion)
          self.__pending_shared.pop(priority, None)

    return acquired

  def acquire(self, shared=0, timeout=None, priority=None,
              test_notify=None):
    """Acquire a shared lock.

    @type shared: integer (0/1) used as a boolean
    @param shared: whether to acquire in shared mode; by default an
        exclusive lock will be acquired
    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @type test_notify: callable or None
    @param test_notify: Special callback function for unittesting
    @rtype: bool
    @return: whether the lock was acquired
    @raise errors.LockError: if the lock is (or gets) deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      # We already got the lock, notify now
      if __debug__ and callable(test_notify):
        test_notify()

      return self.__acquire_unlocked(shared, timeout, priority)
    finally:
      self.__lock.release()

  def downgrade(self):
    """Changes the lock mode from exclusive to shared.

    Pending acquires in shared mode on the same priority will go ahead.

    @return: always True

    """
    self.__lock.acquire()
    try:
      assert self.__is_owned(), "Lock must be owned"

      if self.__is_exclusive():
        # Do nothing if the lock is already acquired in shared mode
        self.__exc = None
        self.__do_acquire(1)

        # Important: pending shared acquires should only jump ahead if there
        # was a transition from exclusive to shared, otherwise an owner of a
        # shared lock can keep calling this function to push incoming shared
        # acquires
        (priority, prioqueue) = self.__find_first_pending_queue()
        if prioqueue:
          # Is there a pending shared acquire on this priority?
          cond = self.__pending_shared.pop(priority, None)
          if cond:
            assert cond.shared
            assert cond in prioqueue

            # Ensure shared acquire is on top of queue
            if len(prioqueue) > 1:
              prioqueue.remove(cond)
              prioqueue.insert(0, cond)

            # Notify
            cond.notifyAll()

      assert not self.__is_exclusive()
      assert self.__is_sharer()

      return True
    finally:
      self.__lock.release()

  def release(self):
    """Release a Shared Lock.

    You must have acquired the lock, either in shared or in exclusive mode,
    before calling this function.

    """
    self.__lock.acquire()
    try:
      assert self.__is_exclusive() or self.__is_sharer(), \
        "Cannot release non-owned lock"

      # Autodetect release type
      if self.__is_exclusive():
        self.__exc = None
        notify = True
      else:
        self.__shr.remove(threading.currentThread())
        notify = not self.__shr

      # Notify topmost condition in queue if there are no owners left (for
      # shared locks)
      if notify:
        self.__notify_topmost()
    finally:
      self.__lock.release()

  def __notify_topmost(self):
    """Notifies topmost condition in queue of pending acquires.

    """
    (priority, prioqueue) = self.__find_first_pending_queue()
    if prioqueue:
      cond = prioqueue[0]
      cond.notifyAll()
      if cond.shared:
        # Prevent further shared acquires from sneaking in while waiters are
        # notified
        self.__pending_shared.pop(priority, None)

  def _notify_topmost(self):
    """Exported version of L{__notify_topmost}.

    """
    self.__lock.acquire()
    try:
      return self.__notify_topmost()
    finally:
      self.__lock.release()

  def delete(self, timeout=None, priority=None):
    """Delete a Shared Lock.

    This operation will declare the lock for removal. First the lock will be
    acquired in exclusive mode if you don't already own it, then the lock
    will be put in a state where any future and pending acquire() fail.

    @type timeout: float
    @param timeout: maximum waiting time before giving up
    @type priority: integer
    @param priority: Priority for acquiring lock
    @rtype: bool
    @return: whether the lock could be acquired and hence was deleted
    @raise errors.LockError: if the lock is already deleted

    """
    if priority is None:
      priority = _DEFAULT_PRIORITY

    self.__lock.acquire()
    try:
      assert not self.__is_sharer(), "Cannot delete() a lock while sharing it"

      self.__check_deleted()

      # The caller is allowed to hold the lock exclusively already.
      acquired = self.__is_exclusive()

      if not acquired:
        acquired = self.__acquire_unlocked(0, timeout, priority)

      if acquired:
        assert self.__is_exclusive() and not self.__is_sharer(), \
          "Lock wasn't acquired in exclusive mode"

        self.__deleted = True
        self.__exc = None

        assert not (self.__exc or self.__shr), "Found owner during deletion"

        # Notify all acquires. They'll throw an error.
        for (_, prioqueue) in self.__pending:
          for cond in prioqueue:
            cond.notifyAll()

        assert self.__deleted

      return acquired
    finally:
      self.__lock.release()

  def _release_save(self):
    """Releases the lock and returns whether it was held in shared mode.

    The return value is meant to be passed to L{_acquire_restore}.

    """
    shared = self.__is_sharer()
    self.release()
    return shared

  def _acquire_restore(self, shared):
    """Acquires the lock again in the mode saved by L{_release_save}.

    """
    self.acquire(shared=shared)
908
909
# None is passed as the value to acquire whenever a full LockSet is wanted;
# this constant gives that value a descriptive name.
ALL_SET = None

LOCKSET_NAME = "[lockset]"
915
916
def _TimeoutZero():
  """Returns a timeout of zero.

  @rtype: int
  @return: always 0

  """
  return 0
922
923
class _AcquireTimeout(Exception):
  """Internal exception raised to abort an acquire when a timeout is hit.

  """
928
929
# Locking levels, must be acquired in increasing order. Current rules are:
# - At level LEVEL_CLUSTER resides the Big Ganeti Lock (BGL) which must be
#   acquired before performing any operation, either in shared or exclusive
#   mode. Acquiring the BGL in exclusive mode is discouraged and should be
#   avoided.
# - At levels LEVEL_NODE and LEVEL_INSTANCE reside node and instance locks. If
#   you need more than one node, or more than one instance, acquire them at
#   the same time.
# - LEVEL_NODE_RES is for node resources and should be used by operations with
#   possibly high impact on the node's disks.
(LEVEL_CLUSTER,
 LEVEL_INSTANCE,
 LEVEL_NODEGROUP,
 LEVEL_NODE,
 LEVEL_NODE_RES,
 LEVEL_NETWORK) = range(6)

# All levels, listed in increasing order
LEVELS = [
  LEVEL_CLUSTER,
  LEVEL_INSTANCE,
  LEVEL_NODEGROUP,
  LEVEL_NODE,
  LEVEL_NODE_RES,
  LEVEL_NETWORK,
  ]
955
# Levels whose set of locks can be modified (everything but the cluster level)
LEVELS_MOD = compat.UniqueFrozenset([
  LEVEL_NODE_RES,
  LEVEL_NODE,
  LEVEL_NODEGROUP,
  LEVEL_INSTANCE,
  LEVEL_NETWORK,
  ])

#: Name of each lock level (always use the singular form)
LEVEL_NAMES = {
  LEVEL_CLUSTER: "cluster",
  LEVEL_INSTANCE: "instance",
  LEVEL_NODEGROUP: "nodegroup",
  LEVEL_NODE: "node",
  LEVEL_NODE_RES: "node-res",
  LEVEL_NETWORK: "network",
  }

# Name of the Big Ganeti Lock
BGL = "BGL"