Merge branch 'stable-2.16' into stable-2.17
[ganeti-github.git] / lib / workerpool.py
1 #
2 #
3
4 # Copyright (C) 2008, 2009, 2010 Google Inc.
5 # All rights reserved.
6 #
7 # Redistribution and use in source and binary forms, with or without
8 # modification, are permitted provided that the following conditions are
9 # met:
10 #
11 # 1. Redistributions of source code must retain the above copyright notice,
12 # this list of conditions and the following disclaimer.
13 #
14 # 2. Redistributions in binary form must reproduce the above copyright
15 # notice, this list of conditions and the following disclaimer in the
16 # documentation and/or other materials provided with the distribution.
17 #
18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
19 # IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
20 # TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
21 # PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
22 # CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
23 # EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
24 # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
25 # PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
26 # LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
27 # NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
28 # SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29
30
31 """Base classes for worker pools.
32
33 """
34
35 import logging
36 import threading
37 import heapq
38 import itertools
39
40 from ganeti import compat
41 from ganeti import errors
42
43
# Sentinel returned by WorkerPool._WaitForTaskUnlocked to tell a worker
# thread to exit its main loop (compared with "is" in BaseWorker.run)
_TERMINATE = object()
# Priority assigned by AddTask/AddManyTasks when the caller gives none
_DEFAULT_PRIORITY = 0
46
47
class DeferTask(Exception):
  """Special exception class used to postpone a task.

  L{BaseWorker.RunTask} can raise this exception to put the task it is
  currently processing back into the queue instead of finishing it. The
  task's priority can optionally be changed at the same time.

  """
  def __init__(self, priority=None):
    """Initializes this class.

    @type priority: number
    @param priority: New task priority (None means no change)

    """
    super(DeferTask, self).__init__()
    self.priority = priority
64
65
class NoSuchTask(Exception):
  """Raised when a task ID cannot be found in the pool.

  """
71
class BaseWorker(threading.Thread, object):
  """Base worker class for worker pools.

  Users of a worker pool must override RunTask in a subclass.

  """
  # pylint: disable=W0212
  def __init__(self, pool, worker_id):
    """Constructor for BaseWorker thread.

    @param pool: the parent worker pool
    @param worker_id: identifier for this worker

    """
    super(BaseWorker, self).__init__(name=worker_id)
    self.pool = pool
    self._worker_id = worker_id
    # Task entry currently being processed (same list format as entries in
    # WorkerPool._tasks: [priority, order ID, task ID, args]); None while
    # idle. Protected by pool._lock.
    self._current_task = None

    assert self.getName() == worker_id

  def ShouldTerminate(self):
    """Returns whether this worker should terminate.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()
      return self.pool._ShouldWorkerTerminateUnlocked(self)
    finally:
      self.pool._lock.release()

  def GetCurrentPriority(self):
    """Returns the priority of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      # Task entry format is (priority, order ID, task ID, arguments)
      (priority, _, _, _) = self._current_task

      return priority
    finally:
      self.pool._lock.release()

  def SetTaskName(self, taskname):
    """Sets the name of the current task.

    Should only be called from within L{RunTask}.

    @type taskname: string
    @param taskname: Task's name

    """
    if taskname:
      name = "%s/%s" % (self._worker_id, taskname)
    else:
      # Empty task name resets the thread name to the plain worker ID
      name = self._worker_id

    # Set thread name
    self.setName(name)

  def _HasRunningTaskUnlocked(self):
    """Returns whether this worker is currently running a task.

    Caller must hold the pool's lock.

    """
    return (self._current_task is not None)

  def _GetCurrentOrderAndTaskId(self):
    """Returns the order and task ID of the current task.

    Should only be called from within L{RunTask}.

    """
    self.pool._lock.acquire()
    try:
      assert self._HasRunningTaskUnlocked()

      (_, order_id, task_id, _) = self._current_task

      return (order_id, task_id)
    finally:
      self.pool._lock.release()

  def run(self):
    """Main thread function.

    Waits for new tasks to show up in the queue.

    """
    pool = self.pool

    while True:
      assert self._current_task is None

      # Holds the DeferTask instance if RunTask asked for deferral,
      # otherwise stays None
      defer = None
      try:
        # Wait on lock to be told either to terminate or to do a task
        pool._lock.acquire()
        try:
          task = pool._WaitForTaskUnlocked(self)

          if task is _TERMINATE:
            # Told to terminate
            break

          if task is None:
            # Spurious notification, ignore
            continue

          self._current_task = task

          # No longer needed, dispose of reference
          del task

          assert self._HasRunningTaskUnlocked()

        finally:
          pool._lock.release()

        # RunTask is executed without holding the pool lock
        (priority, _, _, args) = self._current_task
        try:
          # Run the actual task
          assert defer is None
          logging.debug("Starting task %r, priority %s", args, priority)
          assert self.getName() == self._worker_id
          try:
            self.RunTask(*args)
          finally:
            # RunTask may have called SetTaskName; restore plain worker name
            self.SetTaskName(None)
          logging.debug("Done with task %r, priority %s", args, priority)
        except DeferTask, err:
          defer = err

          if defer.priority is None:
            # Use same priority
            defer.priority = priority

          logging.debug("Deferring task %r, new priority %s",
                        args, defer.priority)

          assert self._HasRunningTaskUnlocked()
      except: # pylint: disable=W0702
        # Deliberately catch everything: a worker thread must never die
        # from an exception escaping RunTask
        logging.exception("Caught unhandled exception")

        assert self._HasRunningTaskUnlocked()
      finally:
        # Notify pool
        pool._lock.acquire()
        try:
          if defer:
            assert self._current_task
            # Schedule again for later run (same task ID, possibly new
            # priority)
            (_, _, task_id, args) = self._current_task
            pool._AddTaskUnlocked(args, defer.priority, task_id)

          if self._current_task:
            self._current_task = None
            # Wake up e.g. Quiesce waiting for running tasks to finish
            pool._worker_to_pool.notifyAll()
        finally:
          pool._lock.release()

      assert not self._HasRunningTaskUnlocked()

    logging.debug("Terminates")

  def RunTask(self, *args):
    """Function called to start a task.

    This needs to be implemented by child classes.

    """
    raise NotImplementedError()
250
251
class WorkerPool(object):
  """Worker pool with a queue.

  This class is thread-safe.

  Tasks are guaranteed to be started in the order in which they're
  added to the pool. Due to the nature of threading, they're not
  guaranteed to finish in the same order.

  @type _tasks: list of tuples
  @ivar _tasks: Each tuple has the format (priority, order ID, task ID,
    arguments). Priority and order ID are numeric and essentially control the
    sort order. The order ID is an increasing number denoting the order in
    which tasks are added to the queue. The task ID is controlled by user of
    workerpool, see L{AddTask} for details. The task arguments are C{None} for
    abandoned tasks, otherwise a sequence of arguments to be passed to
    L{BaseWorker.RunTask}). The list must fulfill the heap property (for use by
    the C{heapq} module).
  @type _taskdata: dict; (task IDs as keys, tuples as values)
  @ivar _taskdata: Mapping from task IDs to entries in L{_tasks}

  """
  def __init__(self, name, num_workers, worker_class):
    """Constructor for worker pool.

    @param name: name prefix used for worker thread identifiers
    @param num_workers: number of workers to be started
        (dynamic resizing is not yet implemented)
    @param worker_class: the class to be instantiated for workers;
        should derive from L{BaseWorker}

    """
    # Some of these variables are accessed by BaseWorker
    self._lock = threading.Lock()
    # All three conditions share the same lock; they only separate who is
    # being notified (pool waiting on quiesce, workers waiting for tasks,
    # pool waiting for workers)
    self._pool_to_pool = threading.Condition(self._lock)
    self._pool_to_worker = threading.Condition(self._lock)
    self._worker_to_pool = threading.Condition(self._lock)
    self._worker_class = worker_class
    self._name = name
    self._last_worker_id = 0
    self._workers = []
    self._quiescing = False

    # Terminating workers
    self._termworkers = []

    # Queued tasks
    self._counter = itertools.count()
    self._tasks = []
    self._taskdata = {}

    # Start workers
    self.Resize(num_workers)

  # TODO: Implement dynamic resizing?

  def _WaitWhileQuiescingUnlocked(self):
    """Wait until the worker pool has finished quiescing.

    Caller must hold the pool's lock.

    """
    while self._quiescing:
      self._pool_to_pool.wait()

  def _AddTaskUnlocked(self, args, priority, task_id):
    """Adds a task to the internal queue.

    Caller must hold the pool's lock.

    @type args: sequence
    @param args: Arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID

    """
    assert isinstance(args, (tuple, list)), "Arguments must be a sequence"
    assert isinstance(priority, (int, long)), "Priority must be numeric"
    assert task_id is None or isinstance(task_id, (int, long)), \
      "Task ID must be numeric or None"

    # Must be a list (not a tuple): ChangeTaskPriority mutates the entry
    # in place to mark it abandoned
    task = [priority, self._counter.next(), task_id, args]

    if task_id is not None:
      assert task_id not in self._taskdata
      # Keep a reference to change priority later if necessary
      self._taskdata[task_id] = task

    # A counter is used to ensure elements are processed in their incoming
    # order. For processing they're sorted by priority and then counter.
    heapq.heappush(self._tasks, task)

    # Notify a waiting worker
    self._pool_to_worker.notify()

  def AddTask(self, args, priority=_DEFAULT_PRIORITY, task_id=None):
    """Adds a task to the queue.

    @type args: sequence
    @param args: arguments passed to L{BaseWorker.RunTask}
    @type priority: number
    @param priority: Task priority
    @param task_id: Task ID
    @note: The task ID can be essentially anything that can be used as a
      dictionary key. Callers, however, must ensure a task ID is unique while a
      task is in the pool or while it might return to the pool due to deferring
      using L{DeferTask}.

    """
    self._lock.acquire()
    try:
      # Blocks while Quiesce is in progress
      self._WaitWhileQuiescingUnlocked()
      self._AddTaskUnlocked(args, priority, task_id)
    finally:
      self._lock.release()

  def AddManyTasks(self, tasks, priority=_DEFAULT_PRIORITY, task_id=None):
    """Add a list of tasks to the queue.

    @type tasks: list of tuples
    @param tasks: list of args passed to L{BaseWorker.RunTask}
    @type priority: number or list of numbers
    @param priority: Priority for all added tasks or a list with the priority
      for each task
    @type task_id: list
    @param task_id: List with the ID for each task
    @note: See L{AddTask} for a note on task IDs.

    """
    assert compat.all(isinstance(task, (tuple, list)) for task in tasks), \
      "Each task must be a sequence"
    assert (isinstance(priority, (int, long)) or
            compat.all(isinstance(prio, (int, long)) for prio in priority)), \
      "Priority must be numeric or be a list of numeric values"
    assert task_id is None or isinstance(task_id, (tuple, list)), \
      "Task IDs must be in a sequence"

    # A single priority is replicated for every task
    if isinstance(priority, (int, long)):
      priority = [priority] * len(tasks)
    elif len(priority) != len(tasks):
      raise errors.ProgrammerError("Number of priorities (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(priority), len(tasks)))

    if task_id is None:
      task_id = [None] * len(tasks)
    elif len(task_id) != len(tasks):
      raise errors.ProgrammerError("Number of task IDs (%s) doesn't match"
                                   " number of tasks (%s)" %
                                   (len(task_id), len(tasks)))

    self._lock.acquire()
    try:
      self._WaitWhileQuiescingUnlocked()

      assert compat.all(isinstance(prio, (int, long)) for prio in priority)
      assert len(tasks) == len(priority)
      assert len(tasks) == len(task_id)

      for (args, prio, tid) in zip(tasks, priority, task_id):
        self._AddTaskUnlocked(args, prio, tid)
    finally:
      self._lock.release()

  def ChangeTaskPriority(self, task_id, priority):
    """Changes a task's priority.

    @param task_id: Task ID
    @type priority: number
    @param priority: New task priority
    @raise NoSuchTask: When the task referred by C{task_id} can not be found
      (it may never have existed, may have already been processed, or is
      currently running)

    """
    assert isinstance(priority, (int, long)), "Priority must be numeric"

    self._lock.acquire()
    try:
      logging.debug("About to change priority of task %s to %s",
                    task_id, priority)

      # Find old task
      oldtask = self._taskdata.get(task_id, None)
      if oldtask is None:
        msg = "Task '%s' was not found" % task_id
        logging.debug(msg)
        raise NoSuchTask(msg)

      # Prepare new task
      newtask = [priority] + oldtask[1:]

      # Mark old entry as abandoned (this doesn't change the sort order and
      # therefore doesn't invalidate the heap property of L{self._tasks}).
      # See also <http://docs.python.org/library/heapq.html#priority-queue-
      # implementation-notes>.
      oldtask[-1] = None

      # Change reference to new task entry and forget the old one
      assert task_id is not None
      self._taskdata[task_id] = newtask

      # Add a new task with the old number and arguments
      heapq.heappush(self._tasks, newtask)

      # Notify a waiting worker
      self._pool_to_worker.notify()
    finally:
      self._lock.release()

  def _WaitForTaskUnlocked(self, worker):
    """Waits for a task for a worker.

    Caller must hold the pool's lock.

    @type worker: L{BaseWorker}
    @param worker: Worker thread
    @return: C{_TERMINATE} if the worker should exit, otherwise the next
      task entry ([priority, order ID, task ID, args] list)

    """
    while True:
      if self._ShouldWorkerTerminateUnlocked(worker):
        return _TERMINATE

      # If there's a pending task, return it immediately
      if self._tasks:
        # Get task from queue and tell pool about it
        try:
          task = heapq.heappop(self._tasks)
        finally:
          self._worker_to_pool.notifyAll()

        (_, _, task_id, args) = self._tasks and None or task

        # If the priority was changed, "args" is None
        if args is None:
          # Try again
          logging.debug("Found abandoned task (%r)", task)
          continue

        # Delete reference
        if task_id is not None:
          del self._taskdata[task_id]

        return task

      logging.debug("Waiting for tasks")

      # wait() releases the lock and sleeps until notified
      self._pool_to_worker.wait()

      logging.debug("Notified while waiting")

  def _ShouldWorkerTerminateUnlocked(self, worker):
    """Returns whether a worker should terminate.

    Caller must hold the pool's lock.

    """
    return (worker in self._termworkers)

  def _HasRunningTasksUnlocked(self):
    """Checks whether there's a task running in a worker.

    Caller must hold the pool's lock.

    """
    for worker in self._workers + self._termworkers:
      if worker._HasRunningTaskUnlocked(): # pylint: disable=W0212
        return True
    return False

  def HasRunningTasks(self):
    """Checks whether there's at least one task running.

    """
    self._lock.acquire()
    try:
      return self._HasRunningTasksUnlocked()
    finally:
      self._lock.release()

  def Quiesce(self):
    """Waits until the task queue is empty.

    While quiescing, L{AddTask} and L{AddManyTasks} block.

    """
    self._lock.acquire()
    try:
      self._quiescing = True

      # Wait while there are tasks pending or running
      while self._tasks or self._HasRunningTasksUnlocked():
        self._worker_to_pool.wait()

    finally:
      self._quiescing = False

      # Make sure AddTasks continues in case it was waiting
      self._pool_to_pool.notifyAll()

      self._lock.release()

  def _NewWorkerIdUnlocked(self):
    """Return an identifier for a new worker.

    Caller must hold the pool's lock.

    """
    self._last_worker_id += 1

    return "%s%d" % (self._name, self._last_worker_id)

  def _ResizeUnlocked(self, num_workers):
    """Changes the number of workers.

    Caller must hold the pool's lock.

    """
    assert num_workers >= 0, "num_workers must be >= 0"

    logging.debug("Resizing to %s workers", num_workers)

    current_count = len(self._workers)

    if current_count == num_workers:
      # Nothing to do
      pass

    elif current_count > num_workers:
      if num_workers == 0:
        # Create copy of list to iterate over while lock isn't held.
        termworkers = self._workers[:]
        del self._workers[:]
      else:
        # TODO: Implement partial downsizing
        raise NotImplementedError()
        #termworkers = ...

      self._termworkers += termworkers

      # Notify workers that something has changed
      self._pool_to_worker.notifyAll()

      # Join all terminating workers
      # NOTE: the lock is temporarily dropped here so workers can finish
      # their current task and exit; it's reacquired before returning
      self._lock.release()
      try:
        for worker in termworkers:
          logging.debug("Waiting for thread %s", worker.getName())
          worker.join()
      finally:
        self._lock.acquire()

      # Remove terminated threads. This could be done in a more efficient way
      # (del self._termworkers[:]), but checking worker.isAlive() makes sure we
      # don't leave zombie threads around.
      for worker in termworkers:
        assert worker in self._termworkers, ("Worker not in list of"
                                             " terminating workers")
        if not worker.isAlive():
          self._termworkers.remove(worker)

      assert not self._termworkers, "Zombie worker detected"

    elif current_count < num_workers:
      # Create (num_workers - current_count) new workers
      for _ in range(num_workers - current_count):
        worker = self._worker_class(self, self._NewWorkerIdUnlocked())
        self._workers.append(worker)
        worker.start()

  def Resize(self, num_workers):
    """Changes the number of workers in the pool.

    @param num_workers: the new number of workers

    """
    self._lock.acquire()
    try:
      return self._ResizeUnlocked(num_workers)
    finally:
      self._lock.release()

  def TerminateWorkers(self):
    """Terminate all worker threads.

    Unstarted tasks will be ignored.

    """
    logging.debug("Terminating all workers")

    self._lock.acquire()
    try:
      self._ResizeUnlocked(0)

      if self._tasks:
        logging.debug("There are %s tasks left", len(self._tasks))
    finally:
      self._lock.release()

    logging.debug("All workers terminated")